From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 017AD605A1 for ; Wed, 12 Jan 2022 15:37:04 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id EA7453C26 for ; Wed, 12 Jan 2022 15:37:03 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id D52703C19 for ; Wed, 12 Jan 2022 15:37:01 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id A958846C85 for ; Wed, 12 Jan 2022 15:37:01 +0100 (CET) Date: Wed, 12 Jan 2022 15:36:58 +0100 From: Wolfgang Bumiller To: Dominik Csapak Cc: pbs-devel@lists.proxmox.com Message-ID: <20220112143658.2t5vimqdn5ma7d45@olga.proxmox.com> References: <20211217081000.1061796-1-d.csapak@proxmox.com> <20211217081000.1061796-4-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20211217081000.1061796-4-d.csapak@proxmox.com> X-SPAM-LEVEL: Spam detection results: 0 AWL 0.401 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH proxmox v3 3/3] proxmox-metrics: implement metrics server 
client code X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 12 Jan 2022 14:37:04 -0000 On Fri, Dec 17, 2021 at 09:09:54AM +0100, Dominik Csapak wrote: > influxdb (udp + http(s)) only for now > > general architecture looks as follows: > > "new" returns a MetricsChannel and a Future > the channels can be used to push data in (it flushes automatically if > it would be over the configured size (mtu/max_body_size)) > > and the future must be polled to actually send data to the servers. > > so most often it would look like this: > let (future, channel) = InfluxDbHttp::new(..params..)?; > let handle = tokio::spawn(future); > channel.send_data(...).await?; > handle.await?; Sounds okay, but we already depend on tokio here, so maybe we should consider always spawning and putting the spawn-handle and channel together into a single helper. The reason I'd like to do this is because I feel your above example is not ideal. 
I'd prefer to enforce control over *aborting* the spawned task by wrapping it in `futures::future::abortable()` roughly like so: struct Metrics { join_handle: JoinHandle<_>, abort_handle: AbortHandle, channel: Sender<_>, } // convenience impl Deref for Metrics { type Target = Sender<_>; fn deref(&self) -> &Sender { &self.channel } } impl Drop for Metrics { fn drop(&mut self) { self.abort_handle.abort(); } } impl Metrics { pub async fn join(self) -> _ { self.join_handle.join().await } } > > when all channels go out of scope, all remaining data in the channel > will be read and sent to the server > > Signed-off-by: Dominik Csapak > --- > Cargo.toml | 1 + > proxmox-metrics/Cargo.toml | 20 +++++ > proxmox-metrics/debian/changelog | 5 ++ > proxmox-metrics/debian/copyright | 16 ++++ > proxmox-metrics/debian/debcargo.toml | 7 ++ > proxmox-metrics/src/influxdb/http.rs | 122 ++++++++++++++++++++++++++ > proxmox-metrics/src/influxdb/mod.rs | 7 ++ > proxmox-metrics/src/influxdb/udp.rs | 94 ++++++++++++++++++++ > proxmox-metrics/src/influxdb/utils.rs | 50 +++++++++++ > proxmox-metrics/src/lib.rs | 89 +++++++++++++++++++ > 10 files changed, 411 insertions(+) > create mode 100644 proxmox-metrics/Cargo.toml > create mode 100644 proxmox-metrics/debian/changelog > create mode 100644 proxmox-metrics/debian/copyright > create mode 100644 proxmox-metrics/debian/debcargo.toml > create mode 100644 proxmox-metrics/src/influxdb/http.rs > create mode 100644 proxmox-metrics/src/influxdb/mod.rs > create mode 100644 proxmox-metrics/src/influxdb/udp.rs > create mode 100644 proxmox-metrics/src/influxdb/utils.rs > create mode 100644 proxmox-metrics/src/lib.rs > > diff --git a/Cargo.toml b/Cargo.toml > index 8f85e08..4a458d2 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -6,6 +6,7 @@ members = [ > "proxmox-http", > "proxmox-io", > "proxmox-lang", > + "proxmox-metrics", > "proxmox-router", > "proxmox-schema", > "proxmox-serde", > diff --git a/proxmox-metrics/Cargo.toml 
b/proxmox-metrics/Cargo.toml > new file mode 100644 > index 0000000..9ac50fe > --- /dev/null > +++ b/proxmox-metrics/Cargo.toml > @@ -0,0 +1,20 @@ > +[package] > +name = "proxmox-metrics" > +version = "0.1.0" > +authors = ["Proxmox Support Team "] > +edition = "2018" > +license = "AGPL-3" > +description = "Metrics Server export utilitites" > + > +exclude = [ "debian" ] > + > +[dependencies] > +anyhow = "1.0" > +tokio = { version = "1.0", features = [ "net", "sync" ] } > +futures = "0.3" > +serde = "1.0" > +serde_json = "1.0" > +http = "0.2" > +hyper = "0.14" > +openssl = "0.10" > +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" } > diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog > new file mode 100644 > index 0000000..c02803b > --- /dev/null > +++ b/proxmox-metrics/debian/changelog > @@ -0,0 +1,5 @@ > +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium > + > + * initial package > + > + -- Proxmox Support Team Tue, 14 Dec 2021 08:56:54 +0100 > diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright > new file mode 100644 > index 0000000..5661ef6 > --- /dev/null > +++ b/proxmox-metrics/debian/copyright > @@ -0,0 +1,16 @@ > +Copyright (C) 2021 Proxmox Server Solutions GmbH > + > +This software is written by Proxmox Server Solutions GmbH > + > +This program is free software: you can redistribute it and/or modify > +it under the terms of the GNU Affero General Public License as published by > +the Free Software Foundation, either version 3 of the License, or > +(at your option) any later version. > + > +This program is distributed in the hope that it will be useful, > +but WITHOUT ANY WARRANTY; without even the implied warranty of > +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +GNU Affero General Public License for more details. > + > +You should have received a copy of the GNU Affero General Public License > +along with this program. If not, see . 
> diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml > new file mode 100644 > index 0000000..b7864cd > --- /dev/null > +++ b/proxmox-metrics/debian/debcargo.toml > @@ -0,0 +1,7 @@ > +overlay = "." > +crate_src_path = ".." > +maintainer = "Proxmox Support Team " > + > +[source] > +vcs_git = "git://git.proxmox.com/git/proxmox.git" > +vcs_browser = "https://git.proxmox.com/?p=proxmox.git" > diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs > new file mode 100644 > index 0000000..df6c6a5 > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/http.rs > @@ -0,0 +1,122 @@ > +use std::sync::Arc; > + > +use anyhow::{bail, Error}; > +use hyper::Body; > +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; > +use tokio::sync::mpsc; > + > +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions}; > + > +use crate::influxdb::utils; > +use crate::{MetricsChannel, MetricsData, MetricsServerFuture}; > + > +pub struct InfluxDbHttp { > + client: SimpleHttp, > + _healthuri: http::Uri, > + writeuri: http::Uri, > + token: Option, > + max_body_size: usize, > + data: String, > + channel: mpsc::Receiver>, > +} > + > +impl InfluxDbHttp { > + pub fn new( > + https: bool, > + host: &str, > + port: u16, > + organization: &str, > + bucket: &str, > + token: Option<&str>, > + verify_tls: bool, > + max_body_size: usize, > + ) -> Result<(MetricsServerFuture, MetricsChannel), Error> { > + let (tx, rx) = mpsc::channel(1024); > + > + let client = if verify_tls { > + SimpleHttp::with_options(SimpleHttpOptions::default()) > + } else { > + let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap(); > + ssl_connector.set_verify(SslVerifyMode::NONE); > + SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default()) > + }; > + > + let authority = proxmox_http::uri::build_authority(host, port)?; > + > + let writeuri = http::uri::Builder::new() > + .scheme(if https { "https" } else { "http" 
}) > + .authority(authority.clone()) > + .path_and_query(format!( > + "/api/v2/write?org={}&bucket={}", > + organization, bucket > + )) > + .build()?; > + > + let healthuri = http::uri::Builder::new() > + .scheme(if https { "https" } else { "http" }) > + .authority(authority) > + .path_and_query("/health") > + .build()?; > + > + let this = Self { > + client, > + writeuri, > + _healthuri: healthuri, > + token: token.map(String::from), > + max_body_size, > + data: String::new(), > + channel: rx, > + }; > + > + let future = Box::pin(this.finish()); > + let channel = MetricsChannel { channel: tx }; > + Ok((future, channel)) > + } > + > + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { > + let new_data = utils::format_influxdb_line(&data)?; > + > + if self.data.len() + new_data.len() >= self.max_body_size { > + self.flush().await?; > + } > + > + self.data.push_str(&new_data); > + > + if self.data.len() >= self.max_body_size { > + self.flush().await?; > + } > + > + Ok(()) > + } > + > + pub async fn flush(&mut self) -> Result<(), Error> { > + if self.data.is_empty() { > + return Ok(()); > + } > + let mut request = http::Request::builder().method("POST").uri(&self.writeuri); > + > + if let Some(token) = &self.token { > + request = request.header("Authorization", format!("Token {}", token)); > + } > + > + let request = request.body(Body::from(self.data.split_off(0)))?; > + > + let res = self.client.request(request).await?; > + > + let status = res.status(); > + if !status.is_success() { > + bail!("got bad status: {}", status); > + } > + Ok(()) > + } > + > + async fn finish(mut self) -> Result<(), Error> { > + while let Some(data) = self.channel.recv().await { > + self.add_data(data).await?; > + } > + > + self.flush().await?; > + > + Ok(()) > + } > +} > diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs > new file mode 100644 > index 0000000..26a715c > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/mod.rs > @@ -0,0 +1,7 
@@ > +mod http; > +pub use self::http::*; > + > +mod udp; > +pub use udp::*; > + > +pub mod utils; > diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs > new file mode 100644 > index 0000000..c4187a6 > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/udp.rs > @@ -0,0 +1,94 @@ > +use std::sync::Arc; > + > +use anyhow::Error; > +use tokio::net::UdpSocket; > +use tokio::sync::mpsc; > + > +use crate::influxdb::utils; > +use crate::{MetricsChannel, MetricsData, MetricsServerFuture}; > + > +pub struct InfluxDbUdp { > + address: String, > + conn: Option, > + ipv6: bool, > + mtu: u16, > + data: String, > + channel: mpsc::Receiver>, > +} > + > +impl InfluxDbUdp { > + pub fn new(host: &str, port: u16, mtu: Option) -> (MetricsServerFuture, MetricsChannel) { ^ Also this is a weird api, it would probably make more sense to just have an `influx_db_udp()` function in this case, similar to how `mpsc::channel()` is a function returning `(mpsc::Sender, mpsc::Receiver)`, we'd have metrics::influx_db_udp() -> Metrics; metrics::influx_db_http() -> Metrics; And yet another IPv6 thing... Do we *really* want to specify host and port separately? Why not have 1 string and use `ToSocketAddrs`? > + let (tx, rx) = mpsc::channel(1024); > + let ipv6 = host.contains(':'); The reason I'm asking is because the above works for ipv6 addresses... but... the schema you introduce in PBS 2/6 uses DNS_NAME_OR_IP_SCHEMA for the host, so the hostname can be, you know, a *name*... which means you don't actually *know* whether it's IPv4 or IPv6... 
> + > + let address = if ipv6 && host.len() > 3 && &host[0..1] != "[" { > + format!("[{}]:{}", host, port) > + } else { > + format!("{}:{}", host, port) > + }; > + > + let this = Self { > + address, > + ipv6, > + conn: None, > + // empty ipv6 udp package needs 48 bytes, subtract 50 for safety > + mtu: mtu.unwrap_or(1500) - 50, > + data: String::new(), > + channel: rx, > + }; > + > + let future = Box::pin(this.finish()); > + > + let channel = MetricsChannel { channel: tx }; > + > + (future, channel) > + } > + > + async fn connect(&mut self) -> Result { Meaning this should actually iterate through the results of tokio::net::lookup_host() and do both bind() and connect() for each result one after the other until they succeed... > + let conn = if self.ipv6 { > + UdpSocket::bind("[::]:0").await? > + } else { > + UdpSocket::bind("0.0.0.0:0").await? > + }; > + let addr = self.address.clone(); > + conn.connect(addr).await?; > + Ok(conn) > + } > + > + async fn add_data(&mut self, data: Arc) -> Result<(), Error> { > + let new_data = utils::format_influxdb_line(&data)?; > + > + if self.data.len() + new_data.len() >= (self.mtu as usize) { > + self.flush().await?; > + } > + > + self.data.push_str(&new_data); > + > + if self.data.len() >= (self.mtu as usize) { > + self.flush().await?; > + } > + > + Ok(()) > + } > + > + async fn flush(&mut self) -> Result<(), Error> { > + let conn = match self.conn.take() { > + Some(conn) => conn, > + None => self.connect().await?, > + }; > + > + conn.send(self.data.split_off(0).as_bytes()).await?; > + self.conn = Some(conn); > + Ok(()) > + } > + > + async fn finish(mut self) -> Result<(), Error> { > + while let Some(data) = self.channel.recv().await { > + self.add_data(data).await?; > + } > + > + self.flush().await?; > + > + Ok(()) > + } > +} > diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs > new file mode 100644 > index 0000000..3507e61 > --- /dev/null > +++ b/proxmox-metrics/src/influxdb/utils.rs > 
@@ -0,0 +1,50 @@ > +use std::fmt::Write; > + > +use anyhow::{bail, Error}; > +use serde_json::Value; > + > +use crate::MetricsData; > + > +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result { > + if !data.values.is_object() { > + bail!("invalid data"); > + } > + > + let mut line = escape_measurement(&data.measurement); > + > + for (key, value) in &data.tags { > + write!(line, ",{}={}", escape_key(key), escape_key(value))?; > + } > + > + line.push(' '); > + > + let mut first = true; > + for (key, value) in data.values.as_object().unwrap().iter() { > + match value { > + Value::Object(_) => bail!("objects not supported"), > + Value::Array(_) => bail!("arrays not supported"), > + _ => {} > + } > + if !first { > + line.push(','); > + } > + first = false; > + write!(line, "{}={}", escape_key(key), value.to_string())?; > + } > + > + // nanosecond precision > + writeln!(line, " {}", data.ctime * 1_000_000_000)?; > + > + Ok(line) > +} > + > +fn escape_key(key: &str) -> String { > + let key = key.replace(',', "\\,"); > + let key = key.replace('=', "\\="); > + key.replace(' ', "\\ ") > +} > + > +fn escape_measurement(measurement: &str) -> String { > + let measurement = measurement.replace(',', "\\,"); > + measurement.replace(' ', "\\ ") > +} > diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs > new file mode 100644 > index 0000000..ba018fb > --- /dev/null > +++ b/proxmox-metrics/src/lib.rs > @@ -0,0 +1,89 @@ > +use std::collections::HashMap; > +use std::pin::Pin; > +use std::sync::Arc; > + > +use anyhow::{format_err, Error}; > +use serde::Serialize; > +use serde_json::Value; > +use tokio::sync::mpsc; > + > +pub mod influxdb; > + > +#[derive(Clone)] > +/// Structured data for the metric server > +pub struct MetricsData { > + /// The category of measurements > + pub measurement: String, > + /// A list of to attach to the measurements > + pub tags: HashMap, > + /// The actual values to send. 
Only plain (not-nested) objects are supported at the moment. > + pub values: Value, > + /// The time of the measurement > + pub ctime: i64, > +} > + > +impl MetricsData { > + /// Convenient helper to create from references > + pub fn new( > + measurement: &str, > + tags: &[(&str, &str)], > + ctime: i64, > + values: V, > + ) -> Result { > + let mut new_tags = HashMap::new(); > + for (key, value) in tags { > + new_tags.insert(key.to_string(), value.to_string()); > + } > + > + Ok(Self { > + measurement: measurement.to_string(), > + tags: new_tags, > + values: serde_json::to_value(values)?, > + ctime, > + }) > + } > +} > + > +pub type MetricsServerFuture = > + Pin> + Send + 'static>>; > + > +#[derive(Clone)] > +/// A channel to send data to the metric server > +pub struct MetricsChannel { > + pub(crate) channel: mpsc::Sender>, > +} > + > +impl MetricsChannel { > + /// Queues the given data to the metric server > + pub async fn send_data(&self, data: Arc) -> Result<(), Error> { > + // return ok if we got no data to send > + if let Value::Object(map) = &data.values { > + if map.is_empty() { > + return Ok(()); > + } > + } > + self.channel > + .send(data) > + .await > + .map_err(|_| format_err!("receiver side closed"))?; > + Ok(()) > + } > +} > + > +/// Helper to send a list of MetricsData to a list of MetricsChannels > +pub async fn send_data_to_channels( > + values: &[Arc], > + channels: &[MetricsChannel], > +) -> Vec> { > + let mut futures = Vec::with_capacity(channels.len()); > + for channel in channels { > + futures.push(async move { > + for data in values { > + channel.send_data(Arc::clone(data)).await? > + } > + Ok::<(), Error>(()) > + }); > + } > + > + futures::future::join_all(futures).await > +} > -- > 2.30.2