From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id BD0826163B for ; Mon, 17 Jan 2022 11:48:36 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B224618C70 for ; Mon, 17 Jan 2022 11:48:36 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 8089A18BB2 for ; Mon, 17 Jan 2022 11:48:29 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 5826844968 for ; Mon, 17 Jan 2022 11:48:29 +0100 (CET) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Mon, 17 Jan 2022 11:48:19 +0100 Message-Id: <20220117104825.2409598-5-d.csapak@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20220117104825.2409598-1-d.csapak@proxmox.com> References: <20220117104825.2409598-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.111 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox v4 4/4] proxmox-metrics: implement metrics server client code 
X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 17 Jan 2022 10:48:36 -0000 influxdb (udp + http(s)) only for now. general architecture looks as follows: the helper functions influxdb_http/udp start a tokio task and return a Metrics struct that can be used to send data and wait for the tokio task. if the struct is dropped, the task is canceled. so it would look like this: let metrics = influxdb_http(..params..)?; metrics.send_data(...).await?; metrics.send_data(...).await?; metrics.join().await?; on join, the sending part of the channel is dropped, which flushes the remaining data to the server. Signed-off-by: Dominik Csapak --- Cargo.toml | 1 + proxmox-metrics/Cargo.toml | 21 ++++ proxmox-metrics/debian/changelog | 5 + proxmox-metrics/debian/copyright | 16 ++++ proxmox-metrics/debian/debcargo.toml | 7 ++ proxmox-metrics/src/influxdb/http.rs | 132 ++++++++++++++++++++++++++ proxmox-metrics/src/influxdb/mod.rs | 7 ++ proxmox-metrics/src/influxdb/udp.rs | 80 ++++++++++++++++ proxmox-metrics/src/influxdb/utils.rs | 50 ++++++++++ proxmox-metrics/src/lib.rs | 117 +++++++++++++++++++++++ 10 files changed, 436 insertions(+) create mode 100644 proxmox-metrics/Cargo.toml create mode 100644 proxmox-metrics/debian/changelog create mode 100644 proxmox-metrics/debian/copyright create mode 100644 proxmox-metrics/debian/debcargo.toml create mode 100644 proxmox-metrics/src/influxdb/http.rs create mode 100644 proxmox-metrics/src/influxdb/mod.rs create mode 100644 proxmox-metrics/src/influxdb/udp.rs create mode 100644 proxmox-metrics/src/influxdb/utils.rs create mode 100644 proxmox-metrics/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 8f85e08..4a458d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "proxmox-http", "proxmox-io", "proxmox-lang", + "proxmox-metrics", 
"proxmox-router", "proxmox-schema", "proxmox-serde", diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml new file mode 100644 index 0000000..4f0b8e3 --- /dev/null +++ b/proxmox-metrics/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "proxmox-metrics" +version = "0.1.0" +authors = ["Proxmox Support Team "] +edition = "2018" +license = "AGPL-3" +description = "Metrics Server export utilitites" + +exclude = [ "debian" ] + +[dependencies] +anyhow = "1.0" +tokio = { version = "1.0", features = [ "net", "sync" ] } +futures = "0.3" +serde = "1.0" +serde_json = "1.0" +http = "0.2" +hyper = "0.14" +openssl = "0.10" +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" } +proxmox-async = { path = "../proxmox-async", features = [], version = "0.3" } diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog new file mode 100644 index 0000000..c02803b --- /dev/null +++ b/proxmox-metrics/debian/changelog @@ -0,0 +1,5 @@ +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium + + * initial package + + -- Proxmox Support Team Tue, 14 Dec 2021 08:56:54 +0100 diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright new file mode 100644 index 0000000..5661ef6 --- /dev/null +++ b/proxmox-metrics/debian/copyright @@ -0,0 +1,16 @@ +Copyright (C) 2021 Proxmox Server Solutions GmbH + +This software is written by Proxmox Server Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. 
+ +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml new file mode 100644 index 0000000..b7864cd --- /dev/null +++ b/proxmox-metrics/debian/debcargo.toml @@ -0,0 +1,7 @@ +overlay = "." +crate_src_path = ".." +maintainer = "Proxmox Support Team " + +[source] +vcs_git = "git://git.proxmox.com/git/proxmox.git" +vcs_browser = "https://git.proxmox.com/?p=proxmox.git" diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs new file mode 100644 index 0000000..8aecf70 --- /dev/null +++ b/proxmox-metrics/src/influxdb/http.rs @@ -0,0 +1,132 @@ +use std::sync::Arc; + +use anyhow::{bail, Error}; +use hyper::Body; +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; +use tokio::sync::mpsc; + +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions}; + +use crate::influxdb::utils; +use crate::{Metrics, MetricsData}; + +struct InfluxDbHttp { + client: SimpleHttp, + _healthuri: http::Uri, + writeuri: http::Uri, + token: Option, + max_body_size: usize, + data: String, + channel: mpsc::Receiver>, +} + +/// Returns a [Metrics] handle that connects and sends data to the +/// given influxdb server at the given https url +pub fn influxdb_http( + uri: &str, + organization: &str, + bucket: &str, + token: Option<&str>, + verify_tls: bool, + max_body_size: usize, +) -> Result { + let (tx, rx) = mpsc::channel(1024); + + let client = if verify_tls { + SimpleHttp::with_options(SimpleHttpOptions::default()) + } else { + let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap(); + ssl_connector.set_verify(SslVerifyMode::NONE); + SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default()) + }; + + let uri: http::uri::Uri = uri.parse()?; + let uri_parts = uri.into_parts(); + + let base_path = if let Some(ref p) = uri_parts.path_and_query { + 
p.path().trim_end_matches("/") + } else { + "" + }; + + let writeuri = http::uri::Builder::new() + .scheme(uri_parts.scheme.clone().unwrap()) + .authority(uri_parts.authority.clone().unwrap()) + .path_and_query(format!( + "{}/api/v2/write?org={}&bucket={}", + base_path, organization, bucket + )) + .build()?; + + let healthuri = http::uri::Builder::new() + .scheme(uri_parts.scheme.unwrap()) + .authority(uri_parts.authority.unwrap()) + .path_and_query(format!("{}/health", base_path)) + .build()?; + + let this = InfluxDbHttp { + client, + writeuri, + _healthuri: healthuri, + token: token.map(String::from), + max_body_size, + data: String::new(), + channel: rx, + }; + + let join_handle = Some(tokio::spawn(async { this.finish().await })); + + Ok(Metrics { + join_handle, + channel: Some(tx), + }) +} + +impl InfluxDbHttp { + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { + let new_data = utils::format_influxdb_line(&data)?; + + if self.data.len() + new_data.len() >= self.max_body_size { + self.flush().await?; + } + + self.data.push_str(&new_data); + + if self.data.len() >= self.max_body_size { + self.flush().await?; + } + + Ok(()) + } + + async fn flush(&mut self) -> Result<(), Error> { + if self.data.is_empty() { + return Ok(()); + } + let mut request = http::Request::builder().method("POST").uri(&self.writeuri); + + if let Some(token) = &self.token { + request = request.header("Authorization", format!("Token {}", token)); + } + + let request = request.body(Body::from(self.data.split_off(0)))?; + + let res = self.client.request(request).await?; + + let status = res.status(); + if !status.is_success() { + bail!("got bad status: {}", status); + } + Ok(()) + } + + async fn finish(mut self) -> Result<(), Error> { + while let Some(data) = self.channel.recv().await { + self.add_data(data).await?; + } + + self.flush().await?; + + Ok(()) + } +} diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs new file mode 100644 index 
0000000..26a715c --- /dev/null +++ b/proxmox-metrics/src/influxdb/mod.rs @@ -0,0 +1,7 @@ +mod http; +pub use self::http::*; + +mod udp; +pub use udp::*; + +pub mod utils; diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs new file mode 100644 index 0000000..8f7669b --- /dev/null +++ b/proxmox-metrics/src/influxdb/udp.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use anyhow::Error; +use tokio::sync::mpsc; + +use proxmox_async::io::connect_to_udp; + +use crate::influxdb::utils; +use crate::{Metrics, MetricsData}; + +struct InfluxDbUdp { + address: String, + conn: Option, + mtu: u16, + data: String, + channel: mpsc::Receiver>, +} + +/// Returns a [Metrics] handle that connects and sends data to the +/// given influxdb server at the given udp address/port +/// +/// `address` must be in the format of 'ip_or_hostname:port' +pub fn influxdb_udp(address: &str, mtu: Option) -> Metrics { + let (tx, rx) = mpsc::channel(1024); + + let this = InfluxDbUdp { + address: address.to_string(), + conn: None, + // empty ipv6 udp package needs 48 bytes, subtract 50 for safety + mtu: mtu.unwrap_or(1500) - 50, + data: String::new(), + channel: rx, + }; + + let join_handle = Some(tokio::spawn(async { this.finish().await })); + + Metrics { + join_handle, + channel: Some(tx), + } +} + +impl InfluxDbUdp { + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { + let new_data = utils::format_influxdb_line(&data)?; + + if self.data.len() + new_data.len() >= (self.mtu as usize) { + self.flush().await?; + } + + self.data.push_str(&new_data); + + if self.data.len() >= (self.mtu as usize) { + self.flush().await?; + } + + Ok(()) + } + + async fn flush(&mut self) -> Result<(), Error> { + let conn = match self.conn.take() { + Some(conn) => conn, + None => connect_to_udp(&self.address).await?, + }; + + conn.send(self.data.split_off(0).as_bytes()).await?; + self.conn = Some(conn); + Ok(()) + } + + async fn finish(mut self) -> Result<(), Error> { + while let 
Some(data) = self.channel.recv().await { + self.add_data(data).await?; + } + + self.flush().await?; + + Ok(()) + } +} diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs new file mode 100644 index 0000000..3507e61 --- /dev/null +++ b/proxmox-metrics/src/influxdb/utils.rs @@ -0,0 +1,50 @@ +use std::fmt::Write; + +use anyhow::{bail, Error}; +use serde_json::Value; + +use crate::MetricsData; + +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result { + if !data.values.is_object() { + bail!("invalid data"); + } + + let mut line = escape_measurement(&data.measurement); + + for (key, value) in &data.tags { + write!(line, ",{}={}", escape_key(key), escape_key(value))?; + } + + line.push(' '); + + let mut first = true; + for (key, value) in data.values.as_object().unwrap().iter() { + match value { + Value::Object(_) => bail!("objects not supported"), + Value::Array(_) => bail!("arrays not supported"), + _ => {} + } + if !first { + line.push(','); + } + first = false; + write!(line, "{}={}", escape_key(key), value.to_string())?; + } + + // nanosecond precision + writeln!(line, " {}", data.ctime * 1_000_000_000)?; + + Ok(line) +} + +fn escape_key(key: &str) -> String { + let key = key.replace(',', "\\,"); + let key = key.replace('=', "\\="); + key.replace(' ', "\\ ") +} + +fn escape_measurement(measurement: &str) -> String { + let measurement = measurement.replace(',', "\\,"); + measurement.replace(' ', "\\ ") +} diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs new file mode 100644 index 0000000..5325d25 --- /dev/null +++ b/proxmox-metrics/src/lib.rs @@ -0,0 +1,117 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::{bail, format_err, Error}; +use serde::Serialize; +use serde_json::Value; +use tokio::sync::mpsc; + +mod influxdb; +#[doc(inline)] +pub use influxdb::{influxdb_http, influxdb_udp}; + +#[derive(Clone)] +/// Structured data for the metric server +pub struct MetricsData { + /// 
The category of measurements + pub measurement: String, + /// A list of tags to attach to the measurements + pub tags: HashMap, + /// The actual values to send. Only plain (not-nested) objects are supported at the moment. + pub values: Value, + /// The time of the measurement + pub ctime: i64, +} + +impl MetricsData { + /// Convenient helper to create from references + pub fn new( + measurement: &str, + tags: &[(&str, &str)], + ctime: i64, + values: V, + ) -> Result { + let mut new_tags = HashMap::new(); + for (key, value) in tags { + new_tags.insert(key.to_string(), value.to_string()); + } + + Ok(Self { + measurement: measurement.to_string(), + tags: new_tags, + values: serde_json::to_value(values)?, + ctime, + }) + } +} + +/// Helper to send a list of [MetricsData] to a list of [Metrics] +pub async fn send_data_to_channels( + values: &[Arc], + connections: &[Metrics], +) -> Vec> { + let mut futures = Vec::with_capacity(connections.len()); + for connection in connections { + futures.push(async move { + for data in values { + connection.send_data(Arc::clone(data)).await? + } + Ok::<(), Error>(()) + }); + } + + futures::future::join_all(futures).await +} + +/// Represents a connection to the metric server which can be used to send data +/// +/// You can send [MetricsData] by using [`Self::send_data()`], and to flush and +/// finish the connection use [`Self::join`]. +/// +/// If dropped, it will abort the connection and not flush out buffered data. +pub struct Metrics { + join_handle: Option>>, + channel: Option>>, +} + +impl Drop for Metrics { + fn drop(&mut self) { + if let Some(join_handle) = self.join_handle.take() { + join_handle.abort(); + } + } +} + +impl Metrics { + /// Closes the queue and waits for the connection to send all remaining data + pub async fn join(mut self) -> Result<(), Error> { + if let Some(channel) = self.channel.take() { + drop(channel); + } + if let Some(join_handle) = self.join_handle.take() { + join_handle.await? 
+ } else { + bail!("internal error: no join_handle") + } + } + + /// Queues the given data to the metric server + pub async fn send_data(&self, data: Arc) -> Result<(), Error> { + // return ok if we got no data to send + if let Value::Object(map) = &data.values { + if map.is_empty() { + return Ok(()); + } + } + + if let Some(channel) = &self.channel { + channel + .send(data) + .await + .map_err(|_| format_err!("receiver side closed"))?; + } else { + bail!("channel was already closed"); + } + Ok(()) + } +} -- 2.30.2