From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 18B49849A6 for ; Tue, 14 Dec 2021 14:51:59 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 05FA425D0E for ; Tue, 14 Dec 2021 14:51:29 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 189F825D03 for ; Tue, 14 Dec 2021 14:51:27 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id DE1F845180 for ; Tue, 14 Dec 2021 14:51:26 +0100 (CET) Date: Tue, 14 Dec 2021 14:51:23 +0100 From: Wolfgang Bumiller To: Dominik Csapak Cc: pbs-devel@lists.proxmox.com Message-ID: <20211214135123.ukmpphmgdk6xhsct@olga.proxmox.com> References: <20211214122412.4077902-1-d.csapak@proxmox.com> <20211214122412.4077902-4-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20211214122412.4077902-4-d.csapak@proxmox.com> X-SPAM-LEVEL: Spam detection results: 0 AWL 0.411 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [http.rs, utils.rs, mod.rs, udp.rs, gnu.org, proxmox.com, lib.rs] Subject: Re: [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 14 Dec 2021 13:51:59 -0000 On Tue, Dec 14, 2021 at 01:24:06PM +0100, Dominik Csapak wrote: > influxdb (udp + http(s)) only for now > > general architecture looks as follows: > > "new" returns a MetricsChannel and a Future > the channels can be used to push data in (it flushes automatically if > it would be over the configured size (mtu/max_body_size)) > > and the future must be polled to actually send data to the servers. > > so most often it would look like this: > let (future, channel) = InfluxDbHttp::new(..params..)?; > let handle = tokio::spawn(future); > channel.send_data(...).await?; > handle.await?; > > when all channels go out of scope, all remaining data in the channel > will be read and sent to the server > > Signed-off-by: Dominik Csapak > --- > Cargo.toml | 1 + > proxmox-metrics/Cargo.toml | 20 ++++ > proxmox-metrics/debian/changelog | 5 + > proxmox-metrics/debian/copyright | 16 +++ > proxmox-metrics/debian/debcargo.toml | 7 ++ > proxmox-metrics/src/influxdb/http.rs | 143 ++++++++++++++++++++++++++ > proxmox-metrics/src/influxdb/mod.rs | 7 ++ > proxmox-metrics/src/influxdb/udp.rs | 107 +++++++++++++++++++ > proxmox-metrics/src/influxdb/utils.rs | 51 +++++++++ > proxmox-metrics/src/lib.rs | 92 +++++++++++++++++ > 10 files changed, 449 insertions(+) > create mode 100644 proxmox-metrics/Cargo.toml > create mode 100644 proxmox-metrics/debian/changelog > create mode 100644 proxmox-metrics/debian/copyright > create mode 100644 proxmox-metrics/debian/debcargo.toml > create mode 
100644 proxmox-metrics/src/influxdb/http.rs > create mode 100644 proxmox-metrics/src/influxdb/mod.rs > create mode 100644 proxmox-metrics/src/influxdb/udp.rs > create mode 100644 proxmox-metrics/src/influxdb/utils.rs > create mode 100644 proxmox-metrics/src/lib.rs > > diff --git a/Cargo.toml b/Cargo.toml > index 8f85e08..4a458d2 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -6,6 +6,7 @@ members = [ > "proxmox-http", > "proxmox-io", > "proxmox-lang", > + "proxmox-metrics", > "proxmox-router", > "proxmox-schema", > "proxmox-serde", > diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml > new file mode 100644 > index 0000000..9ac50fe > --- /dev/null > +++ b/proxmox-metrics/Cargo.toml > @@ -0,0 +1,20 @@ > +[package] > +name = "proxmox-metrics" > +version = "0.1.0" > +authors = ["Proxmox Support Team "] > +edition = "2018" > +license = "AGPL-3" > +description = "Metrics Server export utilitites" > + > +exclude = [ "debian" ] > + > +[dependencies] > +anyhow = "1.0" > +tokio = { version = "1.0", features = [ "net", "sync" ] } > +futures = "0.3" > +serde = "1.0" > +serde_json = "1.0" > +http = "0.2" > +hyper = "0.14" > +openssl = "0.10" Please sort the above, and separate the line below from the above group with a newline > +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" } > diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog > new file mode 100644 > index 0000000..c02803b > --- /dev/null > +++ b/proxmox-metrics/debian/changelog > @@ -0,0 +1,5 @@ > +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium > + > + * initial package > + > + -- Proxmox Support Team Tue, 14 Dec 2021 08:56:54 +0100 > diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright > new file mode 100644 > index 0000000..5661ef6 > --- /dev/null > +++ b/proxmox-metrics/debian/copyright > @@ -0,0 +1,16 @@ > +Copyright (C) 2021 Proxmox Server Solutions GmbH > + > +This software is written by Proxmox 
Server Solutions GmbH > + > +This program is free software: you can redistribute it and/or modify > +it under the terms of the GNU Affero General Public License as published by > +the Free Software Foundation, either version 3 of the License, or > +(at your option) any later version. > + > +This program is distributed in the hope that it will be useful, > +but WITHOUT ANY WARRANTY; without even the implied warranty of > +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +GNU Affero General Public License for more details. > + > +You should have received a copy of the GNU Affero General Public License > +along with this program. If not, see . > diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml > new file mode 100644 > index 0000000..b7864cd > --- /dev/null > +++ b/proxmox-metrics/debian/debcargo.toml > @@ -0,0 +1,7 @@ > +overlay = "." > +crate_src_path = ".." > +maintainer = "Proxmox Support Team " > + > +[source] > +vcs_git = "git://git.proxmox.com/git/proxmox.git" > +vcs_browser = "https://git.proxmox.com/?p=proxmox.git" > diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs > new file mode 100644 > index 0000000..8f1157d > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/http.rs > @@ -0,0 +1,143 @@ > +use std::sync::Arc; > + > +use anyhow::{bail, Error}; > +use futures::{future::FutureExt, select}; > +use hyper::Body; > +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; > +use tokio::sync::mpsc; > + > +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions}; > + > +use crate::influxdb::utils; > +use crate::{MetricsChannel, MetricsData, MetricsServerFuture}; > + > +pub struct InfluxDbHttp { > + client: SimpleHttp, > + _healthuri: http::Uri, > + writeuri: http::Uri, > + token: Option, > + max_body_size: usize, > + data: String, > + data_channel: mpsc::Receiver>, > + flush_channel: mpsc::Receiver<()>, > +} > + > +impl InfluxDbHttp { > + pub fn new( > + https: bool, 
> + host: &str, > + port: u16, > + organization: &str, > + bucket: &str, > + token: Option<&str>, > + verify_tls: bool, > + max_body_size: usize, > + ) -> Result<(MetricsServerFuture, MetricsChannel), Error> { > + let (data_tx, data_rx) = mpsc::channel(1024); > + let (flush_tx, flush_rx) = mpsc::channel(1); > + > + let client = if verify_tls { > + SimpleHttp::with_options(SimpleHttpOptions::default()) > + } else { > + let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap(); > + ssl_connector.set_verify(SslVerifyMode::NONE); > + SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default()) > + }; > + > + let authority = proxmox_http::uri::build_authority(host, port)?; > + > + let writeuri = http::uri::Builder::new() > + .scheme(if https { "https" } else { "http" }) > + .authority(authority.clone()) > + .path_and_query(format!( > + "/api/v2/write?org={}&bucket={}", > + organization, bucket > + )) > + .build()?; > + > + let healthuri = http::uri::Builder::new() > + .scheme(if https { "https" } else { "http" }) > + .authority(authority) > + .path_and_query("/health") > + .build()?; > + > + let this = Self { > + client, > + writeuri, > + _healthuri: healthuri, > + token: token.map(String::from), > + max_body_size, > + data: String::new(), > + data_channel: data_rx, > + flush_channel: flush_rx, > + }; > + > + let future = Box::pin(this.finish()); > + let channel = MetricsChannel { > + data_channel: data_tx, > + flush_channel: flush_tx, > + }; > + Ok((future, channel)) > + } > + > + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { > + let new_data = utils::format_influxdb_line(&data)?; > + > + if self.data.len() + new_data.len() >= self.max_body_size { > + self.flush().await?; > + } > + > + self.data.push_str(&new_data); > + > + if self.data.len() >= self.max_body_size { > + self.flush().await?; > + } > + > + Ok(()) > + } > + > + pub async fn flush(&mut self) -> Result<(), Error> { > + if self.data.is_empty() { > + 
return Ok(()); > + } > + let mut request = http::Request::builder().method("POST").uri(&self.writeuri); > + > + if let Some(token) = &self.token { > + request = request.header("Authorization", format!("Token {}", token)); > + } > + > + let request = request.body(Body::from(self.data.split_off(0)))?; > + > + let res = self.client.request(request).await?; > + > + let status = res.status(); > + if !status.is_success() { > + bail!("got bad status: {}", status); > + } > + Ok(()) > + } > + > + async fn finish(mut self) -> Result<(), Error> { > + loop { > + select! { I wonder, don't you want to receive data & flushes in some kind of order? Wouldn't a single channel over an `enum MetricsValue { Flush, Data(MetricsData) }` make more sense? > + res = self.flush_channel.recv().fuse() => match res { > + Some(_) => self.flush().await?, > + None => break, // all senders gone > + }, > + data = self.data_channel.recv().fuse() => match data { > + Some(data) => self.add_data(data).await?, > + None => break, // all senders gone > + }, > + } > + } > + > + // consume remaining data in channel > + while let Some(data) = self.data_channel.recv().await { > + self.add_data(data).await?; > + } > + > + self.flush().await?; > + > + Ok(()) > + } > +} > diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs > new file mode 100644 > index 0000000..26a715c > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/mod.rs > @@ -0,0 +1,7 @@ > +mod http; > +pub use self::http::*; > + > +mod udp; > +pub use udp::*; > + > +pub mod utils; > diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs > new file mode 100644 > index 0000000..de2b0d5 > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/udp.rs > @@ -0,0 +1,107 @@ > +use std::sync::Arc; > + > +use anyhow::Error; > +use futures::{future::FutureExt, select}; > +use tokio::net::UdpSocket; > +use tokio::sync::mpsc; > + > +use crate::influxdb::utils; > +use crate::{MetricsChannel, MetricsData, 
MetricsServerFuture}; > + > +pub struct InfluxDbUdp { > + address: String, > + conn: Option, > + mtu: u16, > + data: String, > + data_channel: mpsc::Receiver>, > + flush_channel: mpsc::Receiver<()>, > +} > + > +impl InfluxDbUdp { > + pub fn new(host: &str, port: u16, mtu: Option) -> (MetricsServerFuture, MetricsChannel) { > + let (data_tx, data_rx) = mpsc::channel(1024); > + let (flush_tx, flush_rx) = mpsc::channel(1); > + > + let address = if host.len() > 3 && host.contains(':') && &host[0..1] != "[" { > + format!("[{}]:{}", host, port) Here you handle IPv6 but... > + } else { > + format!("{}:{}", host, port) > + }; > + > + let this = Self { > + address, > + conn: None, > + mtu: mtu.unwrap_or(1500), > + data: String::new(), > + data_channel: data_rx, > + flush_channel: flush_rx, > + }; > + > + let future = Box::pin(this.finish()); > + > + let channel = MetricsChannel { > + data_channel: data_tx, > + flush_channel: flush_tx, > + }; > + > + (future, channel) > + } > + > + async fn connect(&mut self) -> Result { > + let conn = UdpSocket::bind("0.0.0.0:0").await?; ...here you're specifically binding to an IPv4 which will cause rust to issue a `socket(AF_INET, ...)` syscall rather than `socket(AF_INET6, ...)` for IPv6. > + let addr = self.address.clone(); > + conn.connect(addr).await?; > + Ok(conn) > + } > + > + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { > + let new_data = utils::format_influxdb_line(&data)?; > + > + if self.data.len() + new_data.len() >= (self.mtu as usize) { > + self.flush().await?; > + } > + > + self.data.push_str(&new_data); Is it possible for `new_data.len()` to be larger than the mtu? if so, should this warn or something? Otherwise the next flush below might become a problem? 
> + > + if self.data.len() >= (self.mtu as usize) { > + self.flush().await?; > + } > + > + Ok(()) > + } > + > + async fn flush(&mut self) -> Result<(), Error> { > + let conn = match self.conn.take() { > + Some(conn) => conn, > + None => self.connect().await?, > + }; > + > + conn.send(self.data.split_off(0).as_bytes()).await?; > + self.conn = Some(conn); > + Ok(()) > + } > + > + async fn finish(mut self) -> Result<(), Error> { > + loop { > + select! { > + res = self.flush_channel.recv().fuse() => match res { > + Some(_) => self.flush().await?, > + None => break, // all senders gone > + }, > + data = self.data_channel.recv().fuse() => match data { > + Some(data) => self.add_data(data).await?, > + None => break, // all senders gone > + }, > + } > + } > + > + // consume remaining data in channel > + while let Some(data) = self.data_channel.recv().await { > + self.add_data(data).await?; > + } > + > + self.flush().await?; > + > + Ok(()) > + } > +} > diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs > new file mode 100644 > index 0000000..bf391f9 > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/utils.rs > @@ -0,0 +1,51 @@ > +use anyhow::{bail, Error}; > + > +use crate::MetricsData; > + > +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result { > + if !data.values.is_object() { > + bail!("invalid data"); > + } > + > + let mut line = escape_measurement(&data.measurement); > + line.push(','); > + > + let tags = data.tags.iter().map(|(key, value)| { > + format!("{}={}", escape_key(&key), escape_key(&value)) > + }); > + line.push_str(&tags.collect::>().join(",")); I'm not too fond of the temporary `Vec` here and below. Maybe use `line.extend()` with the ',' as part of the format string (",{}={}"), or skip even the temporary format and just do (dropping the initial `line.push(',')` above): for (key, value) in &data.tags { line.push(','); line.push_str(&escape_key(key)); line.push('='); line.push_str(&escape_key(value)); } it's not really longer...
alternatively, more readable and without the temporary `String` would be `write!(line, ",{}={}", ...)?` etc. > + > + line.push(' '); > + > + let values = data.values.as_object().unwrap().iter().map(|(key, value)| { > + let value = if value.is_string() { > + escape_value(&value.to_string()) ^ extra space? :P > + } else { > + value.to_string() > + }; > + format!("{}={}", escape_key(&key), value) > + }); > + > + line.push_str(&values.collect::>().join(",")); > + > + // nanosecond precision > + line.push_str(&format!(" {}\n", data.ctime*1_000_000_000)); > + Ok(line) > +} > + > +fn escape_key(key: &str) -> String { > + let key = key.replace(',', "\\,"); > + let key = key.replace('=', "\\="); > + let key = key.replace(' ', "\\ "); > + key > +} > + > +fn escape_measurement(measurement: &str) -> String { > + let measurement = measurement.replace(',', "\\,"); > + let measurement = measurement.replace(' ', "\\ "); > + measurement > +} > + > +fn escape_value(value: &str) -> String { > + format!("\"{}\"",value.replace('"', "\\\"")) > +} > diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs > new file mode 100644 > index 0000000..0a76faa > --- /dev/null > +++ b/proxmox-metrics/src/lib.rs > @@ -0,0 +1,92 @@ > +use std::collections::HashMap; > +use std::pin::Pin; > +use std::sync::Arc; > + > +use anyhow::{bail, format_err, Error}; > +use serde::Serialize; > +use serde_json::Value; > +use tokio::sync::mpsc; > + > +pub mod influxdb; > + > +#[derive(Clone)] > +/// Structured data for the metric server > +pub struct MetricsData { > + /// The category of measurements > + pub measurement: String, > + /// A list of to attach to the measurements > + pub tags: HashMap, > + /// The actual values to send. Only plain (not-nested) objects are supported at the moment. 
> + pub values: Value, > + /// The time of the measurement > + pub ctime: i64, > +} > + > +impl MetricsData { > + /// Convenient helper to create from references > + pub fn new(measurement: &str, tags: &[(&str, &str)], ctime: i64, values: V) -> Result { > + let mut new_tags = HashMap::new(); > + for (key, value) in tags { > + new_tags.insert(key.to_string(), value.to_string()); > + } > + > + Ok(Self{ > + measurement: measurement.to_string(), > + tags: new_tags, > + values: serde_json::to_value(values)?, > + ctime, > + }) > + } > +} > + > +pub type MetricsServerFuture = > + Pin> + Send + 'static>>; > + > +#[derive(Clone)] > +/// A channel to send data to the metric server > +pub struct MetricsChannel { > + pub(crate) data_channel: mpsc::Sender>, > + pub(crate) flush_channel: mpsc::Sender<()>, > +} > + > +impl MetricsChannel { > + /// Queues the given data for the metric server. If the queue is full, > + /// flush and try again. > + pub async fn send_data(&self, data: Arc) -> Result<(), Error> { > + if let Err(err) = self.data_channel.try_send(data) { > + match err { > + mpsc::error::TrySendError::Full(data) => { > + self.flush_channel.send(()).await?; > + self.data_channel > + .send(data) > + .await > + .map_err(|_| format_err!("error sending data"))?; > + } > + mpsc::error::TrySendError::Closed(_) => { > + bail!("channel closed"); > + } > + } > + } > + Ok(()) > + } > + > + /// Flush data to the metric server > + pub async fn flush(&self) -> Result<(), Error> { > + self.flush_channel.send(()).await?; > + Ok(()) > + } > +} > + > +pub async fn send_data_to_channels(values: &[Arc], channels: &[MetricsChannel]) -> Vec> { > + let mut futures = Vec::with_capacity(channels.len()); > + for channel in channels { > + futures.push(async move { > + for data in values.into_iter() { `.into_iter()` shouldn't be necessary, that's how `for` loops are defined after all. > + channel.send_data(data.clone()).await? 
> + } > + Ok::<(), Error>(()) > + }); > + } > + > + futures::future::join_all(futures).await > +} > -- > 2.30.2