public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability
@ 2021-12-14 12:24 Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable Dominik Csapak
                   ` (8 more replies)
  0 siblings, 9 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

This series adds support for exporting metrics data to external
metric servers.

For now, this only includes the data we already gather for RRD, though it
should not be hard to extend that functionality.

Also, only InfluxDB (UDP and HTTP(S)) is currently supported, but it
should not be too hard to add more options here.

I did not include GUI/CLI patches yet, since the proxmox-backup-manager
options are already rather extensive, and I wanted some feedback before
starting on the GUI.

For testing, the metric servers can be added either by calling
'proxmox-backup debug api ...' or by manually editing the config file
(see the sketch below).
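
A rough sketch of what /etc/proxmox-backup/metricserver.cfg (added later in
this series) could then look like - the exact section-config layout and all
values here are only for illustration:

  influxdb-http: influxdb1
      host influxdb.example.com
      port 8086
      organization proxmox
      bucket proxmox
      token <secret>

  influxdb-udp: influxdb2
      host 192.0.2.10
      port 8089
      mtu 1500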

A bit unrelated: I moved the 'disk_usage' method to proxmox-sys
(it fits better there, and we want to use it from other places too).

Of course, proxmox-backup depends on bumped versions of the proxmox-* crates.

proxmox:

Dominik Csapak (3):
  proxmox-sys: make some structs serializable
  proxmox-sys: add DiskUsage struct and helper
  proxmox-metrics: implement metrics server client code

 Cargo.toml                            |   1 +
 proxmox-metrics/Cargo.toml            |  20 ++++
 proxmox-metrics/debian/changelog      |   5 +
 proxmox-metrics/debian/copyright      |  16 +++
 proxmox-metrics/debian/debcargo.toml  |   7 ++
 proxmox-metrics/src/influxdb/http.rs  | 143 ++++++++++++++++++++++++++
 proxmox-metrics/src/influxdb/mod.rs   |   7 ++
 proxmox-metrics/src/influxdb/udp.rs   | 107 +++++++++++++++++++
 proxmox-metrics/src/influxdb/utils.rs |  51 +++++++++
 proxmox-metrics/src/lib.rs            |  92 +++++++++++++++++
 proxmox-sys/Cargo.toml                |   1 +
 proxmox-sys/src/fs/mod.rs             |  26 +++++
 proxmox-sys/src/linux/procfs/mod.rs   |   7 +-
 13 files changed, 480 insertions(+), 3 deletions(-)
 create mode 100644 proxmox-metrics/Cargo.toml
 create mode 100644 proxmox-metrics/debian/changelog
 create mode 100644 proxmox-metrics/debian/copyright
 create mode 100644 proxmox-metrics/debian/debcargo.toml
 create mode 100644 proxmox-metrics/src/influxdb/http.rs
 create mode 100644 proxmox-metrics/src/influxdb/mod.rs
 create mode 100644 proxmox-metrics/src/influxdb/udp.rs
 create mode 100644 proxmox-metrics/src/influxdb/utils.rs
 create mode 100644 proxmox-metrics/src/lib.rs

proxmox-backup:

Dominik Csapak (6):
  use 'disk_usage' from proxmox-sys
  pbs-api-types: add metrics api types
  pbs-config: add metrics config class
  backup-proxy: decouple stats gathering from rrd update
  proxmox-backup-proxy: send metrics to configured metrics server
  api: add metricserver endpoints

 Cargo.toml                                   |   1 +
 pbs-api-types/src/lib.rs                     |   2 +
 pbs-api-types/src/metrics.rs                 | 134 +++++++
 pbs-config/Cargo.toml                        |   1 +
 pbs-config/src/lib.rs                        |   1 +
 pbs-config/src/metrics.rs                    | 122 +++++++
 src/api2/admin/datastore.rs                  |   4 +-
 src/api2/config/metricserver/influxdbhttp.rs | 272 +++++++++++++++
 src/api2/config/metricserver/influxdbudp.rs  | 242 +++++++++++++
 src/api2/config/metricserver/mod.rs          |  16 +
 src/api2/config/mod.rs                       |   2 +
 src/api2/node/status.rs                      |  10 +-
 src/api2/status.rs                           |   4 +-
 src/bin/proxmox-backup-proxy.rs              | 348 +++++++++++++++----
 src/tools/disks/mod.rs                       |  21 +-
 15 files changed, 1083 insertions(+), 97 deletions(-)
 create mode 100644 pbs-api-types/src/metrics.rs
 create mode 100644 pbs-config/src/metrics.rs
 create mode 100644 src/api2/config/metricserver/influxdbhttp.rs
 create mode 100644 src/api2/config/metricserver/influxdbudp.rs
 create mode 100644 src/api2/config/metricserver/mod.rs

-- 
2.30.2






* [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper Dominik Csapak
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

we already depend on serde anyway, and this makes working with the
gathered stats structs a bit more comfortable
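
As a small illustration (not part of the patch) of what this enables - a
minimal sketch, assuming the caller also depends on serde_json and anyhow:

  use proxmox_sys::linux::procfs::read_proc_stat;

  fn main() -> Result<(), anyhow::Error> {
      // ProcFsStat now derives Serialize, so it can be converted to JSON directly
      let stat = read_proc_stat()?;
      println!("{}", serde_json::to_value(&stat)?);
      Ok(())
  }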

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-sys/Cargo.toml              | 1 +
 proxmox-sys/src/linux/procfs/mod.rs | 7 ++++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/proxmox-sys/Cargo.toml b/proxmox-sys/Cargo.toml
index 80d39eb..bfcd388 100644
--- a/proxmox-sys/Cargo.toml
+++ b/proxmox-sys/Cargo.toml
@@ -17,6 +17,7 @@ log = "0.4"
 nix = "0.19.1"
 regex = "1.2"
 serde_json = "1.0"
+serde = { version = "1.0", features = [ "derive" ] }
 zstd = { version = "0.6", features = [ "bindgen" ] }
 
 # Macro crates:
diff --git a/proxmox-sys/src/linux/procfs/mod.rs b/proxmox-sys/src/linux/procfs/mod.rs
index 30b9978..3373dec 100644
--- a/proxmox-sys/src/linux/procfs/mod.rs
+++ b/proxmox-sys/src/linux/procfs/mod.rs
@@ -11,6 +11,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 use lazy_static::lazy_static;
 use nix::unistd::Pid;
+use serde::Serialize;
 
 use crate::fs::file_read_firstline;
 
@@ -184,7 +185,7 @@ pub fn read_proc_uptime_ticks() -> Result<(u64, u64), Error> {
     Ok((up as u64, idle as u64))
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Serialize)]
 /// The CPU fields from `/proc/stat` with their native time value. Multiply
 /// with CLOCK_TICKS to get the real value.
 pub struct ProcFsStat {
@@ -407,7 +408,7 @@ fn test_read_proc_stat() {
     assert_eq!(stat.iowait_percent, 0.0);
 }
 
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub struct ProcFsMemInfo {
     pub memtotal: u64,
     pub memfree: u64,
@@ -539,7 +540,7 @@ pub fn read_memory_usage() -> Result<ProcFsMemUsage, Error> {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
 pub struct ProcFsNetDev {
     pub device: String,
     pub receive: u64,
-- 
2.30.2






* [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

copied from proxmox-backup
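
For reference, a small usage sketch of the new helper (error type and
printing are just for illustration):

  use std::path::Path;

  fn print_root_usage() -> Result<(), anyhow::Error> {
      let usage = proxmox_sys::fs::disk_usage(Path::new("/"))?;
      // DiskUsage also derives Serialize, so it can be passed along as-is
      println!("total: {} used: {} available: {}", usage.total, usage.used, usage.available);
      Ok(())
  }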

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox-sys/src/fs/mod.rs | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/proxmox-sys/src/fs/mod.rs b/proxmox-sys/src/fs/mod.rs
index 9fe333b..5935b1a 100644
--- a/proxmox-sys/src/fs/mod.rs
+++ b/proxmox-sys/src/fs/mod.rs
@@ -3,6 +3,7 @@ use std::fs::File;
 use std::path::Path;
 
 use anyhow::{bail, Error};
+use serde::Serialize;
 
 use std::os::unix::io::{AsRawFd, RawFd};
 use nix::unistd::{Gid, Uid};
@@ -102,3 +103,28 @@ impl CreateOptions {
     */
 }
 
+/// Basic disk usage information
+#[derive(Serialize)]
+pub struct DiskUsage {
+    pub total: u64,
+    pub used: u64,
+    pub available: u64,
+}
+
+/// Get disk usage information from path
+pub fn disk_usage(path: &std::path::Path) -> Result<DiskUsage, Error> {
+    let mut stat: libc::statfs64 = unsafe { std::mem::zeroed() };
+
+    use nix::NixPath;
+
+    let res = path.with_nix_path(|cstr| unsafe { libc::statfs64(cstr.as_ptr(), &mut stat) })?;
+    nix::errno::Errno::result(res)?;
+
+    let bsize = stat.f_bsize as u64;
+
+    Ok(DiskUsage{
+        total: stat.f_blocks*bsize,
+        used: (stat.f_blocks-stat.f_bfree)*bsize,
+        available: stat.f_bavail*bsize,
+    })
+}
-- 
2.30.2






* [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 13:51   ` Wolfgang Bumiller
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys Dominik Csapak
                   ` (5 subsequent siblings)
  8 siblings, 1 reply; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

influxdb (udp + http(s)) only for now

the general architecture looks as follows:

"new" returns a MetricsChannel and a Future. The channel can be used to
push data in (it flushes automatically when the buffered data would
exceed the configured size (mtu/max_body_size)), and the future must be
polled to actually send the data to the server.

so most often it would look like this:
  let (future, channel) = InfluxDbHttp::new(..params..)?;
  let handle = tokio::spawn(future);
  channel.send_data(...).await?;
  handle.await?;

when all channels go out of scope, any remaining data in the channel
will be read and sent to the server
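
A slightly more complete sketch (inside an async context; the parameter
values are made up, the function signatures are the ones introduced below):

  let (future, channel) = InfluxDbUdp::new("192.0.2.10", 8089, None);
  let handle = tokio::spawn(future);

  let data = Arc::new(MetricsData::new(
      "cpustat",
      &[("object", "host"), ("host", "pbs1")],
      proxmox_time::epoch_i64(),
      serde_json::json!({ "cpu": 0.1 }),
  )?);
  channel.send_data(data).await?;

  drop(channel); // dropping all channels lets the future finish up
  handle.await??;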

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 Cargo.toml                            |   1 +
 proxmox-metrics/Cargo.toml            |  20 ++++
 proxmox-metrics/debian/changelog      |   5 +
 proxmox-metrics/debian/copyright      |  16 +++
 proxmox-metrics/debian/debcargo.toml  |   7 ++
 proxmox-metrics/src/influxdb/http.rs  | 143 ++++++++++++++++++++++++++
 proxmox-metrics/src/influxdb/mod.rs   |   7 ++
 proxmox-metrics/src/influxdb/udp.rs   | 107 +++++++++++++++++++
 proxmox-metrics/src/influxdb/utils.rs |  51 +++++++++
 proxmox-metrics/src/lib.rs            |  92 +++++++++++++++++
 10 files changed, 449 insertions(+)
 create mode 100644 proxmox-metrics/Cargo.toml
 create mode 100644 proxmox-metrics/debian/changelog
 create mode 100644 proxmox-metrics/debian/copyright
 create mode 100644 proxmox-metrics/debian/debcargo.toml
 create mode 100644 proxmox-metrics/src/influxdb/http.rs
 create mode 100644 proxmox-metrics/src/influxdb/mod.rs
 create mode 100644 proxmox-metrics/src/influxdb/udp.rs
 create mode 100644 proxmox-metrics/src/influxdb/utils.rs
 create mode 100644 proxmox-metrics/src/lib.rs

diff --git a/Cargo.toml b/Cargo.toml
index 8f85e08..4a458d2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
     "proxmox-http",
     "proxmox-io",
     "proxmox-lang",
+    "proxmox-metrics",
     "proxmox-router",
     "proxmox-schema",
     "proxmox-serde",
diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
new file mode 100644
index 0000000..9ac50fe
--- /dev/null
+++ b/proxmox-metrics/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox-metrics"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+license = "AGPL-3"
+description = "Metrics Server export utilitites"
+
+exclude = [ "debian" ]
+
+[dependencies]
+anyhow = "1.0"
+tokio = { version = "1.0", features = [ "net", "sync" ] }
+futures = "0.3"
+serde = "1.0"
+serde_json = "1.0"
+http = "0.2"
+hyper = "0.14"
+openssl = "0.10"
+proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
new file mode 100644
index 0000000..c02803b
--- /dev/null
+++ b/proxmox-metrics/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
+
+  * initial package
+
+ -- Proxmox Support Team <support@proxmox.com>  Tue, 14 Dec 2021 08:56:54 +0100
diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
new file mode 100644
index 0000000..5661ef6
--- /dev/null
+++ b/proxmox-metrics/debian/copyright
@@ -0,0 +1,16 @@
+Copyright (C) 2021 Proxmox Server Solutions GmbH
+
+This software is written by Proxmox Server Solutions GmbH <support@proxmox.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
new file mode 100644
index 0000000..b7864cd
--- /dev/null
+++ b/proxmox-metrics/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support@proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
new file mode 100644
index 0000000..8f1157d
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/http.rs
@@ -0,0 +1,143 @@
+use std::sync::Arc;
+
+use anyhow::{bail, Error};
+use futures::{future::FutureExt, select};
+use hyper::Body;
+use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
+use tokio::sync::mpsc;
+
+use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
+
+use crate::influxdb::utils;
+use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
+
+pub struct InfluxDbHttp {
+    client: SimpleHttp,
+    _healthuri: http::Uri,
+    writeuri: http::Uri,
+    token: Option<String>,
+    max_body_size: usize,
+    data: String,
+    data_channel: mpsc::Receiver<Arc<MetricsData>>,
+    flush_channel: mpsc::Receiver<()>,
+}
+
+impl InfluxDbHttp {
+    pub fn new(
+        https: bool,
+        host: &str,
+        port: u16,
+        organization: &str,
+        bucket: &str,
+        token: Option<&str>,
+        verify_tls: bool,
+        max_body_size: usize,
+    ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
+        let (data_tx, data_rx) = mpsc::channel(1024);
+        let (flush_tx, flush_rx) = mpsc::channel(1);
+
+        let client = if verify_tls {
+            SimpleHttp::with_options(SimpleHttpOptions::default())
+        } else {
+            let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
+            ssl_connector.set_verify(SslVerifyMode::NONE);
+            SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
+        };
+
+        let authority = proxmox_http::uri::build_authority(host, port)?;
+
+        let writeuri = http::uri::Builder::new()
+            .scheme(if https { "https" } else { "http" })
+            .authority(authority.clone())
+            .path_and_query(format!(
+                "/api/v2/write?org={}&bucket={}",
+                organization, bucket
+            ))
+            .build()?;
+
+        let healthuri = http::uri::Builder::new()
+            .scheme(if https { "https" } else { "http" })
+            .authority(authority)
+            .path_and_query("/health")
+            .build()?;
+
+        let this = Self {
+            client,
+            writeuri,
+            _healthuri: healthuri,
+            token: token.map(String::from),
+            max_body_size,
+            data: String::new(),
+            data_channel: data_rx,
+            flush_channel: flush_rx,
+        };
+
+        let future = Box::pin(this.finish());
+        let channel = MetricsChannel {
+            data_channel: data_tx,
+            flush_channel: flush_tx,
+        };
+        Ok((future, channel))
+    }
+
+    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+        let new_data = utils::format_influxdb_line(&data)?;
+
+        if self.data.len() + new_data.len() >= self.max_body_size {
+            self.flush().await?;
+        }
+
+        self.data.push_str(&new_data);
+
+        if self.data.len() >= self.max_body_size {
+            self.flush().await?;
+        }
+
+        Ok(())
+    }
+
+    pub async fn flush(&mut self) -> Result<(), Error> {
+        if self.data.is_empty() {
+            return Ok(());
+        }
+        let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
+
+        if let Some(token) = &self.token {
+            request = request.header("Authorization", format!("Token {}", token));
+        }
+
+        let request = request.body(Body::from(self.data.split_off(0)))?;
+
+        let res = self.client.request(request).await?;
+
+        let status = res.status();
+        if !status.is_success() {
+            bail!("got bad status: {}", status);
+        }
+        Ok(())
+    }
+
+    async fn finish(mut self) -> Result<(), Error> {
+        loop {
+            select! {
+                res = self.flush_channel.recv().fuse() => match res {
+                    Some(_) => self.flush().await?,
+                    None => break, // all senders gone
+                },
+                data = self.data_channel.recv().fuse() => match data {
+                    Some(data) => self.add_data(data).await?,
+                    None => break, // all senders gone
+                },
+            }
+        }
+
+        // consume remaining data in channel
+        while let Some(data) = self.data_channel.recv().await {
+            self.add_data(data).await?;
+        }
+
+        self.flush().await?;
+
+        Ok(())
+    }
+}
diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
new file mode 100644
index 0000000..26a715c
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/mod.rs
@@ -0,0 +1,7 @@
+mod http;
+pub use self::http::*;
+
+mod udp;
+pub use udp::*;
+
+pub mod utils;
diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
new file mode 100644
index 0000000..de2b0d5
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/udp.rs
@@ -0,0 +1,107 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use futures::{future::FutureExt, select};
+use tokio::net::UdpSocket;
+use tokio::sync::mpsc;
+
+use crate::influxdb::utils;
+use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
+
+pub struct InfluxDbUdp {
+    address: String,
+    conn: Option<tokio::net::UdpSocket>,
+    mtu: u16,
+    data: String,
+    data_channel: mpsc::Receiver<Arc<MetricsData>>,
+    flush_channel: mpsc::Receiver<()>,
+}
+
+impl InfluxDbUdp {
+    pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
+        let (data_tx, data_rx) = mpsc::channel(1024);
+        let (flush_tx, flush_rx) = mpsc::channel(1);
+
+        let address = if host.len() > 3 && host.contains(':') && &host[0..1] != "[" {
+            format!("[{}]:{}", host, port)
+        } else {
+            format!("{}:{}", host, port)
+        };
+
+        let this = Self {
+            address,
+            conn: None,
+            mtu: mtu.unwrap_or(1500),
+            data: String::new(),
+            data_channel: data_rx,
+            flush_channel: flush_rx,
+        };
+
+        let future = Box::pin(this.finish());
+
+        let channel = MetricsChannel {
+            data_channel: data_tx,
+            flush_channel: flush_tx,
+        };
+
+        (future, channel)
+    }
+
+    async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
+        let conn = UdpSocket::bind("0.0.0.0:0").await?;
+        let addr = self.address.clone();
+        conn.connect(addr).await?;
+        Ok(conn)
+    }
+
+    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+        let new_data = utils::format_influxdb_line(&data)?;
+
+        if self.data.len() + new_data.len() >= (self.mtu as usize) {
+            self.flush().await?;
+        }
+
+        self.data.push_str(&new_data);
+
+        if self.data.len() >= (self.mtu as usize) {
+            self.flush().await?;
+        }
+
+        Ok(())
+    }
+
+    async fn flush(&mut self) -> Result<(), Error> {
+        let conn = match self.conn.take() {
+            Some(conn) => conn,
+            None => self.connect().await?,
+        };
+
+        conn.send(self.data.split_off(0).as_bytes()).await?;
+        self.conn = Some(conn);
+        Ok(())
+    }
+
+    async fn finish(mut self) -> Result<(), Error> {
+        loop {
+            select! {
+                res = self.flush_channel.recv().fuse() => match res {
+                    Some(_) => self.flush().await?,
+                    None => break, // all senders gone
+                },
+                data = self.data_channel.recv().fuse() => match data {
+                    Some(data) => self.add_data(data).await?,
+                    None => break, // all senders gone
+                },
+            }
+        }
+
+        // consume remaining data in channel
+        while let Some(data) = self.data_channel.recv().await {
+            self.add_data(data).await?;
+        }
+
+        self.flush().await?;
+
+        Ok(())
+    }
+}
diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
new file mode 100644
index 0000000..bf391f9
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/utils.rs
@@ -0,0 +1,51 @@
+use anyhow::{bail, Error};
+
+use crate::MetricsData;
+
+pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
+    if !data.values.is_object() {
+        bail!("invalid data");
+    }
+
+    let mut line = escape_measurement(&data.measurement);
+    line.push(',');
+
+    let tags = data.tags.iter().map(|(key, value)| {
+        format!("{}={}", escape_key(&key), escape_key(&value))
+    });
+    line.push_str(&tags.collect::<Vec<String>>().join(","));
+
+    line.push(' ');
+
+    let values = data.values.as_object().unwrap().iter().map(|(key, value)| {
+        let value = if value.is_string() {
+             escape_value(&value.to_string())
+        } else {
+            value.to_string()
+        };
+        format!("{}={}", escape_key(&key), value)
+    });
+
+    line.push_str(&values.collect::<Vec<String>>().join(","));
+
+    // nanosecond precision
+    line.push_str(&format!(" {}\n", data.ctime*1_000_000_000));
+    Ok(line)
+}
+
+fn escape_key(key: &str) -> String {
+    let key = key.replace(',', "\\,");
+    let key = key.replace('=', "\\=");
+    let key = key.replace(' ', "\\ ");
+    key
+}
+
+fn escape_measurement(measurement: &str) -> String {
+    let measurement = measurement.replace(',', "\\,");
+    let measurement = measurement.replace(' ', "\\ ");
+    measurement
+}
+
+fn escape_value(value: &str) -> String {
+    format!("\"{}\"",value.replace('"', "\\\""))
+}
diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
new file mode 100644
index 0000000..0a76faa
--- /dev/null
+++ b/proxmox-metrics/src/lib.rs
@@ -0,0 +1,92 @@
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Error};
+use serde::Serialize;
+use serde_json::Value;
+use tokio::sync::mpsc;
+
+pub mod influxdb;
+
+#[derive(Clone)]
+/// Structured data for the metric server
+pub struct MetricsData {
+    /// The category of measurements
+    pub measurement: String,
+    /// A list of tags to attach to the measurements
+    pub tags: HashMap<String, String>,
+    /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
+    pub values: Value,
+    /// The time of the measurement
+    pub ctime: i64,
+}
+
+impl MetricsData {
+    /// Convenient helper to create from references
+    pub fn new<V: Serialize>(measurement: &str, tags: &[(&str, &str)], ctime: i64, values: V) -> Result<Self, Error> {
+        let mut new_tags = HashMap::new();
+        for (key, value) in tags {
+            new_tags.insert(key.to_string(), value.to_string());
+        }
+
+        Ok(Self{
+            measurement: measurement.to_string(),
+            tags: new_tags,
+            values: serde_json::to_value(values)?,
+            ctime,
+        })
+    }
+}
+
+pub type MetricsServerFuture =
+    Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>;
+
+#[derive(Clone)]
+/// A channel to send data to the metric server
+pub struct MetricsChannel {
+    pub(crate) data_channel: mpsc::Sender<Arc<MetricsData>>,
+    pub(crate) flush_channel: mpsc::Sender<()>,
+}
+
+impl MetricsChannel {
+    /// Queues the given data for the metric server. If the queue is full,
+    /// flush and try again.
+    pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
+        if let Err(err) = self.data_channel.try_send(data) {
+            match err {
+                mpsc::error::TrySendError::Full(data) => {
+                    self.flush_channel.send(()).await?;
+                    self.data_channel
+                        .send(data)
+                        .await
+                        .map_err(|_| format_err!("error sending data"))?;
+                }
+                mpsc::error::TrySendError::Closed(_) => {
+                    bail!("channel closed");
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Flush data to the metric server
+    pub async fn flush(&self) -> Result<(), Error> {
+        self.flush_channel.send(()).await?;
+        Ok(())
+    }
+}
+
+pub async fn send_data_to_channels(values: &[Arc<MetricsData>], channels: &[MetricsChannel]) -> Vec<Result<(), Error>> {
+    let mut futures = Vec::with_capacity(channels.len());
+    for channel in channels {
+        futures.push(async move {
+            for data in values.into_iter() {
+                channel.send_data(data.clone()).await?
+            }
+            Ok::<(), Error>(())
+        });
+    }
+
+    futures::future::join_all(futures).await
+}
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
                   ` (2 preceding siblings ...)
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types Dominik Csapak
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/admin/datastore.rs     |  4 ++--
 src/api2/node/status.rs         | 10 ++++++++--
 src/api2/status.rs              |  4 ++--
 src/bin/proxmox-backup-proxy.rs |  2 +-
 src/tools/disks/mod.rs          | 21 +--------------------
 5 files changed, 14 insertions(+), 27 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index b653f906..0d0b91f1 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -612,7 +612,7 @@ pub fn status(
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<DataStoreStatus, Error> {
     let datastore = DataStore::lookup_datastore(&store)?;
-    let storage = crate::tools::disks::disk_usage(&datastore.base_path())?;
+    let storage = proxmox_sys::fs::disk_usage(&datastore.base_path())?;
     let (counts, gc_status) = if verbose {
         let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
         let user_info = CachedUserInfo::new()?;
@@ -635,7 +635,7 @@ pub fn status(
     Ok(DataStoreStatus {
         total: storage.total,
         used: storage.used,
-        avail: storage.avail,
+        avail: storage.available,
         gc_status,
         counts,
     })
diff --git a/src/api2/node/status.rs b/src/api2/node/status.rs
index 9559dda6..7861d3a5 100644
--- a/src/api2/node/status.rs
+++ b/src/api2/node/status.rs
@@ -9,7 +9,7 @@ use proxmox_sys::linux::procfs;
 use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
 use proxmox_schema::api;
 
-use pbs_api_types::{NODE_SCHEMA, NodePowerCommand, PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT};
+use pbs_api_types::{NODE_SCHEMA, NodePowerCommand, PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT, StorageStatus};
 
 use crate::api2::types::{
     NodeCpuInformation, NodeStatus, NodeMemoryCounters, NodeSwapCounters, NodeInformation,
@@ -77,10 +77,16 @@ fn get_status(
         uname.version()
     );
 
+    let disk = proxmox_sys::fs::disk_usage(Path::new("/"))?;
+
     Ok(NodeStatus {
         memory,
         swap,
-        root: crate::tools::disks::disk_usage(Path::new("/"))?,
+        root: StorageStatus {
+            total: disk.total,
+            used: disk.used,
+            avail: disk.available,
+        },
         uptime: procfs::read_proc_uptime()?.0 as u64,
         loadavg,
         kversion,
diff --git a/src/api2/status.rs b/src/api2/status.rs
index 7f50914b..48c283e6 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -110,13 +110,13 @@ pub fn datastore_status(
                 continue;
             }
         };
-        let status = crate::tools::disks::disk_usage(&datastore.base_path())?;
+        let status = proxmox_sys::fs::disk_usage(&datastore.base_path())?;
 
         let mut entry = json!({
             "store": store,
             "total": status.total,
             "used": status.used,
-            "avail": status.avail,
+            "avail": status.available,
             "gc-status": datastore.last_gc_status(),
         });
 
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 07a53687..fa79322d 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -1055,7 +1055,7 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
 
 fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
 
-    match proxmox_backup::tools::disks::disk_usage(path) {
+    match proxmox_sys::fs::disk_usage(path) {
         Ok(status) => {
             let rrd_key = format!("{}/total", rrd_prefix);
             rrd_update_gauge(&rrd_key, status.total as f64);
diff --git a/src/tools/disks/mod.rs b/src/tools/disks/mod.rs
index 867aa624..30d9568f 100644
--- a/src/tools/disks/mod.rs
+++ b/src/tools/disks/mod.rs
@@ -19,7 +19,7 @@ use proxmox_sys::linux::procfs::{MountInfo, mountinfo::Device};
 use proxmox_sys::{io_bail, io_format_err};
 use proxmox_schema::api;
 
-use pbs_api_types::{BLOCKDEVICE_NAME_REGEX, StorageStatus};
+use pbs_api_types::BLOCKDEVICE_NAME_REGEX;
 
 mod zfs;
 pub use zfs::*;
@@ -521,25 +521,6 @@ impl Disk {
     }
 }
 
-/// Returns disk usage information (total, used, avail)
-pub fn disk_usage(path: &std::path::Path) -> Result<StorageStatus, Error> {
-
-    let mut stat: libc::statfs64 = unsafe { std::mem::zeroed() };
-
-    use nix::NixPath;
-
-    let res = path.with_nix_path(|cstr| unsafe { libc::statfs64(cstr.as_ptr(), &mut stat) })?;
-    nix::errno::Errno::result(res)?;
-
-    let bsize = stat.f_bsize as u64;
-
-    Ok(StorageStatus{
-        total: stat.f_blocks*bsize,
-        used: (stat.f_blocks-stat.f_bfree)*bsize,
-        avail: stat.f_bavail*bsize,
-    })
-}
-
 #[api()]
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all="lowercase")]
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
                   ` (3 preceding siblings ...)
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class Dominik Csapak
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

InfluxDbUdp and InfluxDbHttp for now

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 pbs-api-types/src/lib.rs     |   2 +
 pbs-api-types/src/metrics.rs | 134 +++++++++++++++++++++++++++++++++++
 2 files changed, 136 insertions(+)
 create mode 100644 pbs-api-types/src/metrics.rs

diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 0a0dd33d..09bd59c8 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -88,6 +88,8 @@ pub use traffic_control::*;
 mod zfs;
 pub use zfs::*;
 
+mod metrics;
+pub use metrics::*;
 
 #[rustfmt::skip]
 #[macro_use]
diff --git a/pbs-api-types/src/metrics.rs b/pbs-api-types/src/metrics.rs
new file mode 100644
index 00000000..c7e08885
--- /dev/null
+++ b/pbs-api-types/src/metrics.rs
@@ -0,0 +1,134 @@
+use serde::{Deserialize, Serialize};
+
+use crate::{DNS_NAME_OR_IP_SCHEMA, PROXMOX_SAFE_ID_FORMAT, SINGLE_LINE_COMMENT_SCHEMA};
+use proxmox_schema::{api, Schema, StringSchema, Updater};
+
+pub const METRIC_SERVER_ID_SCHEMA: Schema = StringSchema::new("Metrics Server ID.")
+    .format(&PROXMOX_SAFE_ID_FORMAT)
+    .min_length(3)
+    .max_length(32)
+    .schema();
+
+pub const INFLUXDB_BUCKET_SCHEMA: Schema = StringSchema::new("InfluxDB Bucket.")
+    .format(&PROXMOX_SAFE_ID_FORMAT)
+    .min_length(3)
+    .max_length(32)
+    .default("proxmox")
+    .schema();
+
+pub const INFLUXDB_ORGANIZATION_SCHEMA: Schema = StringSchema::new("InfluxDB Organization.")
+    .format(&PROXMOX_SAFE_ID_FORMAT)
+    .min_length(3)
+    .max_length(32)
+    .default("proxmox")
+    .schema();
+
+#[api(
+    properties: {
+        name: {
+            schema: METRIC_SERVER_ID_SCHEMA,
+        },
+        host: {
+            schema: DNS_NAME_OR_IP_SCHEMA,
+        },
+        port: {
+            description: "The port",
+            type: u16,
+        },
+        mtu: {
+            description: "The MTU",
+            type: u16,
+            optional: true,
+            default: 1500,
+        },
+        comment: {
+            optional: true,
+            schema: SINGLE_LINE_COMMENT_SCHEMA,
+        },
+    },
+)]
+#[derive(Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// InfluxDB Server (UDP)
+pub struct InfluxDbUdp {
+    #[updater(skip)]
+    pub name: String,
+    pub host: String,
+    pub port: u16,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub mtu: Option<u16>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub comment: Option<String>,
+}
+
+#[api(
+    properties: {
+        name: {
+            schema: METRIC_SERVER_ID_SCHEMA,
+        },
+        host: {
+            schema: DNS_NAME_OR_IP_SCHEMA,
+        },
+        https: {
+            description: "If true, HTTPS is used.",
+            type: bool,
+            optional: true,
+            default: true,
+        },
+        token: {
+            description: "The (optional) API token",
+            type: String,
+            optional: true,
+        },
+        bucket: {
+            schema: INFLUXDB_BUCKET_SCHEMA,
+            optional: true,
+        },
+        organization: {
+            schema: INFLUXDB_ORGANIZATION_SCHEMA,
+            optional: true,
+        },
+        "max-body-size": {
+            description: "The (optional) maximum body size",
+            type: usize,
+            optional: true,
+            default: 25_000_000,
+        },
+        "verify-tls": {
+            description: "If true, the certificate will be validated.",
+            type: bool,
+            optional: true,
+            default: true,
+        },
+        comment: {
+            optional: true,
+            schema: SINGLE_LINE_COMMENT_SCHEMA,
+        },
+    },
+)]
+#[derive(Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// InfluxDB Server (HTTP(s))
+pub struct InfluxDbHttp {
+    #[updater(skip)]
+    pub name: String,
+    pub host: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// The (optional) port. (defaults: 80 for HTTP, 443 for HTTPS)
+    pub port: Option<u16>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub https: Option<bool>,
+    /// The Optional Token
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub token: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub bucket: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub organization: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub max_body_size: Option<usize>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub verify_tls: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub comment: Option<String>,
+}
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
                   ` (4 preceding siblings ...)
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update Dominik Csapak
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

a section config like in pve
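
For context, this is roughly how the config is meant to be consumed (see
the helper at the end of this patch; error handling omitted):

  // read and parse /etc/proxmox-backup/metricserver.cfg
  let (config, _digest) = pbs_config::metrics::config()?;

  // turn every configured server into a (future, channel) pair
  let (futures, channels, names) =
      pbs_config::metrics::get_metric_server_connections(config)?;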

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 Cargo.toml                |   1 +
 pbs-config/Cargo.toml     |   1 +
 pbs-config/src/lib.rs     |   1 +
 pbs-config/src/metrics.rs | 122 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 125 insertions(+)
 create mode 100644 pbs-config/src/metrics.rs

diff --git a/Cargo.toml b/Cargo.toml
index d7ad2085..1643b628 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ pxar = { version = "0.10.1", features = [ "tokio-io" ] }
 proxmox-http = { version = "0.6", features = [ "client", "http-helpers", "websocket" ] }
 proxmox-io = "1"
 proxmox-lang = "1"
+proxmox-metrics = "0.1"
 proxmox-router = { version = "1.1", features = [ "cli" ] }
 proxmox-schema = { version = "1", features = [ "api-macro" ] }
 proxmox-section-config = "1"
diff --git a/pbs-config/Cargo.toml b/pbs-config/Cargo.toml
index cd14d823..4c920712 100644
--- a/pbs-config/Cargo.toml
+++ b/pbs-config/Cargo.toml
@@ -25,6 +25,7 @@ proxmox-time = "1"
 proxmox-serde = "0.1"
 proxmox-shared-memory = "0.2"
 proxmox-sys = "0.2"
+proxmox-metrics = "0.1"
 
 pbs-api-types = { path = "../pbs-api-types" }
 pbs-buildcfg = { path = "../pbs-buildcfg" }
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 118030bc..29880ab9 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -6,6 +6,7 @@ pub mod domains;
 pub mod drive;
 pub mod key_config;
 pub mod media_pool;
+pub mod metrics;
 pub mod network;
 pub mod remote;
 pub mod sync;
diff --git a/pbs-config/src/metrics.rs b/pbs-config/src/metrics.rs
new file mode 100644
index 00000000..97f0cd17
--- /dev/null
+++ b/pbs-config/src/metrics.rs
@@ -0,0 +1,122 @@
+use std::collections::HashMap;
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+
+use proxmox_metrics::{influxdb, MetricsChannel, MetricsServerFuture};
+use proxmox_schema::*;
+use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+
+use pbs_api_types::{InfluxDbHttp, InfluxDbUdp, METRIC_SERVER_ID_SCHEMA};
+
+use crate::{open_backup_lockfile, BackupLockGuard};
+
+lazy_static! {
+    pub static ref CONFIG: SectionConfig = init();
+}
+
+fn init() -> SectionConfig {
+    let mut config = SectionConfig::new(&METRIC_SERVER_ID_SCHEMA);
+
+    let udp_schema = match InfluxDbUdp::API_SCHEMA {
+        Schema::Object(ref object_schema) => object_schema,
+        _ => unreachable!(),
+    };
+
+    let udp_plugin = SectionConfigPlugin::new(
+        "influxdb-udp".to_string(),
+        Some("name".to_string()),
+        udp_schema,
+    );
+    config.register_plugin(udp_plugin);
+
+    let http_schema = match InfluxDbHttp::API_SCHEMA {
+        Schema::Object(ref object_schema) => object_schema,
+        _ => unreachable!(),
+    };
+
+    let http_plugin = SectionConfigPlugin::new(
+        "influxdb-http".to_string(),
+        Some("name".to_string()),
+        http_schema,
+    );
+
+    config.register_plugin(http_plugin);
+
+    config
+}
+
+pub const METRIC_SERVER_CFG_FILENAME: &str = "/etc/proxmox-backup/metricserver.cfg";
+pub const METRIC_SERVER_CFG_LOCKFILE: &str = "/etc/proxmox-backup/.metricserver.lck";
+
+/// Get exclusive lock
+pub fn lock_config() -> Result<BackupLockGuard, Error> {
+    open_backup_lockfile(METRIC_SERVER_CFG_LOCKFILE, None, true)
+}
+
+pub fn config() -> Result<(SectionConfigData, [u8; 32]), Error> {
+    let content = proxmox_sys::fs::file_read_optional_string(METRIC_SERVER_CFG_FILENAME)?
+        .unwrap_or_else(|| "".to_string());
+
+    let digest = openssl::sha::sha256(content.as_bytes());
+    let data = CONFIG.parse(METRIC_SERVER_CFG_FILENAME, &content)?;
+    Ok((data, digest))
+}
+
+pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
+    let raw = CONFIG.write(METRIC_SERVER_CFG_FILENAME, &config)?;
+    crate::replace_backup_config(METRIC_SERVER_CFG_FILENAME, raw.as_bytes())
+}
+
+// shell completion helper
+pub fn complete_remote_name(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
+    match config() {
+        Ok((data, _digest)) => data.sections.iter().map(|(id, _)| id.to_string()).collect(),
+        Err(_) => return vec![],
+    }
+}
+
+/// Get the metric server connections from a config
+pub fn get_metric_server_connections(
+    metric_config: SectionConfigData,
+) -> Result<(Vec<MetricsServerFuture>, Vec<MetricsChannel>, Vec<String>), Error> {
+    let mut futures = Vec::new();
+    let mut channels = Vec::new();
+    let mut names = Vec::new();
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+    {
+        let (future, sender) = influxdb::InfluxDbUdp::new(&config.host, config.port, config.mtu);
+        names.push(config.name);
+        futures.push(future);
+        channels.push(sender);
+    }
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+    {
+        let port = config.port.unwrap_or_else(|| {
+            if config.https.unwrap_or(true) {
+                443
+            } else {
+                80
+            }
+        });
+
+        let (future, sender) = influxdb::InfluxDbHttp::new(
+            config.https.unwrap_or(true),
+            &config.host,
+            port,
+            config.organization.as_deref().unwrap_or("proxmox"),
+            config.bucket.as_deref().unwrap_or("proxmox"),
+            config.token.as_deref(),
+            config.verify_tls.unwrap_or(true),
+            config.max_body_size.unwrap_or(25_000_000),
+        )?;
+        names.push(config.name);
+        futures.push(future);
+        channels.push(sender);
+    }
+    Ok((futures, channels, names))
+}
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
                   ` (5 preceding siblings ...)
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
  8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

that way, the gathered stats can be reused, e.g. for sending them to
metric servers in a later patch of this series
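
The resulting flow in the stat generator loop looks roughly like this
(simplified from the patch below):

  // collect all stats once, in a blocking task ...
  let (hoststats, hostdisk, datastores) = tokio::task::spawn_blocking(|| {
      let hoststats = collect_host_stats_sync();
      let (hostdisk, datastores) = collect_disk_stats_sync();
      (Arc::new(hoststats), Arc::new(hostdisk), Arc::new(datastores))
  }).await?;

  // ... and hand them to the RRD update (and, later, to the metric servers)
  tokio::task::spawn_blocking(move || {
      rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
      rrd_sync_journal();
  });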

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs | 213 +++++++++++++++++++++-----------
 1 file changed, 141 insertions(+), 72 deletions(-)

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index fa79322d..2700fabf 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,8 +17,11 @@ use tokio_stream::wrappers::ReceiverStream;
 use serde_json::{json, Value};
 use http::{Method, HeaderMap};
 
-use proxmox_sys::linux::socket::set_tcp_keepalive;
-use proxmox_sys::fs::CreateOptions;
+use proxmox_sys::linux::{
+    procfs::{ProcFsStat, ProcFsMemInfo, ProcFsNetDev, Loadavg},
+    socket::set_tcp_keepalive
+};
+use proxmox_sys::fs::{CreateOptions, DiskUsage};
 use proxmox_lang::try_block;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
 use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
@@ -44,6 +47,7 @@ use proxmox_backup::{
             Job,
         },
     },
+    tools::disks::BlockDevStat,
 };
 
 use pbs_buildcfg::configdir;
@@ -931,9 +935,24 @@ async fn run_stat_generator() {
     loop {
         let delay_target = Instant::now() +  Duration::from_secs(10);
 
-        generate_host_stats().await;
+        let (hoststats, hostdisk, datastores) = match tokio::task::spawn_blocking(|| {
+            let hoststats = collect_host_stats_sync();
+            let (hostdisk, datastores) = collect_disk_stats_sync();
+            (Arc::new(hoststats), Arc::new(hostdisk), Arc::new(datastores))
+        }).await {
+            Ok(res) => res,
+            Err(err) => {
+                log::error!("collecting host stats panicked: {}", err);
+                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+                continue;
+            }
+        };
+
+        let rrd_future = tokio::task::spawn_blocking(move || {
+            rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
+            rrd_sync_journal();
+        });
 
-        rrd_sync_journal();
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
 
@@ -941,86 +960,147 @@ async fn run_stat_generator() {
 
 }
 
-async fn generate_host_stats() {
-    match tokio::task::spawn_blocking(generate_host_stats_sync).await {
-        Ok(()) => (),
-        Err(err) => log::error!("generate_host_stats paniced: {}", err),
-    }
+struct HostStats {
+    proc: Option<ProcFsStat>,
+    meminfo: Option<ProcFsMemInfo>,
+    net: Option<Vec<ProcFsNetDev>>,
+    load: Option<Loadavg>,
+}
+
+struct DiskStat {
+    name: String,
+    usage: Option<DiskUsage>,
+    dev: Option<BlockDevStat>,
 }
 
-fn generate_host_stats_sync() {
+fn collect_host_stats_sync() -> HostStats {
     use proxmox_sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
 
-    match read_proc_stat() {
-        Ok(stat) => {
-            rrd_update_gauge("host/cpu", stat.cpu);
-            rrd_update_gauge("host/iowait", stat.iowait_percent);
-        }
+    let proc = match read_proc_stat() {
+        Ok(stat) => Some(stat),
         Err(err) => {
             eprintln!("read_proc_stat failed - {}", err);
+            None
         }
-    }
+    };
 
-    match read_meminfo() {
-        Ok(meminfo) => {
-            rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-            rrd_update_gauge("host/memused", meminfo.memused as f64);
-            rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-            rrd_update_gauge("host/swapused", meminfo.swapused as f64);
-        }
+    let meminfo = match read_meminfo() {
+        Ok(stat) => Some(stat),
         Err(err) => {
             eprintln!("read_meminfo failed - {}", err);
+            None
         }
-    }
+    };
 
-    match read_proc_net_dev() {
-        Ok(netdev) => {
-            use pbs_config::network::is_physical_nic;
-            let mut netin = 0;
-            let mut netout = 0;
-            for item in netdev {
-                if !is_physical_nic(&item.device) { continue; }
-                netin += item.receive;
-                netout += item.send;
-            }
-            rrd_update_derive("host/netin", netin as f64);
-            rrd_update_derive("host/netout", netout as f64);
-        }
+    let net = match read_proc_net_dev() {
+        Ok(netdev) => Some(netdev),
         Err(err) => {
             eprintln!("read_prox_net_dev failed - {}", err);
+            None
         }
-    }
+    };
 
-    match read_loadavg() {
-        Ok(loadavg) => {
-            rrd_update_gauge("host/loadavg", loadavg.0 as f64);
-        }
+    let load = match read_loadavg() {
+        Ok(loadavg) => Some(loadavg),
         Err(err) => {
             eprintln!("read_loadavg failed - {}", err);
+            None
         }
+    };
+
+    HostStats {
+        proc,
+        meminfo,
+        net,
+        load,
     }
+}
 
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
     let disk_manager = DiskManage::new();
 
-    gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
 
+    let mut datastores = Vec::new();
     match pbs_config::datastore::config() {
         Ok((config, _)) => {
             let datastore_list: Vec<DataStoreConfig> =
                 config.convert_to_typed_array("datastore").unwrap_or_default();
 
             for config in datastore_list {
-
-                let rrd_prefix = format!("datastore/{}", config.name);
                 let path = std::path::Path::new(&config.path);
-                gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
+                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
             }
         }
         Err(err) => {
             eprintln!("read datastore config failed - {}", err);
         }
     }
+
+    (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+    if let Some(stat) = &host.proc {
+        rrd_update_gauge("host/cpu", stat.cpu);
+        rrd_update_gauge("host/iowait", stat.iowait_percent);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+        rrd_update_gauge("host/memused", meminfo.memused as f64);
+        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+            use pbs_config::network::is_physical_nic;
+            let mut netin = 0;
+            let mut netout = 0;
+            for item in netdev {
+                if !is_physical_nic(&item.device) { continue; }
+                netin += item.receive;
+                netout += item.send;
+            }
+            rrd_update_derive("host/netin", netin as f64);
+            rrd_update_derive("host/netout", netout as f64);
+    }
+
+    if let Some(loadavg) = &host.load {
+        rrd_update_gauge("host/loadavg", loadavg.0 as f64);
+    }
+
+    rrd_update_disk_stat(&hostdisk, "host");
+
+    for stat in datastores {
+        let rrd_prefix = format!("datastore/{}", stat.name);
+        rrd_update_disk_stat(&stat, &rrd_prefix);
+    }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+    if let Some(status) = &disk.usage {
+        let rrd_key = format!("{}/total", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.total as f64);
+        let rrd_key = format!("{}/used", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.used as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        let rrd_key = format!("{}/read_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.read_ios as f64);
+        let rrd_key = format!("{}/read_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
+
+        let rrd_key = format!("{}/write_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.write_ios as f64);
+        let rrd_key = format!("{}/write_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
+
+        let rrd_key = format!("{}/io_ticks", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
+    }
 }
 
 fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
@@ -1053,22 +1133,17 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
-
-    match proxmox_sys::fs::disk_usage(path) {
-        Ok(status) => {
-            let rrd_key = format!("{}/total", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.total as f64);
-            let rrd_key = format!("{}/used", rrd_prefix);
-            rrd_update_gauge(&rrd_key, status.used as f64);
-        }
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+    let usage = match proxmox_sys::fs::disk_usage(path) {
+        Ok(status) => Some(status),
         Err(err) => {
             eprintln!("read disk_usage on {:?} failed - {}", path, err);
+            None
         }
-    }
+    };
 
-    match disk_manager.find_mounted_device(path) {
-        Ok(None) => {},
+    let dev = match disk_manager.find_mounted_device(path) {
+        Ok(None) => None,
         Ok(Some((fs_type, device, source))) => {
             let mut device_stat = None;
             match fs_type.as_str() {
@@ -1090,24 +1165,18 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
                     }
                 }
             }
-            if let Some(stat) = device_stat {
-                let rrd_key = format!("{}/read_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.read_ios as f64);
-                let rrd_key = format!("{}/read_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
-
-                let rrd_key = format!("{}/write_ios", rrd_prefix);
-                rrd_update_derive(&rrd_key, stat.write_ios as f64);
-                let rrd_key = format!("{}/write_bytes", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
-
-                let rrd_key = format!("{}/io_ticks", rrd_prefix);
-                rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
-            }
+            device_stat
         }
         Err(err) => {
             eprintln!("find_mounted_device failed - {}", err);
+            None
         }
+    };
+
+    DiskStat {
+        name: name.to_string(),
+        usage,
+        dev,
     }
 }
 
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
                   ` (6 preceding siblings ...)
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-15  7:51   ` Wolfgang Bumiller
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
  8 siblings, 1 reply; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

and keep the data as similar as possible to PVE (tags/fields)

datastores get their own 'object' type and reside in the "blockstat"
measurement
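
The resulting InfluxDB line protocol then looks roughly like this (field
names come from the respective stats structs, timestamps are in
nanoseconds; host name, datastore tag name and all values are made up for
illustration):

  cpustat,object=host,host=pbs1 avg1=0.05,avg5=0.1,avg15=0.2 1639484694000000000
  memory,object=host,host=pbs1 memtotal=16717217792,memused=1530793984 1639484694000000000
  blockstat,object=datastore,datastore=store1,host=pbs1 total=4123456716800,used=1234567891200 1639484694000000000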

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs | 139 +++++++++++++++++++++++++++++++-
 1 file changed, 138 insertions(+), 1 deletion(-)

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 2700fabf..fbb782dd 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -23,11 +23,13 @@ use proxmox_sys::linux::{
 };
 use proxmox_sys::fs::{CreateOptions, DiskUsage};
 use proxmox_lang::try_block;
+use proxmox_metrics::MetricsData;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
 use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
 use proxmox_sys::{task_log, task_warn};
 use proxmox_sys::logrotate::LogRotate;
 
+use pbs_config::metrics::get_metric_server_connections;
 use pbs_datastore::DataStore;
 
 use proxmox_rest_server::{
@@ -948,16 +950,131 @@ async fn run_stat_generator() {
             }
         };
 
+        let hoststats2 = hoststats.clone();
+        let hostdisk2 = hostdisk.clone();
+        let datastores2 = datastores.clone();
         let rrd_future = tokio::task::spawn_blocking(move || {
-            rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
+            rrd_update_host_stats_sync(&hoststats2, &hostdisk2, &datastores2);
             rrd_sync_journal();
         });
 
+        let metrics_future = send_data_to_metric_servers(hoststats, hostdisk, datastores);
+
+        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+        if let Err(err) = rrd_res {
+            log::error!("rrd update panicked: {}", err);
+        }
+        if let Err(err) = metrics_res {
+            log::error!("error during metrics sending: {}", err);
+        }
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
 
      }
+}
+
+async fn send_data_to_metric_servers(
+    host: Arc<HostStats>,
+    hostdisk: Arc<DiskStat>,
+    datastores: Arc<Vec<DiskStat>>,
+) -> Result<(), Error> {
+    let (config, _digest) = pbs_config::metrics::config()?;
+    let (futures, channels, names) = get_metric_server_connections(config)?;
+
+    if futures.is_empty() {
+        return Ok(());
+    }
+
+    let names2 = names.clone();
+    let sending_handle = tokio::spawn(async move {
+        for (i, res) in future::join_all(futures).await.into_iter().enumerate() {
+            if let Err(err) = res {
+                eprintln!("ERROR '{}': {}", names2[i], err);
+            }
+        }
+    });
+
+    let ctime = proxmox_time::epoch_i64();
+    let nodename = proxmox_sys::nodename();
+
+    let mut values = Vec::new();
+
+    let mut cpuvalue = json!({});
+    if let Some(stat) = &host.proc {
+        for (key, value) in serde_json::to_value(stat)?.as_object().unwrap().iter() {
+            cpuvalue[key.clone()] = value.clone();
+        }
+    }
+
+    if let Some(loadavg) = &host.load {
+        cpuvalue["avg1"] = Value::from(loadavg.0);
+        cpuvalue["avg5"] = Value::from(loadavg.1);
+        cpuvalue["avg15"] = Value::from(loadavg.2);
+    }
 
+    values.push(Arc::new(MetricsData::new(
+        "cpustat",
+        &[("object", "host"), ("host", nodename)],
+        ctime,
+        cpuvalue,
+    )?));
+
+    if let Some(stat) = &host.meminfo {
+        values.push(Arc::new(MetricsData::new(
+            "memory",
+            &[("object", "host"), ("host", nodename)],
+            ctime,
+            stat,
+        )?));
+    }
+
+    if let Some(netdev) = &host.net {
+        for item in netdev {
+            values.push(Arc::new(MetricsData::new(
+                "nics",
+                &[
+                    ("object", "host"),
+                    ("host", nodename),
+                    ("instance", &item.device),
+                ],
+                ctime,
+                item,
+            )?));
+        }
+    }
+
+    values.push(Arc::new(MetricsData::new(
+        "blockstat",
+        &[("object", "host"), ("host", nodename)],
+        ctime,
+        hostdisk.to_value(),
+    )?));
+
+    for datastore in datastores.iter() {
+        values.push(Arc::new(MetricsData::new(
+            "blockstat",
+            &[
+                ("object", "datastore"),
+                ("nodename", nodename),
+                ("datastore", &datastore.name),
+            ],
+            ctime,
+            datastore.to_value(),
+        )?));
+    }
+
+    let results = proxmox_metrics::send_data_to_channels(&values, &channels).await;
+    for (i, res) in results.into_iter().enumerate() {
+        if let Err(err) = res {
+            log::error!("error sending to {}: {}", names[i], err);
+        }
+    }
+
+    drop(channels);
+
+    sending_handle.await?;
+
+    Ok(())
 }
 
 struct HostStats {
@@ -973,6 +1090,26 @@ struct DiskStat {
     dev: Option<BlockDevStat>,
 }
 
+impl DiskStat {
+    fn to_value(&self) -> Value {
+        let mut value = json!({});
+        if let Some(usage) = &self.usage {
+            value["total"] = Value::from(usage.total);
+            value["used"] = Value::from(usage.used);
+            value["avail"] = Value::from(usage.available);
+        }
+
+        if let Some(dev) = &self.dev {
+            value["read_ios"] = Value::from(dev.read_ios);
+            value["read_bytes"] = Value::from(dev.read_sectors * 512);
+            value["write_ios"] = Value::from(dev.write_ios);
+            value["write_bytes"] = Value::from(dev.write_sectors * 512);
+            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+        }
+        value
+    }
+}
+
 fn collect_host_stats_sync() -> HostStats {
     use proxmox_sys::linux::procfs::{
         read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints
  2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
                   ` (7 preceding siblings ...)
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
  2021-12-15  7:39   ` Wolfgang Bumiller
  8 siblings, 1 reply; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
  To: pbs-devel

but in contrast to pve, we split the api by type of the section config,
since we cannot handle multiple types in the updater
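
the resulting endpoints then look like this (listed for illustration):

  GET/POST        /api2/json/config/metricserver/influxdb-http
  GET/PUT/DELETE  /api2/json/config/metricserver/influxdb-http/{name}
  GET/POST        /api2/json/config/metricserver/influxdb-udp
  GET/PUT/DELETE  /api2/json/config/metricserver/influxdb-udp/{name}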

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/config/metricserver/influxdbhttp.rs | 272 +++++++++++++++++++
 src/api2/config/metricserver/influxdbudp.rs  | 242 +++++++++++++++++
 src/api2/config/metricserver/mod.rs          |  16 ++
 src/api2/config/mod.rs                       |   2 +
 4 files changed, 532 insertions(+)
 create mode 100644 src/api2/config/metricserver/influxdbhttp.rs
 create mode 100644 src/api2/config/metricserver/influxdbudp.rs
 create mode 100644 src/api2/config/metricserver/mod.rs

diff --git a/src/api2/config/metricserver/influxdbhttp.rs b/src/api2/config/metricserver/influxdbhttp.rs
new file mode 100644
index 00000000..0763c979
--- /dev/null
+++ b/src/api2/config/metricserver/influxdbhttp.rs
@@ -0,0 +1,272 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use serde::{Deserialize, Serialize};
+use hex::FromHex;
+
+use proxmox_router::{Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+    InfluxDbHttp, InfluxDbHttpUpdater,
+    PROXMOX_CONFIG_DIGEST_SCHEMA, METRIC_SERVER_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use pbs_config::metrics;
+
+#[api(
+    input: {
+        properties: {},
+    },
+    returns: {
+        description: "List of configured InfluxDB http metric servers.",
+        type: Array,
+        items: { type: InfluxDbHttp },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// List configured InfluxDB http metric servers.
+pub fn list_influxdb_http_servers(
+    _param: Value,
+    mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<InfluxDbHttp>, Error> {
+
+    let (config, digest) = metrics::config()?;
+
+    let mut list: Vec<InfluxDbHttp> = config.convert_to_typed_array("influxdb-http")?;
+
+    // don't return token via api
+    for item in list.iter_mut() {
+        item.token = None;
+    }
+
+    rpcenv["digest"] = hex::encode(&digest).into();
+
+    Ok(list)
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            config: {
+                type: InfluxDbHttp,
+                flatten: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Create a new InfluxDB http server configuration
+pub fn create_influxdb_http_server(config: InfluxDbHttp) -> Result<(), Error> {
+
+    let _lock = metrics::lock_config()?;
+
+    let (mut metrics, _digest) = metrics::config()?;
+
+    metrics.set_data(&config.name, "influxdb-http", &config)?;
+
+    metrics::save_config(&metrics)?;
+
+    Ok(())
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: METRIC_SERVER_ID_SCHEMA,
+            },
+            digest: {
+                optional: true,
+                schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Remove an InfluxDB http server configuration
+pub fn delete_influxdb_http_server(
+    name: String,
+    digest: Option<String>,
+    _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+    let _lock = metrics::lock_config()?;
+
+    let (mut metrics, expected_digest) = metrics::config()?;
+
+    if let Some(ref digest) = digest {
+        let digest = <[u8; 32]>::from_hex(digest)?;
+        crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+    }
+
+    if metrics.sections.remove(&name).is_none()  {
+        bail!("name '{}' does not exist.", name);
+    }
+
+    metrics::save_config(&metrics)?;
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                schema: METRIC_SERVER_ID_SCHEMA,
+            },
+        },
+    },
+    returns:  { type: InfluxDbHttp },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// Read the InfluxDB http server configuration
+pub fn read_influxdb_http_server(
+    name: String,
+    mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<InfluxDbHttp, Error> {
+
+    let (metrics, digest) = metrics::config()?;
+
+    let mut config: InfluxDbHttp = metrics.lookup("influxdb-http", &name)?;
+
+    config.token = None;
+
+    rpcenv["digest"] = hex::encode(&digest).into();
+
+    Ok(config)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all="kebab-case")]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+    /// Delete the port property.
+    port,
+    /// Delete the https property.
+    https,
+    /// Delete the token property.
+    token,
+    /// Delete the bucket property.
+    bucket,
+    /// Delete the organization property.
+    organization,
+    /// Delete the max_body_size property.
+    max_body_size,
+    /// Delete the verify_tls property.
+    verify_tls,
+    /// Delete the comment property.
+    comment,
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: METRIC_SERVER_ID_SCHEMA,
+            },
+            update: {
+                type: InfluxDbHttpUpdater,
+                flatten: true,
+            },
+            delete: {
+                description: "List of properties to delete.",
+                type: Array,
+                optional: true,
+                items: {
+                    type: DeletableProperty,
+                }
+            },
+            digest: {
+                optional: true,
+                schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+            },
+        },
+    },
+    returns:  { type: InfluxDbHttp },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Update an InfluxDB http server configuration
+pub fn update_influxdb_http_server(
+    name: String,
+    update: InfluxDbHttpUpdater,
+    delete: Option<Vec<DeletableProperty>>,
+    digest: Option<String>,
+    _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+    let _lock = metrics::lock_config()?;
+
+    let (mut metrics, expected_digest) = metrics::config()?;
+
+    if let Some(ref digest) = digest {
+        let digest = <[u8; 32]>::from_hex(digest)?;
+        crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+    }
+
+    let mut config: InfluxDbHttp = metrics.lookup("influxdb-http", &name)?;
+
+    if let Some(delete) = delete {
+        for delete_prop in delete {
+            match delete_prop {
+                DeletableProperty::port => { config.port = None; },
+                DeletableProperty::https => { config.https = None; },
+                DeletableProperty::token => { config.token = None; },
+                DeletableProperty::bucket => { config.bucket = None; },
+                DeletableProperty::organization => { config.organization = None; },
+                DeletableProperty::max_body_size => { config.max_body_size = None; },
+                DeletableProperty::verify_tls => { config.verify_tls = None; },
+                DeletableProperty::comment => { config.comment = None; },
+            }
+        }
+    }
+
+    if let Some(comment) = update.comment {
+        let comment = comment.trim().to_string();
+        if comment.is_empty() {
+            config.comment = None;
+        } else {
+            config.comment = Some(comment);
+        }
+    }
+
+    if let Some(host) = update.host { config.host = host; }
+
+    if update.port.is_some() { config.port = update.port; }
+    if update.https.is_some() { config.https = update.https; }
+    if update.token.is_some() { config.token = update.token; }
+    if update.bucket.is_some() { config.bucket = update.bucket; }
+    if update.organization.is_some() { config.organization = update.organization; }
+    if update.max_body_size.is_some() { config.max_body_size = update.max_body_size; }
+    if update.verify_tls.is_some() { config.verify_tls = update.verify_tls; }
+
+    metrics.set_data(&name, "influxdb-http", &config)?;
+
+    metrics::save_config(&metrics)?;
+
+    Ok(())
+}
+
+const ITEM_ROUTER: Router = Router::new()
+    .get(&API_METHOD_READ_INFLUXDB_HTTP_SERVER)
+    .put(&API_METHOD_UPDATE_INFLUXDB_HTTP_SERVER)
+    .delete(&API_METHOD_DELETE_INFLUXDB_HTTP_SERVER);
+
+pub const ROUTER: Router = Router::new()
+    .get(&API_METHOD_LIST_INFLUXDB_HTTP_SERVERS)
+    .post(&API_METHOD_CREATE_INFLUXDB_HTTP_SERVER)
+    .match_all("name", &ITEM_ROUTER);
diff --git a/src/api2/config/metricserver/influxdbudp.rs b/src/api2/config/metricserver/influxdbudp.rs
new file mode 100644
index 00000000..0936f18b
--- /dev/null
+++ b/src/api2/config/metricserver/influxdbudp.rs
@@ -0,0 +1,242 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use serde::{Deserialize, Serialize};
+use hex::FromHex;
+
+use proxmox_router::{Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+    InfluxDbUdp, InfluxDbUdpUpdater,
+    PROXMOX_CONFIG_DIGEST_SCHEMA, METRIC_SERVER_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use pbs_config::metrics;
+
+#[api(
+    input: {
+        properties: {},
+    },
+    returns: {
+        description: "List of configured InfluxDB udp metric servers.",
+        type: Array,
+        items: { type: InfluxDbUdp },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// List configured InfluxDB udp metric servers.
+pub fn list_influxdb_udp_servers(
+    _param: Value,
+    mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<InfluxDbUdp>, Error> {
+
+    let (config, digest) = metrics::config()?;
+
+    let list = config.convert_to_typed_array("influxdb-udp")?;
+
+    rpcenv["digest"] = hex::encode(&digest).into();
+
+    Ok(list)
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            config: {
+                type: InfluxDbUdp,
+                flatten: true,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Create a new InfluxDB udp server configuration
+pub fn create_influxdb_udp_server(config: InfluxDbUdp) -> Result<(), Error> {
+
+    let _lock = metrics::lock_config()?;
+
+    let (mut metrics, _digest) = metrics::config()?;
+
+    metrics.set_data(&config.name, "influxdb-udp", &config)?;
+
+    metrics::save_config(&metrics)?;
+
+    Ok(())
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: METRIC_SERVER_ID_SCHEMA,
+            },
+            digest: {
+                optional: true,
+                schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+            },
+        },
+    },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Remove an InfluxDB udp server configuration
+pub fn delete_influxdb_udp_server(
+    name: String,
+    digest: Option<String>,
+    _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+    let _lock = metrics::lock_config()?;
+
+    let (mut metrics, expected_digest) = metrics::config()?;
+
+    if let Some(ref digest) = digest {
+        let digest = <[u8; 32]>::from_hex(digest)?;
+        crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+    }
+
+    if metrics.sections.remove(&name).is_none()  {
+        bail!("name '{}' does not exist.", name);
+    }
+
+    metrics::save_config(&metrics)?;
+
+    Ok(())
+}
+
+#[api(
+    input: {
+        properties: {
+            name: {
+                schema: METRIC_SERVER_ID_SCHEMA,
+            },
+        },
+    },
+    returns:  { type: InfluxDbUdp },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+    },
+)]
+/// Read the InfluxDB udp server configuration
+pub fn read_influxdb_udp_server(
+    name: String,
+    mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<InfluxDbUdp, Error> {
+
+    let (metrics, digest) = metrics::config()?;
+
+    let config = metrics.lookup("influxdb-udp", &name)?;
+
+    rpcenv["digest"] = hex::encode(&digest).into();
+
+    Ok(config)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all="kebab-case")]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+    /// Delete the mtu property.
+    mtu,
+    /// Delete the comment property.
+    comment,
+}
+
+#[api(
+    protected: true,
+    input: {
+        properties: {
+            name: {
+                schema: METRIC_SERVER_ID_SCHEMA,
+            },
+            update: {
+                type: InfluxDbUdpUpdater,
+                flatten: true,
+            },
+            delete: {
+                description: "List of properties to delete.",
+                type: Array,
+                optional: true,
+                items: {
+                    type: DeletableProperty,
+                }
+            },
+            digest: {
+                optional: true,
+                schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+            },
+        },
+    },
+    returns:  { type: InfluxDbUdp },
+    access: {
+        permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+    },
+)]
+/// Update an InfluxDB udp server configuration
+pub fn update_influxdb_udp_server(
+    name: String,
+    update: InfluxDbUdpUpdater,
+    delete: Option<Vec<DeletableProperty>>,
+    digest: Option<String>,
+    _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+    let _lock = metrics::lock_config()?;
+
+    let (mut metrics, expected_digest) = metrics::config()?;
+
+    if let Some(ref digest) = digest {
+        let digest = <[u8; 32]>::from_hex(digest)?;
+        crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+    }
+
+    let mut config: InfluxDbUdp = metrics.lookup("influxdb-udp", &name)?;
+
+    if let Some(delete) = delete {
+        for delete_prop in delete {
+            match delete_prop {
+                DeletableProperty::mtu => { config.mtu = None; },
+                DeletableProperty::comment => { config.comment = None; },
+            }
+        }
+    }
+
+    if let Some(comment) = update.comment {
+        let comment = comment.trim().to_string();
+        if comment.is_empty() {
+            config.comment = None;
+        } else {
+            config.comment = Some(comment);
+        }
+    }
+
+    if let Some(host) = update.host { config.host = host; }
+    if let Some(port) = update.port { config.port = port; }
+
+    if update.mtu.is_some() { config.mtu = update.mtu; }
+
+    metrics.set_data(&name, "influxdb-udp", &config)?;
+
+    metrics::save_config(&metrics)?;
+
+    Ok(())
+}
+
+const ITEM_ROUTER: Router = Router::new()
+    .get(&API_METHOD_READ_INFLUXDB_UDP_SERVER)
+    .put(&API_METHOD_UPDATE_INFLUXDB_UDP_SERVER)
+    .delete(&API_METHOD_DELETE_INFLUXDB_UDP_SERVER);
+
+pub const ROUTER: Router = Router::new()
+    .get(&API_METHOD_LIST_INFLUXDB_UDP_SERVERS)
+    .post(&API_METHOD_CREATE_INFLUXDB_UDP_SERVER)
+    .match_all("name", &ITEM_ROUTER);
diff --git a/src/api2/config/metricserver/mod.rs b/src/api2/config/metricserver/mod.rs
new file mode 100644
index 00000000..cbce34f7
--- /dev/null
+++ b/src/api2/config/metricserver/mod.rs
@@ -0,0 +1,16 @@
+use proxmox_router::{Router, SubdirMap};
+use proxmox_router::list_subdirs_api_method;
+use proxmox_sys::sortable;
+
+pub mod influxdbudp;
+pub mod influxdbhttp;
+
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([
+    ("influxdb-http", &influxdbhttp::ROUTER),
+    ("influxdb-udp", &influxdbudp::ROUTER),
+]);
+
+pub const ROUTER: Router = Router::new()
+    .get(&list_subdirs_api_method!(SUBDIRS))
+    .subdirs(SUBDIRS);
diff --git a/src/api2/config/mod.rs b/src/api2/config/mod.rs
index c256ba64..5de1c28f 100644
--- a/src/api2/config/mod.rs
+++ b/src/api2/config/mod.rs
@@ -12,6 +12,7 @@ pub mod verify;
 pub mod drive;
 pub mod changer;
 pub mod media_pool;
+pub mod metricserver;
 pub mod tape_encryption_keys;
 pub mod tape_backup_job;
 pub mod traffic_control;
@@ -23,6 +24,7 @@ const SUBDIRS: SubdirMap = &[
     ("datastore", &datastore::ROUTER),
     ("drive", &drive::ROUTER),
     ("media-pool", &media_pool::ROUTER),
+    ("metricserver", &metricserver::ROUTER),
     ("remote", &remote::ROUTER),
     ("sync", &sync::ROUTER),
     ("tape-backup-job", &tape_backup_job::ROUTER),
-- 
2.30.2






* Re: [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
@ 2021-12-14 13:51   ` Wolfgang Bumiller
  0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2021-12-14 13:51 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

On Tue, Dec 14, 2021 at 01:24:06PM +0100, Dominik Csapak wrote:
> influxdb (udp + http(s)) only for now
> 
> general architecture looks as follows:
> 
> "new" returns a MetricsChannel and a Future
> the channels can be used to push data in (it flushes automatically if
> it would be over the configured size (mtu/max_body_size))
> 
> and the future must be polled to actually send data to the servers.
> 
> so most often it would look like this:
>   let (future, channel) = InfluxDbHttp::new(..params..)?;
>   let handle = tokio::spawn(future);
>   channel.send_data(...).await?;
>   handle.await?;
> 
> when all channels go out of scope, all remaining data in the channel
> will be read and sent to the server
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
>  Cargo.toml                            |   1 +
>  proxmox-metrics/Cargo.toml            |  20 ++++
>  proxmox-metrics/debian/changelog      |   5 +
>  proxmox-metrics/debian/copyright      |  16 +++
>  proxmox-metrics/debian/debcargo.toml  |   7 ++
>  proxmox-metrics/src/influxdb/http.rs  | 143 ++++++++++++++++++++++++++
>  proxmox-metrics/src/influxdb/mod.rs   |   7 ++
>  proxmox-metrics/src/influxdb/udp.rs   | 107 +++++++++++++++++++
>  proxmox-metrics/src/influxdb/utils.rs |  51 +++++++++
>  proxmox-metrics/src/lib.rs            |  92 +++++++++++++++++
>  10 files changed, 449 insertions(+)
>  create mode 100644 proxmox-metrics/Cargo.toml
>  create mode 100644 proxmox-metrics/debian/changelog
>  create mode 100644 proxmox-metrics/debian/copyright
>  create mode 100644 proxmox-metrics/debian/debcargo.toml
>  create mode 100644 proxmox-metrics/src/influxdb/http.rs
>  create mode 100644 proxmox-metrics/src/influxdb/mod.rs
>  create mode 100644 proxmox-metrics/src/influxdb/udp.rs
>  create mode 100644 proxmox-metrics/src/influxdb/utils.rs
>  create mode 100644 proxmox-metrics/src/lib.rs
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 8f85e08..4a458d2 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -6,6 +6,7 @@ members = [
>      "proxmox-http",
>      "proxmox-io",
>      "proxmox-lang",
> +    "proxmox-metrics",
>      "proxmox-router",
>      "proxmox-schema",
>      "proxmox-serde",
> diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
> new file mode 100644
> index 0000000..9ac50fe
> --- /dev/null
> +++ b/proxmox-metrics/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox-metrics"
> +version = "0.1.0"
> +authors = ["Proxmox Support Team <support@proxmox.com>"]
> +edition = "2018"
> +license = "AGPL-3"
> +description = "Metrics Server export utilitites"
> +
> +exclude = [ "debian" ]
> +
> +[dependencies]
> +anyhow = "1.0"
> +tokio = { version = "1.0", features = [ "net", "sync" ] }
> +futures = "0.3"
> +serde = "1.0"
> +serde_json = "1.0"
> +http = "0.2"
> +hyper = "0.14"
> +openssl = "0.10"

Please sort the above, and separate the line below from the above group
with a newline

> +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
> diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
> new file mode 100644
> index 0000000..c02803b
> --- /dev/null
> +++ b/proxmox-metrics/debian/changelog
> @@ -0,0 +1,5 @@
> +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
> +
> +  * initial package
> +
> + -- Proxmox Support Team <support@proxmox.com>  Tue, 14 Dec 2021 08:56:54 +0100
> diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
> new file mode 100644
> index 0000000..5661ef6
> --- /dev/null
> +++ b/proxmox-metrics/debian/copyright
> @@ -0,0 +1,16 @@
> +Copyright (C) 2021 Proxmox Server Solutions GmbH
> +
> +This software is written by Proxmox Server Solutions GmbH <support@proxmox.com>
> +
> +This program is free software: you can redistribute it and/or modify
> +it under the terms of the GNU Affero General Public License as published by
> +the Free Software Foundation, either version 3 of the License, or
> +(at your option) any later version.
> +
> +This program is distributed in the hope that it will be useful,
> +but WITHOUT ANY WARRANTY; without even the implied warranty of
> +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +GNU Affero General Public License for more details.
> +
> +You should have received a copy of the GNU Affero General Public License
> +along with this program.  If not, see <http://www.gnu.org/licenses/>.
> diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
> new file mode 100644
> index 0000000..b7864cd
> --- /dev/null
> +++ b/proxmox-metrics/debian/debcargo.toml
> @@ -0,0 +1,7 @@
> +overlay = "."
> +crate_src_path = ".."
> +maintainer = "Proxmox Support Team <support@proxmox.com>"
> +
> +[source]
> +vcs_git = "git://git.proxmox.com/git/proxmox.git"
> +vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
> diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
> new file mode 100644
> index 0000000..8f1157d
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/http.rs
> @@ -0,0 +1,143 @@
> +use std::sync::Arc;
> +
> +use anyhow::{bail, Error};
> +use futures::{future::FutureExt, select};
> +use hyper::Body;
> +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
> +use tokio::sync::mpsc;
> +
> +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +pub struct InfluxDbHttp {
> +    client: SimpleHttp,
> +    _healthuri: http::Uri,
> +    writeuri: http::Uri,
> +    token: Option<String>,
> +    max_body_size: usize,
> +    data: String,
> +    data_channel: mpsc::Receiver<Arc<MetricsData>>,
> +    flush_channel: mpsc::Receiver<()>,
> +}
> +
> +impl InfluxDbHttp {
> +    pub fn new(
> +        https: bool,
> +        host: &str,
> +        port: u16,
> +        organization: &str,
> +        bucket: &str,
> +        token: Option<&str>,
> +        verify_tls: bool,
> +        max_body_size: usize,
> +    ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
> +        let (data_tx, data_rx) = mpsc::channel(1024);
> +        let (flush_tx, flush_rx) = mpsc::channel(1);
> +
> +        let client = if verify_tls {
> +            SimpleHttp::with_options(SimpleHttpOptions::default())
> +        } else {
> +            let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
> +            ssl_connector.set_verify(SslVerifyMode::NONE);
> +            SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
> +        };
> +
> +        let authority = proxmox_http::uri::build_authority(host, port)?;
> +
> +        let writeuri = http::uri::Builder::new()
> +            .scheme(if https { "https" } else { "http" })
> +            .authority(authority.clone())
> +            .path_and_query(format!(
> +                "/api/v2/write?org={}&bucket={}",
> +                organization, bucket
> +            ))
> +            .build()?;
> +
> +        let healthuri = http::uri::Builder::new()
> +            .scheme(if https { "https" } else { "http" })
> +            .authority(authority)
> +            .path_and_query("/health")
> +            .build()?;
> +
> +        let this = Self {
> +            client,
> +            writeuri,
> +            _healthuri: healthuri,
> +            token: token.map(String::from),
> +            max_body_size,
> +            data: String::new(),
> +            data_channel: data_rx,
> +            flush_channel: flush_rx,
> +        };
> +
> +        let future = Box::pin(this.finish());
> +        let channel = MetricsChannel {
> +            data_channel: data_tx,
> +            flush_channel: flush_tx,
> +        };
> +        Ok((future, channel))
> +    }
> +
> +    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let new_data = utils::format_influxdb_line(&data)?;
> +
> +        if self.data.len() + new_data.len() >= self.max_body_size {
> +            self.flush().await?;
> +        }
> +
> +        self.data.push_str(&new_data);
> +
> +        if self.data.len() >= self.max_body_size {
> +            self.flush().await?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    pub async fn flush(&mut self) -> Result<(), Error> {
> +        if self.data.is_empty() {
> +            return Ok(());
> +        }
> +        let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
> +
> +        if let Some(token) = &self.token {
> +            request = request.header("Authorization", format!("Token {}", token));
> +        }
> +
> +        let request = request.body(Body::from(self.data.split_off(0)))?;
> +
> +        let res = self.client.request(request).await?;
> +
> +        let status = res.status();
> +        if !status.is_success() {
> +            bail!("got bad status: {}", status);
> +        }
> +        Ok(())
> +    }
> +
> +    async fn finish(mut self) -> Result<(), Error> {
> +        loop {
> +            select! {

I wonder, don't you want to receive data & flushes in some kind of
order?
Wouldn't a single channel over an
`enum MetricsValue { Flush, Data(MetricsData) }`
make more sense?
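
Roughly like this (untested sketch; `self.channel` would be a single
mpsc::Receiver<MetricsValue> replacing `data_channel`/`flush_channel`):

    enum MetricsValue {
        Flush,
        Data(Arc<MetricsData>),
    }

    async fn finish(mut self) -> Result<(), Error> {
        // one receiver keeps data and flush requests in submission order
        while let Some(msg) = self.channel.recv().await {
            match msg {
                MetricsValue::Data(data) => self.add_data(data).await?,
                MetricsValue::Flush => self.flush().await?,
            }
        }
        // all senders are gone, push out whatever is left
        self.flush().await
    }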

> +                res = self.flush_channel.recv().fuse() => match res {
> +                    Some(_) => self.flush().await?,
> +                    None => break, // all senders gone
> +                },
> +                data = self.data_channel.recv().fuse() => match data {
> +                    Some(data) => self.add_data(data).await?,
> +                    None => break, // all senders gone
> +                },
> +            }
> +        }
> +
> +        // consume remaining data in channel
> +        while let Some(data) = self.data_channel.recv().await {
> +            self.add_data(data).await?;
> +        }
> +
> +        self.flush().await?;
> +
> +        Ok(())
> +    }
> +}
> diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
> new file mode 100644
> index 0000000..26a715c
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/mod.rs
> @@ -0,0 +1,7 @@
> +mod http;
> +pub use self::http::*;
> +
> +mod udp;
> +pub use udp::*;
> +
> +pub mod utils;
> diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
> new file mode 100644
> index 0000000..de2b0d5
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/udp.rs
> @@ -0,0 +1,107 @@
> +use std::sync::Arc;
> +
> +use anyhow::Error;
> +use futures::{future::FutureExt, select};
> +use tokio::net::UdpSocket;
> +use tokio::sync::mpsc;
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +pub struct InfluxDbUdp {
> +    address: String,
> +    conn: Option<tokio::net::UdpSocket>,
> +    mtu: u16,
> +    data: String,
> +    data_channel: mpsc::Receiver<Arc<MetricsData>>,
> +    flush_channel: mpsc::Receiver<()>,
> +}
> +
> +impl InfluxDbUdp {
> +    pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
> +        let (data_tx, data_rx) = mpsc::channel(1024);
> +        let (flush_tx, flush_rx) = mpsc::channel(1);
> +
> +        let address = if host.len() > 3 && host.contains(':') && &host[0..1] != "[" {
> +            format!("[{}]:{}", host, port)

Here you handle IPv6 but...

> +        } else {
> +            format!("{}:{}", host, port)
> +        };
> +
> +        let this = Self {
> +            address,
> +            conn: None,
> +            mtu: mtu.unwrap_or(1500),
> +            data: String::new(),
> +            data_channel: data_rx,
> +            flush_channel: flush_rx,
> +        };
> +
> +        let future = Box::pin(this.finish());
> +
> +        let channel = MetricsChannel {
> +            data_channel: data_tx,
> +            flush_channel: flush_tx,
> +        };
> +
> +        (future, channel)
> +    }
> +
> +    async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
> +        let conn = UdpSocket::bind("0.0.0.0:0").await?;

...here you're specifically binding to an IPv4 which will cause rust to
issue a `socket(AF_INET, ...)` syscall rather than `socket(AF_INET6,
...)` for IPv6.
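
An untested sketch of how the bind address could follow the resolved
peer's address family (needs `std::net::SocketAddr` and anyhow's
`format_err` in scope):

    async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
        let addr: SocketAddr = tokio::net::lookup_host(self.address.as_str())
            .await?
            .next()
            .ok_or_else(|| format_err!("could not resolve '{}'", self.address))?;
        // bind to the matching wildcard so IPv6 peers get an AF_INET6 socket
        let bind_addr = if addr.is_ipv6() { "[::]:0" } else { "0.0.0.0:0" };
        let conn = UdpSocket::bind(bind_addr).await?;
        conn.connect(addr).await?;
        Ok(conn)
    }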

> +        let addr = self.address.clone();
> +        conn.connect(addr).await?;
> +        Ok(conn)
> +    }
> +
> +    async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        let new_data = utils::format_influxdb_line(&data)?;
> +
> +        if self.data.len() + new_data.len() >= (self.mtu as usize) {
> +            self.flush().await?;
> +        }
> +
> +        self.data.push_str(&new_data);

Is it possible for `new_data.len()` to be larger than the mtu? If so,
should this warn or something?

Otherwise the next flush below might become a problem?
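
Something along these lines maybe (untested, `bail` from anyhow would
need to be imported):

    // reject a line that can never fit into a single datagram instead of
    // sending an oversized datagram on the next flush
    if new_data.len() > self.mtu as usize {
        bail!(
            "metric line ({} bytes) exceeds the configured mtu ({})",
            new_data.len(),
            self.mtu
        );
    }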

> +
> +        if self.data.len() >= (self.mtu as usize) {
> +            self.flush().await?;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    async fn flush(&mut self) -> Result<(), Error> {
> +        let conn = match self.conn.take() {
> +            Some(conn) => conn,
> +            None => self.connect().await?,
> +        };
> +
> +        conn.send(self.data.split_off(0).as_bytes()).await?;
> +        self.conn = Some(conn);
> +        Ok(())
> +    }
> +
> +    async fn finish(mut self) -> Result<(), Error> {
> +        loop {
> +            select! {
> +                res = self.flush_channel.recv().fuse() => match res {
> +                    Some(_) => self.flush().await?,
> +                    None => break, // all senders gone
> +                },
> +                data = self.data_channel.recv().fuse() => match data {
> +                    Some(data) => self.add_data(data).await?,
> +                    None => break, // all senders gone
> +                },
> +            }
> +        }
> +
> +        // consume remaining data in channel
> +        while let Some(data) = self.data_channel.recv().await {
> +            self.add_data(data).await?;
> +        }
> +
> +        self.flush().await?;
> +
> +        Ok(())
> +    }
> +}
> diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
> new file mode 100644
> index 0000000..bf391f9
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/utils.rs
> @@ -0,0 +1,51 @@
> +use anyhow::{bail, Error};
> +
> +use crate::MetricsData;
> +
> +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
> +    if !data.values.is_object() {
> +        bail!("invalid data");
> +    }
> +
> +    let mut line = escape_measurement(&data.measurement);
> +    line.push(',');
> +
> +    let tags = data.tags.iter().map(|(key, value)| {
> +        format!("{}={}", escape_key(&key), escape_key(&value))
> +    });
> +    line.push_str(&tags.collect::<Vec<String>>().join(","));

I'm not too fond of the temporary `Vec` here and below, maybe use
`line.extend()` with the ',' as part of the format string (",{}={}") or
skip even the temporary format and just

    for (key, value) in &data.tags {
        line.push(',');
        line.push_str(&escape_key(key));
        line.push('=');
        line.push_str(&escape_key(value));
    }

it's not really longer... alternatively, more readable and without the
temporary `String` would be `write!(line, ",{}={}", ...)?` etc.
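
i.e. (untested; needs `use std::fmt::Write as _;` in scope, and the
separate `line.push(',')` after the measurement goes away):

    for (key, value) in &data.tags {
        write!(line, ",{}={}", escape_key(key), escape_key(value))?;
    }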

> +
> +    line.push(' ');
> +
> +    let values = data.values.as_object().unwrap().iter().map(|(key, value)| {
> +        let value = if value.is_string() {
> +             escape_value(&value.to_string())

^ extra space? :P

> +        } else {
> +            value.to_string()
> +        };
> +        format!("{}={}", escape_key(&key), value)
> +    });
> +
> +    line.push_str(&values.collect::<Vec<String>>().join(","));
> +
> +    // nanosecond precision
> +    line.push_str(&format!(" {}\n", data.ctime*1_000_000_000));
> +    Ok(line)
> +}
> +
> +fn escape_key(key: &str) -> String {
> +    let key = key.replace(',', "\\,");
> +    let key = key.replace('=', "\\=");
> +    let key = key.replace(' ', "\\ ");
> +    key
> +}
> +
> +fn escape_measurement(measurement: &str) -> String {
> +    let measurement = measurement.replace(',', "\\,");
> +    let measurement = measurement.replace(' ', "\\ ");
> +    measurement
> +}
> +
> +fn escape_value(value: &str) -> String {
> +    format!("\"{}\"",value.replace('"', "\\\""))
> +}
> diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
> new file mode 100644
> index 0000000..0a76faa
> --- /dev/null
> +++ b/proxmox-metrics/src/lib.rs
> @@ -0,0 +1,92 @@
> +use std::collections::HashMap;
> +use std::pin::Pin;
> +use std::sync::Arc;
> +
> +use anyhow::{bail, format_err, Error};
> +use serde::Serialize;
> +use serde_json::Value;
> +use tokio::sync::mpsc;
> +
> +pub mod influxdb;
> +
> +#[derive(Clone)]
> +/// Structured data for the metric server
> +pub struct MetricsData {
> +    /// The category of measurements
> +    pub measurement: String,
> +    /// A list of to attach to the measurements
> +    pub tags: HashMap<String, String>,
> +    /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
> +    pub values: Value,
> +    /// The time of the measurement
> +    pub ctime: i64,
> +}
> +
> +impl MetricsData {
> +    /// Convenient helper to create from references
> +    pub fn new<V: Serialize>(measurement: &str, tags: &[(&str, &str)], ctime: i64, values: V) -> Result<Self, Error> {
> +        let mut new_tags = HashMap::new();
> +        for (key, value) in tags {
> +            new_tags.insert(key.to_string(), value.to_string());
> +        }
> +
> +        Ok(Self{
> +            measurement: measurement.to_string(),
> +            tags: new_tags,
> +            values: serde_json::to_value(values)?,
> +            ctime,
> +        })
> +    }
> +}
> +
> +pub type MetricsServerFuture =
> +    Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>;
> +
> +#[derive(Clone)]
> +/// A channel to send data to the metric server
> +pub struct MetricsChannel {
> +    pub(crate) data_channel: mpsc::Sender<Arc<MetricsData>>,
> +    pub(crate) flush_channel: mpsc::Sender<()>,
> +}
> +
> +impl MetricsChannel {
> +    /// Queues the given data for the metric server. If the queue is full,
> +    /// flush and try again.
> +    pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
> +        if let Err(err) = self.data_channel.try_send(data) {
> +            match err {
> +                mpsc::error::TrySendError::Full(data) => {
> +                    self.flush_channel.send(()).await?;
> +                    self.data_channel
> +                        .send(data)
> +                        .await
> +                        .map_err(|_| format_err!("error sending data"))?;
> +                }
> +                mpsc::error::TrySendError::Closed(_) => {
> +                    bail!("channel closed");
> +                }
> +            }
> +        }
> +        Ok(())
> +    }
> +
> +    /// Flush data to the metric server
> +    pub async fn flush(&self) -> Result<(), Error> {
> +        self.flush_channel.send(()).await?;
> +        Ok(())
> +    }
> +}
> +
> +pub async fn send_data_to_channels(values: &[Arc<MetricsData>], channels: &[MetricsChannel]) -> Vec<Result<(), Error>> {
> +    let mut futures = Vec::with_capacity(channels.len());
> +    for channel in channels {
> +        futures.push(async move {
> +            for data in values.into_iter() {

`.into_iter()` shouldn't be necessary, that's how `for` loops are
defined after all.

> +                channel.send_data(data.clone()).await?
> +            }
> +            Ok::<(), Error>(())
> +        });
> +    }
> +
> +    futures::future::join_all(futures).await
> +}
> -- 
> 2.30.2





* Re: [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
@ 2021-12-15  7:39   ` Wolfgang Bumiller
  0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2021-12-15  7:39 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

On Tue, Dec 14, 2021 at 01:24:12PM +0100, Dominik Csapak wrote:
> but in contrast to pve, we split the api by type of the section config,
> since we cannot handle multiple types in the updater
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> +
> +#[api()]
> +#[derive(Serialize, Deserialize)]
> +#[serde(rename_all="kebab-case")]
> +#[allow(non_camel_case_types)]
           ^~~~~~~~~~~~~~~~~~~^

Please don't add more of this nonsense!
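
i.e. plain CamelCase variants and let the serde rename produce the wire
names, roughly (untested sketch, only two variants shown):

    #[api()]
    #[derive(Serialize, Deserialize)]
    #[serde(rename_all = "kebab-case")]
    /// Deletable property name
    pub enum DeletableProperty {
        /// Delete the port property.
        Port,
        /// Delete the comment property.
        Comment,
    }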

> +/// Deletable property name
> +pub enum DeletableProperty {
> +    /// Delete the port property.
> +    port,
> +    /// Delete the https property.
> +    https,
> +    /// Delete the token property.
> +    token,
> +    /// Delete the bucket property.
> +    bucket,
> +    /// Delete the organization property.
> +    organization,
> +    /// Delete the max_body_size property.
> +    max_body_size,
> +    /// Delete the verify_tls property.
> +    verify_tls,
> +    /// Delete the comment property.
> +    comment,
> +}


> +#[api()]
> +#[derive(Serialize, Deserialize)]
> +#[serde(rename_all="kebab-case")]
> +#[allow(non_camel_case_types)]

No

> +/// Deletable property name
> +pub enum DeletableProperty {
> +    /// Delete the mtu property.
> +    mtu,
> +    /// Delete the comment property.
> +    comment,
> +}
> +





* Re: [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server
  2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
@ 2021-12-15  7:51   ` Wolfgang Bumiller
  0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2021-12-15  7:51 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

On Tue, Dec 14, 2021 at 01:24:11PM +0100, Dominik Csapak wrote:
> and keep the data as similar as possible to pve (tags/fields)
> 
> datastores get their own 'object' type and reside in the "blockstat"
> measurement
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
>  src/bin/proxmox-backup-proxy.rs | 139 +++++++++++++++++++++++++++++++-
>  1 file changed, 138 insertions(+), 1 deletion(-)
> 
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 2700fabf..fbb782dd 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -23,11 +23,13 @@ use proxmox_sys::linux::{
>  };
>  use proxmox_sys::fs::{CreateOptions, DiskUsage};
>  use proxmox_lang::try_block;
> +use proxmox_metrics::MetricsData;
>  use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
>  use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
>  use proxmox_sys::{task_log, task_warn};
>  use proxmox_sys::logrotate::LogRotate;
>  
> +use pbs_config::metrics::get_metric_server_connections;
>  use pbs_datastore::DataStore;
>  
>  use proxmox_rest_server::{
> @@ -948,16 +950,131 @@ async fn run_stat_generator() {
>              }
>          };
>  
> +        let hoststats2 = hoststats.clone();
> +        let hostdisk2 = hostdisk.clone();
> +        let datastores2 = datastores.clone();

Please use Arc::clone. Also, I'm not sure it's worth having them all as
separate Arcs; maybe just a 3-tuple in one Arc?
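
e.g. roughly (untested; assumes the collect_* helpers return plain
values and send_data_to_metric_servers takes the combined Arc):

    let stats = Arc::new((hoststats, hostdisk, datastores));

    let rrd_future = tokio::task::spawn_blocking({
        let stats = Arc::clone(&stats);
        move || {
            let (host, disk, datastores) = &*stats;
            rrd_update_host_stats_sync(host, disk, datastores);
            rrd_sync_journal();
        }
    });

    let metrics_future = send_data_to_metric_servers(Arc::clone(&stats));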

>          let rrd_future = tokio::task::spawn_blocking(move || {
> -            rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
> +            rrd_update_host_stats_sync(&hoststats2, &hostdisk2, &datastores2);
>              rrd_sync_journal();
>          });
>  
> +        let metrics_future = send_data_to_metric_servers(hoststats, hostdisk, datastores);
> +
> +        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
> +        if let Err(err) = rrd_res {
> +            log::error!("rrd update panicked: {}", err);
> +        }
> +        if let Err(err) = metrics_res {
> +            log::error!("error during metrics sending: {}", err);
> +        }
>  
>          tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
>  
>       }
> +}
> +
> +async fn send_data_to_metric_servers(
> +    host: Arc<HostStats>,
> +    hostdisk: Arc<DiskStat>,
> +    datastores: Arc<Vec<DiskStat>>,
> +) -> Result<(), Error> {
> +    let (config, _digest) = pbs_config::metrics::config()?;
> +    let (futures, channels, names) = get_metric_server_connections(config)?;
> +
> +    if futures.is_empty() {
> +        return Ok(());
> +    }
> +
> +    let names2 = names.clone();
> +    let sending_handle = tokio::spawn(async move {
> +        for (i, res) in future::join_all(futures).await.into_iter().enumerate() {
> +            if let Err(err) = res {
> +                eprintln!("ERROR '{}': {}", names2[i], err);
> +            }
> +        }
> +    });
> +
> +    let ctime = proxmox_time::epoch_i64();
> +    let nodename = proxmox_sys::nodename();
> +
> +    let mut values = Vec::new();
> +
> +    let mut cpuvalue = json!({});
> +    if let Some(stat) = &host.proc {
> +        for (key, value) in serde_json::to_value(stat)?.as_object().unwrap().iter() {
> +            cpuvalue[key.clone()] = value.clone();
> +        }
I may be missing something but can I not replace the entire loop with
just:
    cpuvalue = to_value(stat);
?

> +    }

in fact:

    let mut cpuvalue = match &host.proc {
        Some(stat) => serde_json::to_value(stat)?,
        None => json!({}),
    };

> +
> +    if let Some(loadavg) = &host.load {
> +        cpuvalue["avg1"] = Value::from(loadavg.0);
> +        cpuvalue["avg5"] = Value::from(loadavg.1);
> +        cpuvalue["avg15"] = Value::from(loadavg.2);
> +    }

> @@ -973,6 +1090,26 @@ struct DiskStat {
>      dev: Option<BlockDevStat>,
>  }
>  
> +impl DiskStat {
> +    fn to_value(&self) -> Value {
> +        let mut value = json!({});
> +        if let Some(usage) = &self.usage {
> +            value["total"] = Value::from(usage.total);
> +            value["used"] = Value::from(usage.used);
> +            value["avail"] = Value::from(usage.available);
> +        }
> +
> +        if let Some(dev) = &self.dev {
> +            value["read_ios"] = Value::from(dev.read_ios);
> +            value["read_bytes"] = Value::from(dev.read_sectors * 512);

And bye-bye goes the hope for a generic
'serialize-by-merging-into-existing-object' helper :-(





end of thread

Thread overview: 13+ messages
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
2021-12-14 13:51   ` Wolfgang Bumiller
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
2021-12-15  7:51   ` Wolfgang Bumiller
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
2021-12-15  7:39   ` Wolfgang Bumiller
