From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 9DE9F85132 for ; Fri, 17 Dec 2021 09:10:09 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 9A16228897 for ; Fri, 17 Dec 2021 09:10:09 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 0126828709 for ; Fri, 17 Dec 2021 09:10:03 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id CCDB345238 for ; Fri, 17 Dec 2021 09:10:02 +0100 (CET) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Fri, 17 Dec 2021 09:09:54 +0100 Message-Id: <20211217081000.1061796-4-d.csapak@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20211217081000.1061796-1-d.csapak@proxmox.com> References: <20211217081000.1061796-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.170 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[mod.rs, lib.rs, self.channel, gnu.org, udp.rs, http.rs, utils.rs, proxmox.com] Subject: [pbs-devel] [PATCH proxmox v3 3/3] proxmox-metrics: implement metrics server client code X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 17 Dec 2021 08:10:09 -0000 influxdb (udp + http(s)) only for now general architecture looks as follows: "new" returns a MetricsChannel and a Future the channels can be used to push data in (it flushes automatically if it would be over the configured size (mtu/max_body_size)) and the future must be polled to actually send data to the servers. so most often it would look like this: let (future, channel) = InfluxDbHttp::new(..params..)?; let handle = tokio::spawn(future); channel.send_data(...).await?; handle.await?; when all channels go out of scope, all remaining data in the channel will be read and sent to the server Signed-off-by: Dominik Csapak --- Cargo.toml | 1 + proxmox-metrics/Cargo.toml | 20 +++++ proxmox-metrics/debian/changelog | 5 ++ proxmox-metrics/debian/copyright | 16 ++++ proxmox-metrics/debian/debcargo.toml | 7 ++ proxmox-metrics/src/influxdb/http.rs | 122 ++++++++++++++++++++++++++ proxmox-metrics/src/influxdb/mod.rs | 7 ++ proxmox-metrics/src/influxdb/udp.rs | 94 ++++++++++++++++++++ proxmox-metrics/src/influxdb/utils.rs | 50 +++++++++++ proxmox-metrics/src/lib.rs | 89 +++++++++++++++++++ 10 files changed, 411 insertions(+) create mode 100644 proxmox-metrics/Cargo.toml create mode 100644 proxmox-metrics/debian/changelog create mode 100644 proxmox-metrics/debian/copyright create mode 100644 proxmox-metrics/debian/debcargo.toml create mode 100644 proxmox-metrics/src/influxdb/http.rs create mode 100644 proxmox-metrics/src/influxdb/mod.rs create mode 100644 proxmox-metrics/src/influxdb/udp.rs create mode 100644 
proxmox-metrics/src/influxdb/utils.rs create mode 100644 proxmox-metrics/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 8f85e08..4a458d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "proxmox-http", "proxmox-io", "proxmox-lang", + "proxmox-metrics", "proxmox-router", "proxmox-schema", "proxmox-serde", diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml new file mode 100644 index 0000000..9ac50fe --- /dev/null +++ b/proxmox-metrics/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "proxmox-metrics" +version = "0.1.0" +authors = ["Proxmox Support Team "] +edition = "2018" +license = "AGPL-3" +description = "Metrics Server export utilities" + +exclude = [ "debian" ] + +[dependencies] +anyhow = "1.0" +tokio = { version = "1.0", features = [ "net", "sync" ] } +futures = "0.3" +serde = "1.0" +serde_json = "1.0" +http = "0.2" +hyper = "0.14" +openssl = "0.10" +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" } diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog new file mode 100644 index 0000000..c02803b --- /dev/null +++ b/proxmox-metrics/debian/changelog @@ -0,0 +1,5 @@ +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium + + * initial package + + -- Proxmox Support Team Tue, 14 Dec 2021 08:56:54 +0100 diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright new file mode 100644 index 0000000..5661ef6 --- /dev/null +++ b/proxmox-metrics/debian/copyright @@ -0,0 +1,16 @@ +Copyright (C) 2021 Proxmox Server Solutions GmbH + +This software is written by Proxmox Server Solutions GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml new file mode 100644 index 0000000..b7864cd --- /dev/null +++ b/proxmox-metrics/debian/debcargo.toml @@ -0,0 +1,7 @@ +overlay = "." +crate_src_path = ".." +maintainer = "Proxmox Support Team " + +[source] +vcs_git = "git://git.proxmox.com/git/proxmox.git" +vcs_browser = "https://git.proxmox.com/?p=proxmox.git" diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs new file mode 100644 index 0000000..df6c6a5 --- /dev/null +++ b/proxmox-metrics/src/influxdb/http.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; + +use anyhow::{bail, Error}; +use hyper::Body; +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; +use tokio::sync::mpsc; + +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions}; + +use crate::influxdb::utils; +use crate::{MetricsChannel, MetricsData, MetricsServerFuture}; + +pub struct InfluxDbHttp { + client: SimpleHttp, + _healthuri: http::Uri, + writeuri: http::Uri, + token: Option, + max_body_size: usize, + data: String, + channel: mpsc::Receiver>, +} + +impl InfluxDbHttp { + pub fn new( + https: bool, + host: &str, + port: u16, + organization: &str, + bucket: &str, + token: Option<&str>, + verify_tls: bool, + max_body_size: usize, + ) -> Result<(MetricsServerFuture, MetricsChannel), Error> { + let (tx, rx) = mpsc::channel(1024); + + let client = if verify_tls { + SimpleHttp::with_options(SimpleHttpOptions::default()) + } else { + let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap(); + ssl_connector.set_verify(SslVerifyMode::NONE); + 
SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default()) + }; + + let authority = proxmox_http::uri::build_authority(host, port)?; + + let writeuri = http::uri::Builder::new() + .scheme(if https { "https" } else { "http" }) + .authority(authority.clone()) + .path_and_query(format!( + "/api/v2/write?org={}&bucket={}", + organization, bucket + )) + .build()?; + + let healthuri = http::uri::Builder::new() + .scheme(if https { "https" } else { "http" }) + .authority(authority) + .path_and_query("/health") + .build()?; + + let this = Self { + client, + writeuri, + _healthuri: healthuri, + token: token.map(String::from), + max_body_size, + data: String::new(), + channel: rx, + }; + + let future = Box::pin(this.finish()); + let channel = MetricsChannel { channel: tx }; + Ok((future, channel)) + } + + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { + let new_data = utils::format_influxdb_line(&data)?; + + if self.data.len() + new_data.len() >= self.max_body_size { + self.flush().await?; + } + + self.data.push_str(&new_data); + + if self.data.len() >= self.max_body_size { + self.flush().await?; + } + + Ok(()) + } + + pub async fn flush(&mut self) -> Result<(), Error> { + if self.data.is_empty() { + return Ok(()); + } + let mut request = http::Request::builder().method("POST").uri(&self.writeuri); + + if let Some(token) = &self.token { + request = request.header("Authorization", format!("Token {}", token)); + } + + let request = request.body(Body::from(self.data.split_off(0)))?; + + let res = self.client.request(request).await?; + + let status = res.status(); + if !status.is_success() { + bail!("got bad status: {}", status); + } + Ok(()) + } + + async fn finish(mut self) -> Result<(), Error> { + while let Some(data) = self.channel.recv().await { + self.add_data(data).await?; + } + + self.flush().await?; + + Ok(()) + } +} diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs new file mode 100644 index 
0000000..26a715c --- /dev/null +++ b/proxmox-metrics/src/influxdb/mod.rs @@ -0,0 +1,7 @@ +mod http; +pub use self::http::*; + +mod udp; +pub use udp::*; + +pub mod utils; diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs new file mode 100644 index 0000000..c4187a6 --- /dev/null +++ b/proxmox-metrics/src/influxdb/udp.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use anyhow::Error; +use tokio::net::UdpSocket; +use tokio::sync::mpsc; + +use crate::influxdb::utils; +use crate::{MetricsChannel, MetricsData, MetricsServerFuture}; + +pub struct InfluxDbUdp { + address: String, + conn: Option, + ipv6: bool, + mtu: u16, + data: String, + channel: mpsc::Receiver>, +} + +impl InfluxDbUdp { + pub fn new(host: &str, port: u16, mtu: Option) -> (MetricsServerFuture, MetricsChannel) { + let (tx, rx) = mpsc::channel(1024); + let ipv6 = host.contains(':'); + + let address = if ipv6 && host.len() > 3 && &host[0..1] != "[" { + format!("[{}]:{}", host, port) + } else { + format!("{}:{}", host, port) + }; + + let this = Self { + address, + ipv6, + conn: None, + // empty ipv6 udp package needs 48 bytes, subtract 50 for safety + mtu: mtu.unwrap_or(1500) - 50, + data: String::new(), + channel: rx, + }; + + let future = Box::pin(this.finish()); + + let channel = MetricsChannel { channel: tx }; + + (future, channel) + } + + async fn connect(&mut self) -> Result { + let conn = if self.ipv6 { + UdpSocket::bind("[::]:0").await? + } else { + UdpSocket::bind("0.0.0.0:0").await? 
+ }; + let addr = self.address.clone(); + conn.connect(addr).await?; + Ok(conn) + } + + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { + let new_data = utils::format_influxdb_line(&data)?; + + if self.data.len() + new_data.len() >= (self.mtu as usize) { + self.flush().await?; + } + + self.data.push_str(&new_data); + + if self.data.len() >= (self.mtu as usize) { + self.flush().await?; + } + + Ok(()) + } + + async fn flush(&mut self) -> Result<(), Error> { + let conn = match self.conn.take() { + Some(conn) => conn, + None => self.connect().await?, + }; + + conn.send(self.data.split_off(0).as_bytes()).await?; + self.conn = Some(conn); + Ok(()) + } + + async fn finish(mut self) -> Result<(), Error> { + while let Some(data) = self.channel.recv().await { + self.add_data(data).await?; + } + + self.flush().await?; + + Ok(()) + } +} diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs new file mode 100644 index 0000000..3507e61 --- /dev/null +++ b/proxmox-metrics/src/influxdb/utils.rs @@ -0,0 +1,50 @@ +use std::fmt::Write; + +use anyhow::{bail, Error}; +use serde_json::Value; + +use crate::MetricsData; + +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result { + if !data.values.is_object() { + bail!("invalid data"); + } + + let mut line = escape_measurement(&data.measurement); + + for (key, value) in &data.tags { + write!(line, ",{}={}", escape_key(key), escape_key(value))?; + } + + line.push(' '); + + let mut first = true; + for (key, value) in data.values.as_object().unwrap().iter() { + match value { + Value::Object(_) => bail!("objects not supported"), + Value::Array(_) => bail!("arrays not supported"), + _ => {} + } + if !first { + line.push(','); + } + first = false; + write!(line, "{}={}", escape_key(key), value.to_string())?; + } + + // nanosecond precision + writeln!(line, " {}", data.ctime * 1_000_000_000)?; + + Ok(line) +} + +fn escape_key(key: &str) -> String { + let key = key.replace(',', "\\,"); + 
let key = key.replace('=', "\\="); + key.replace(' ', "\\ ") +} + +fn escape_measurement(measurement: &str) -> String { + let measurement = measurement.replace(',', "\\,"); + measurement.replace(' ', "\\ ") +} diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs new file mode 100644 index 0000000..ba018fb --- /dev/null +++ b/proxmox-metrics/src/lib.rs @@ -0,0 +1,89 @@ +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; + +use anyhow::{format_err, Error}; +use serde::Serialize; +use serde_json::Value; +use tokio::sync::mpsc; + +pub mod influxdb; + +#[derive(Clone)] +/// Structured data for the metric server +pub struct MetricsData { + /// The category of measurements + pub measurement: String, + /// A list of tags to attach to the measurements + pub tags: HashMap, + /// The actual values to send. Only plain (not-nested) objects are supported at the moment. + pub values: Value, + /// The time of the measurement + pub ctime: i64, +} + +impl MetricsData { + /// Convenient helper to create from references + pub fn new( + measurement: &str, + tags: &[(&str, &str)], + ctime: i64, + values: V, + ) -> Result { + let mut new_tags = HashMap::new(); + for (key, value) in tags { + new_tags.insert(key.to_string(), value.to_string()); + } + + Ok(Self { + measurement: measurement.to_string(), + tags: new_tags, + values: serde_json::to_value(values)?, + ctime, + }) + } +} + +pub type MetricsServerFuture = + Pin> + Send + 'static>>; + +#[derive(Clone)] +/// A channel to send data to the metric server +pub struct MetricsChannel { + pub(crate) channel: mpsc::Sender>, +} + +impl MetricsChannel { + /// Queues the given data to the metric server + pub async fn send_data(&self, data: Arc) -> Result<(), Error> { + // return ok if we got no data to send + if let Value::Object(map) = &data.values { + if map.is_empty() { + return Ok(()); + } + } + self.channel + .send(data) + .await + .map_err(|_| format_err!("receiver side closed"))?; + Ok(()) + } +} + +/// 
Helper to send a list of MetricsData to a list of MetricsChannels +pub async fn send_data_to_channels( + values: &[Arc], + channels: &[MetricsChannel], +) -> Vec> { + let mut futures = Vec::with_capacity(channels.len()); + for channel in channels { + futures.push(async move { + for data in values { + channel.send_data(Arc::clone(data)).await? + } + Ok::<(), Error>(()) + }); + } + + futures::future::join_all(futures).await +} -- 2.30.2