* [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper Dominik Csapak
` (7 subsequent siblings)
8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC
To: pbs-devel
we already depend on serde anyway, and this makes gathering structs a
bit more comfortable
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-sys/Cargo.toml | 1 +
proxmox-sys/src/linux/procfs/mod.rs | 7 ++++---
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/proxmox-sys/Cargo.toml b/proxmox-sys/Cargo.toml
index 80d39eb..bfcd388 100644
--- a/proxmox-sys/Cargo.toml
+++ b/proxmox-sys/Cargo.toml
@@ -17,6 +17,7 @@ log = "0.4"
nix = "0.19.1"
regex = "1.2"
serde_json = "1.0"
+serde = { version = "1.0", features = [ "derive" ] }
zstd = { version = "0.6", features = [ "bindgen" ] }
# Macro crates:
diff --git a/proxmox-sys/src/linux/procfs/mod.rs b/proxmox-sys/src/linux/procfs/mod.rs
index 30b9978..3373dec 100644
--- a/proxmox-sys/src/linux/procfs/mod.rs
+++ b/proxmox-sys/src/linux/procfs/mod.rs
@@ -11,6 +11,7 @@ use std::time::Instant;
use anyhow::{bail, format_err, Error};
use lazy_static::lazy_static;
use nix::unistd::Pid;
+use serde::Serialize;
use crate::fs::file_read_firstline;
@@ -184,7 +185,7 @@ pub fn read_proc_uptime_ticks() -> Result<(u64, u64), Error> {
Ok((up as u64, idle as u64))
}
-#[derive(Debug, Default)]
+#[derive(Debug, Default, Serialize)]
/// The CPU fields from `/proc/stat` with their native time value. Multiply
/// with CLOCK_TICKS to get the real value.
pub struct ProcFsStat {
@@ -407,7 +408,7 @@ fn test_read_proc_stat() {
assert_eq!(stat.iowait_percent, 0.0);
}
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
pub struct ProcFsMemInfo {
pub memtotal: u64,
pub memfree: u64,
@@ -539,7 +540,7 @@ pub fn read_memory_usage() -> Result<ProcFsMemUsage, Error> {
}
}
-#[derive(Debug)]
+#[derive(Debug, Serialize)]
pub struct ProcFsNetDev {
pub device: String,
pub receive: u64,
--
2.30.2
* [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
` (6 subsequent siblings)
8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC
To: pbs-devel
copied from proxmox-backup
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox-sys/src/fs/mod.rs | 26 ++++++++++++++++++++++++++
1 file changed, 26 insertions(+)
diff --git a/proxmox-sys/src/fs/mod.rs b/proxmox-sys/src/fs/mod.rs
index 9fe333b..5935b1a 100644
--- a/proxmox-sys/src/fs/mod.rs
+++ b/proxmox-sys/src/fs/mod.rs
@@ -3,6 +3,7 @@ use std::fs::File;
use std::path::Path;
use anyhow::{bail, Error};
+use serde::Serialize;
use std::os::unix::io::{AsRawFd, RawFd};
use nix::unistd::{Gid, Uid};
@@ -102,3 +103,28 @@ impl CreateOptions {
*/
}
+/// Basic disk usage information
+#[derive(Serialize)]
+pub struct DiskUsage {
+ pub total: u64,
+ pub used: u64,
+ pub available: u64,
+}
+
+/// Get disk usage information from path
+pub fn disk_usage(path: &std::path::Path) -> Result<DiskUsage, Error> {
+ let mut stat: libc::statfs64 = unsafe { std::mem::zeroed() };
+
+ use nix::NixPath;
+
+ let res = path.with_nix_path(|cstr| unsafe { libc::statfs64(cstr.as_ptr(), &mut stat) })?;
+ nix::errno::Errno::result(res)?;
+
+ let bsize = stat.f_bsize as u64;
+
+ Ok(DiskUsage{
+ total: stat.f_blocks*bsize,
+ used: (stat.f_blocks-stat.f_bfree)*bsize,
+ available: stat.f_bavail*bsize,
+ })
+}
--
2.30.2
* [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 1/3] proxmox-sys: make some structs serializable Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 2/3] proxmox-sys: add DiskUsage struct and helper Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 13:51 ` Wolfgang Bumiller
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys Dominik Csapak
` (5 subsequent siblings)
8 siblings, 1 reply; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC
To: pbs-devel
influxdb (udp + http(s)) only for now
general architecture looks as follows:
"new" returns a MetricsChannel and a Future
the channel can be used to push data in (it flushes automatically if the
buffered data would exceed the configured size (mtu/max_body_size))
and the future must be polled to actually send data to the servers.
so most often it would look like this:
let (future, channel) = InfluxDbHttp::new(..params..)?;
let handle = tokio::spawn(future);
channel.send_data(...).await?;
handle.await?;
when all channels go out of scope, all remaining data in the channel
will be read and sent to the server
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Cargo.toml | 1 +
proxmox-metrics/Cargo.toml | 20 ++++
proxmox-metrics/debian/changelog | 5 +
proxmox-metrics/debian/copyright | 16 +++
proxmox-metrics/debian/debcargo.toml | 7 ++
proxmox-metrics/src/influxdb/http.rs | 143 ++++++++++++++++++++++++++
proxmox-metrics/src/influxdb/mod.rs | 7 ++
proxmox-metrics/src/influxdb/udp.rs | 107 +++++++++++++++++++
proxmox-metrics/src/influxdb/utils.rs | 51 +++++++++
proxmox-metrics/src/lib.rs | 92 +++++++++++++++++
10 files changed, 449 insertions(+)
create mode 100644 proxmox-metrics/Cargo.toml
create mode 100644 proxmox-metrics/debian/changelog
create mode 100644 proxmox-metrics/debian/copyright
create mode 100644 proxmox-metrics/debian/debcargo.toml
create mode 100644 proxmox-metrics/src/influxdb/http.rs
create mode 100644 proxmox-metrics/src/influxdb/mod.rs
create mode 100644 proxmox-metrics/src/influxdb/udp.rs
create mode 100644 proxmox-metrics/src/influxdb/utils.rs
create mode 100644 proxmox-metrics/src/lib.rs
diff --git a/Cargo.toml b/Cargo.toml
index 8f85e08..4a458d2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,6 +6,7 @@ members = [
"proxmox-http",
"proxmox-io",
"proxmox-lang",
+ "proxmox-metrics",
"proxmox-router",
"proxmox-schema",
"proxmox-serde",
diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
new file mode 100644
index 0000000..9ac50fe
--- /dev/null
+++ b/proxmox-metrics/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "proxmox-metrics"
+version = "0.1.0"
+authors = ["Proxmox Support Team <support@proxmox.com>"]
+edition = "2018"
+license = "AGPL-3"
+description = "Metrics Server export utilities"
+
+exclude = [ "debian" ]
+
+[dependencies]
+anyhow = "1.0"
+tokio = { version = "1.0", features = [ "net", "sync" ] }
+futures = "0.3"
+serde = "1.0"
+serde_json = "1.0"
+http = "0.2"
+hyper = "0.14"
+openssl = "0.10"
+proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
new file mode 100644
index 0000000..c02803b
--- /dev/null
+++ b/proxmox-metrics/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
+
+ * initial package
+
+ -- Proxmox Support Team <support@proxmox.com> Tue, 14 Dec 2021 08:56:54 +0100
diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
new file mode 100644
index 0000000..5661ef6
--- /dev/null
+++ b/proxmox-metrics/debian/copyright
@@ -0,0 +1,16 @@
+Copyright (C) 2021 Proxmox Server Solutions GmbH
+
+This software is written by Proxmox Server Solutions GmbH <support@proxmox.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU Affero General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU Affero General Public License for more details.
+
+You should have received a copy of the GNU Affero General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
new file mode 100644
index 0000000..b7864cd
--- /dev/null
+++ b/proxmox-metrics/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support@proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
new file mode 100644
index 0000000..8f1157d
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/http.rs
@@ -0,0 +1,143 @@
+use std::sync::Arc;
+
+use anyhow::{bail, Error};
+use futures::{future::FutureExt, select};
+use hyper::Body;
+use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
+use tokio::sync::mpsc;
+
+use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
+
+use crate::influxdb::utils;
+use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
+
+pub struct InfluxDbHttp {
+ client: SimpleHttp,
+ _healthuri: http::Uri,
+ writeuri: http::Uri,
+ token: Option<String>,
+ max_body_size: usize,
+ data: String,
+ data_channel: mpsc::Receiver<Arc<MetricsData>>,
+ flush_channel: mpsc::Receiver<()>,
+}
+
+impl InfluxDbHttp {
+ pub fn new(
+ https: bool,
+ host: &str,
+ port: u16,
+ organization: &str,
+ bucket: &str,
+ token: Option<&str>,
+ verify_tls: bool,
+ max_body_size: usize,
+ ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
+ let (data_tx, data_rx) = mpsc::channel(1024);
+ let (flush_tx, flush_rx) = mpsc::channel(1);
+
+ let client = if verify_tls {
+ SimpleHttp::with_options(SimpleHttpOptions::default())
+ } else {
+ let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
+ ssl_connector.set_verify(SslVerifyMode::NONE);
+ SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
+ };
+
+ let authority = proxmox_http::uri::build_authority(host, port)?;
+
+ let writeuri = http::uri::Builder::new()
+ .scheme(if https { "https" } else { "http" })
+ .authority(authority.clone())
+ .path_and_query(format!(
+ "/api/v2/write?org={}&bucket={}",
+ organization, bucket
+ ))
+ .build()?;
+
+ let healthuri = http::uri::Builder::new()
+ .scheme(if https { "https" } else { "http" })
+ .authority(authority)
+ .path_and_query("/health")
+ .build()?;
+
+ let this = Self {
+ client,
+ writeuri,
+ _healthuri: healthuri,
+ token: token.map(String::from),
+ max_body_size,
+ data: String::new(),
+ data_channel: data_rx,
+ flush_channel: flush_rx,
+ };
+
+ let future = Box::pin(this.finish());
+ let channel = MetricsChannel {
+ data_channel: data_tx,
+ flush_channel: flush_tx,
+ };
+ Ok((future, channel))
+ }
+
+ async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+ let new_data = utils::format_influxdb_line(&data)?;
+
+ if self.data.len() + new_data.len() >= self.max_body_size {
+ self.flush().await?;
+ }
+
+ self.data.push_str(&new_data);
+
+ if self.data.len() >= self.max_body_size {
+ self.flush().await?;
+ }
+
+ Ok(())
+ }
+
+ pub async fn flush(&mut self) -> Result<(), Error> {
+ if self.data.is_empty() {
+ return Ok(());
+ }
+ let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
+
+ if let Some(token) = &self.token {
+ request = request.header("Authorization", format!("Token {}", token));
+ }
+
+ let request = request.body(Body::from(self.data.split_off(0)))?;
+
+ let res = self.client.request(request).await?;
+
+ let status = res.status();
+ if !status.is_success() {
+ bail!("got bad status: {}", status);
+ }
+ Ok(())
+ }
+
+ async fn finish(mut self) -> Result<(), Error> {
+ loop {
+ select! {
+ res = self.flush_channel.recv().fuse() => match res {
+ Some(_) => self.flush().await?,
+ None => break, // all senders gone
+ },
+ data = self.data_channel.recv().fuse() => match data {
+ Some(data) => self.add_data(data).await?,
+ None => break, // all senders gone
+ },
+ }
+ }
+
+ // consume remaining data in channel
+ while let Some(data) = self.data_channel.recv().await {
+ self.add_data(data).await?;
+ }
+
+ self.flush().await?;
+
+ Ok(())
+ }
+}
diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
new file mode 100644
index 0000000..26a715c
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/mod.rs
@@ -0,0 +1,7 @@
+mod http;
+pub use self::http::*;
+
+mod udp;
+pub use udp::*;
+
+pub mod utils;
diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
new file mode 100644
index 0000000..de2b0d5
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/udp.rs
@@ -0,0 +1,107 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use futures::{future::FutureExt, select};
+use tokio::net::UdpSocket;
+use tokio::sync::mpsc;
+
+use crate::influxdb::utils;
+use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
+
+pub struct InfluxDbUdp {
+ address: String,
+ conn: Option<tokio::net::UdpSocket>,
+ mtu: u16,
+ data: String,
+ data_channel: mpsc::Receiver<Arc<MetricsData>>,
+ flush_channel: mpsc::Receiver<()>,
+}
+
+impl InfluxDbUdp {
+ pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
+ let (data_tx, data_rx) = mpsc::channel(1024);
+ let (flush_tx, flush_rx) = mpsc::channel(1);
+
+ let address = if host.len() > 3 && host.contains(':') && &host[0..1] != "[" {
+ format!("[{}]:{}", host, port)
+ } else {
+ format!("{}:{}", host, port)
+ };
+
+ let this = Self {
+ address,
+ conn: None,
+ mtu: mtu.unwrap_or(1500),
+ data: String::new(),
+ data_channel: data_rx,
+ flush_channel: flush_rx,
+ };
+
+ let future = Box::pin(this.finish());
+
+ let channel = MetricsChannel {
+ data_channel: data_tx,
+ flush_channel: flush_tx,
+ };
+
+ (future, channel)
+ }
+
+ async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
+ let conn = UdpSocket::bind("0.0.0.0:0").await?;
+ let addr = self.address.clone();
+ conn.connect(addr).await?;
+ Ok(conn)
+ }
+
+ async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
+ let new_data = utils::format_influxdb_line(&data)?;
+
+ if self.data.len() + new_data.len() >= (self.mtu as usize) {
+ self.flush().await?;
+ }
+
+ self.data.push_str(&new_data);
+
+ if self.data.len() >= (self.mtu as usize) {
+ self.flush().await?;
+ }
+
+ Ok(())
+ }
+
+ async fn flush(&mut self) -> Result<(), Error> {
+ let conn = match self.conn.take() {
+ Some(conn) => conn,
+ None => self.connect().await?,
+ };
+
+ conn.send(self.data.split_off(0).as_bytes()).await?;
+ self.conn = Some(conn);
+ Ok(())
+ }
+
+ async fn finish(mut self) -> Result<(), Error> {
+ loop {
+ select! {
+ res = self.flush_channel.recv().fuse() => match res {
+ Some(_) => self.flush().await?,
+ None => break, // all senders gone
+ },
+ data = self.data_channel.recv().fuse() => match data {
+ Some(data) => self.add_data(data).await?,
+ None => break, // all senders gone
+ },
+ }
+ }
+
+ // consume remaining data in channel
+ while let Some(data) = self.data_channel.recv().await {
+ self.add_data(data).await?;
+ }
+
+ self.flush().await?;
+
+ Ok(())
+ }
+}
diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
new file mode 100644
index 0000000..bf391f9
--- /dev/null
+++ b/proxmox-metrics/src/influxdb/utils.rs
@@ -0,0 +1,51 @@
+use anyhow::{bail, Error};
+
+use crate::MetricsData;
+
+pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
+ if !data.values.is_object() {
+ bail!("invalid data");
+ }
+
+ let mut line = escape_measurement(&data.measurement);
+ line.push(',');
+
+ let tags = data.tags.iter().map(|(key, value)| {
+ format!("{}={}", escape_key(&key), escape_key(&value))
+ });
+ line.push_str(&tags.collect::<Vec<String>>().join(","));
+
+ line.push(' ');
+
+ let values = data.values.as_object().unwrap().iter().map(|(key, value)| {
+ let value = if value.is_string() {
+ escape_value(&value.to_string())
+ } else {
+ value.to_string()
+ };
+ format!("{}={}", escape_key(&key), value)
+ });
+
+ line.push_str(&values.collect::<Vec<String>>().join(","));
+
+ // nanosecond precision
+ line.push_str(&format!(" {}\n", data.ctime*1_000_000_000));
+ Ok(line)
+}
+
+fn escape_key(key: &str) -> String {
+ let key = key.replace(',', "\\,");
+ let key = key.replace('=', "\\=");
+ let key = key.replace(' ', "\\ ");
+ key
+}
+
+fn escape_measurement(measurement: &str) -> String {
+ let measurement = measurement.replace(',', "\\,");
+ let measurement = measurement.replace(' ', "\\ ");
+ measurement
+}
+
+fn escape_value(value: &str) -> String {
+ format!("\"{}\"",value.replace('"', "\\\""))
+}
diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
new file mode 100644
index 0000000..0a76faa
--- /dev/null
+++ b/proxmox-metrics/src/lib.rs
@@ -0,0 +1,92 @@
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use anyhow::{bail, format_err, Error};
+use serde::Serialize;
+use serde_json::Value;
+use tokio::sync::mpsc;
+
+pub mod influxdb;
+
+#[derive(Clone)]
+/// Structured data for the metric server
+pub struct MetricsData {
+ /// The category of measurements
+ pub measurement: String,
+ /// A list of tags to attach to the measurements
+ pub tags: HashMap<String, String>,
+ /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
+ pub values: Value,
+ /// The time of the measurement
+ pub ctime: i64,
+}
+
+impl MetricsData {
+ /// Convenient helper to create from references
+ pub fn new<V: Serialize>(measurement: &str, tags: &[(&str, &str)], ctime: i64, values: V) -> Result<Self, Error> {
+ let mut new_tags = HashMap::new();
+ for (key, value) in tags {
+ new_tags.insert(key.to_string(), value.to_string());
+ }
+
+ Ok(Self{
+ measurement: measurement.to_string(),
+ tags: new_tags,
+ values: serde_json::to_value(values)?,
+ ctime,
+ })
+ }
+}
+
+pub type MetricsServerFuture =
+ Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>;
+
+#[derive(Clone)]
+/// A channel to send data to the metric server
+pub struct MetricsChannel {
+ pub(crate) data_channel: mpsc::Sender<Arc<MetricsData>>,
+ pub(crate) flush_channel: mpsc::Sender<()>,
+}
+
+impl MetricsChannel {
+ /// Queues the given data for the metric server. If the queue is full,
+ /// flush and try again.
+ pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
+ if let Err(err) = self.data_channel.try_send(data) {
+ match err {
+ mpsc::error::TrySendError::Full(data) => {
+ self.flush_channel.send(()).await?;
+ self.data_channel
+ .send(data)
+ .await
+ .map_err(|_| format_err!("error sending data"))?;
+ }
+ mpsc::error::TrySendError::Closed(_) => {
+ bail!("channel closed");
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Flush data to the metric server
+ pub async fn flush(&self) -> Result<(), Error> {
+ self.flush_channel.send(()).await?;
+ Ok(())
+ }
+}
+
+pub async fn send_data_to_channels(values: &[Arc<MetricsData>], channels: &[MetricsChannel]) -> Vec<Result<(), Error>> {
+ let mut futures = Vec::with_capacity(channels.len());
+ for channel in channels {
+ futures.push(async move {
+ for data in values.into_iter() {
+ channel.send_data(data.clone()).await?
+ }
+ Ok::<(), Error>(())
+ });
+ }
+
+ futures::future::join_all(futures).await
+}
--
2.30.2
* Re: [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
@ 2021-12-14 13:51 ` Wolfgang Bumiller
0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2021-12-14 13:51 UTC
To: Dominik Csapak; +Cc: pbs-devel
On Tue, Dec 14, 2021 at 01:24:06PM +0100, Dominik Csapak wrote:
> influxdb (udp + http(s)) only for now
>
> general architecture looks as follows:
>
> "new" returns a MetricsChannel and a Future
> the channels can be used to push data in (it flushes automatically if
> it would be over the configured size (mtu/max_body_size))
>
> and the future must be polled to actually send data to the servers.
>
> so most often it would look like this:
> let (future, channel) = InfluxDbHttp::new(..params..)?;
> let handle = tokio::spawn(future);
> channel.send_data(...).await?;
> handle.await?;
>
> when all channels go out of scope, all remaining data in the channel
> will be read and sent to the server
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> Cargo.toml | 1 +
> proxmox-metrics/Cargo.toml | 20 ++++
> proxmox-metrics/debian/changelog | 5 +
> proxmox-metrics/debian/copyright | 16 +++
> proxmox-metrics/debian/debcargo.toml | 7 ++
> proxmox-metrics/src/influxdb/http.rs | 143 ++++++++++++++++++++++++++
> proxmox-metrics/src/influxdb/mod.rs | 7 ++
> proxmox-metrics/src/influxdb/udp.rs | 107 +++++++++++++++++++
> proxmox-metrics/src/influxdb/utils.rs | 51 +++++++++
> proxmox-metrics/src/lib.rs | 92 +++++++++++++++++
> 10 files changed, 449 insertions(+)
> create mode 100644 proxmox-metrics/Cargo.toml
> create mode 100644 proxmox-metrics/debian/changelog
> create mode 100644 proxmox-metrics/debian/copyright
> create mode 100644 proxmox-metrics/debian/debcargo.toml
> create mode 100644 proxmox-metrics/src/influxdb/http.rs
> create mode 100644 proxmox-metrics/src/influxdb/mod.rs
> create mode 100644 proxmox-metrics/src/influxdb/udp.rs
> create mode 100644 proxmox-metrics/src/influxdb/utils.rs
> create mode 100644 proxmox-metrics/src/lib.rs
>
> diff --git a/Cargo.toml b/Cargo.toml
> index 8f85e08..4a458d2 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -6,6 +6,7 @@ members = [
> "proxmox-http",
> "proxmox-io",
> "proxmox-lang",
> + "proxmox-metrics",
> "proxmox-router",
> "proxmox-schema",
> "proxmox-serde",
> diff --git a/proxmox-metrics/Cargo.toml b/proxmox-metrics/Cargo.toml
> new file mode 100644
> index 0000000..9ac50fe
> --- /dev/null
> +++ b/proxmox-metrics/Cargo.toml
> @@ -0,0 +1,20 @@
> +[package]
> +name = "proxmox-metrics"
> +version = "0.1.0"
> +authors = ["Proxmox Support Team <support@proxmox.com>"]
> +edition = "2018"
> +license = "AGPL-3"
> +description = "Metrics Server export utilitites"
> +
> +exclude = [ "debian" ]
> +
> +[dependencies]
> +anyhow = "1.0"
> +tokio = { version = "1.0", features = [ "net", "sync" ] }
> +futures = "0.3"
> +serde = "1.0"
> +serde_json = "1.0"
> +http = "0.2"
> +hyper = "0.14"
> +openssl = "0.10"
Please sort the above, and separate the line below from the above group
with a newline
> +proxmox-http = { path = "../proxmox-http", features = [ "client" ], version = "0.6" }
> diff --git a/proxmox-metrics/debian/changelog b/proxmox-metrics/debian/changelog
> new file mode 100644
> index 0000000..c02803b
> --- /dev/null
> +++ b/proxmox-metrics/debian/changelog
> @@ -0,0 +1,5 @@
> +rust-proxmox-metrics (0.1.0-1) unstable; urgency=medium
> +
> + * initial package
> +
> + -- Proxmox Support Team <support@proxmox.com> Tue, 14 Dec 2021 08:56:54 +0100
> diff --git a/proxmox-metrics/debian/copyright b/proxmox-metrics/debian/copyright
> new file mode 100644
> index 0000000..5661ef6
> --- /dev/null
> +++ b/proxmox-metrics/debian/copyright
> @@ -0,0 +1,16 @@
> +Copyright (C) 2021 Proxmox Server Solutions GmbH
> +
> +This software is written by Proxmox Server Solutions GmbH <support@proxmox.com>
> +
> +This program is free software: you can redistribute it and/or modify
> +it under the terms of the GNU Affero General Public License as published by
> +the Free Software Foundation, either version 3 of the License, or
> +(at your option) any later version.
> +
> +This program is distributed in the hope that it will be useful,
> +but WITHOUT ANY WARRANTY; without even the implied warranty of
> +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +GNU Affero General Public License for more details.
> +
> +You should have received a copy of the GNU Affero General Public License
> +along with this program. If not, see <http://www.gnu.org/licenses/>.
> diff --git a/proxmox-metrics/debian/debcargo.toml b/proxmox-metrics/debian/debcargo.toml
> new file mode 100644
> index 0000000..b7864cd
> --- /dev/null
> +++ b/proxmox-metrics/debian/debcargo.toml
> @@ -0,0 +1,7 @@
> +overlay = "."
> +crate_src_path = ".."
> +maintainer = "Proxmox Support Team <support@proxmox.com>"
> +
> +[source]
> +vcs_git = "git://git.proxmox.com/git/proxmox.git"
> +vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
> diff --git a/proxmox-metrics/src/influxdb/http.rs b/proxmox-metrics/src/influxdb/http.rs
> new file mode 100644
> index 0000000..8f1157d
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/http.rs
> @@ -0,0 +1,143 @@
> +use std::sync::Arc;
> +
> +use anyhow::{bail, Error};
> +use futures::{future::FutureExt, select};
> +use hyper::Body;
> +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
> +use tokio::sync::mpsc;
> +
> +use proxmox_http::client::{SimpleHttp, SimpleHttpOptions};
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +pub struct InfluxDbHttp {
> + client: SimpleHttp,
> + _healthuri: http::Uri,
> + writeuri: http::Uri,
> + token: Option<String>,
> + max_body_size: usize,
> + data: String,
> + data_channel: mpsc::Receiver<Arc<MetricsData>>,
> + flush_channel: mpsc::Receiver<()>,
> +}
> +
> +impl InfluxDbHttp {
> + pub fn new(
> + https: bool,
> + host: &str,
> + port: u16,
> + organization: &str,
> + bucket: &str,
> + token: Option<&str>,
> + verify_tls: bool,
> + max_body_size: usize,
> + ) -> Result<(MetricsServerFuture, MetricsChannel), Error> {
> + let (data_tx, data_rx) = mpsc::channel(1024);
> + let (flush_tx, flush_rx) = mpsc::channel(1);
> +
> + let client = if verify_tls {
> + SimpleHttp::with_options(SimpleHttpOptions::default())
> + } else {
> + let mut ssl_connector = SslConnector::builder(SslMethod::tls()).unwrap();
> + ssl_connector.set_verify(SslVerifyMode::NONE);
> + SimpleHttp::with_ssl_connector(ssl_connector.build(), SimpleHttpOptions::default())
> + };
> +
> + let authority = proxmox_http::uri::build_authority(host, port)?;
> +
> + let writeuri = http::uri::Builder::new()
> + .scheme(if https { "https" } else { "http" })
> + .authority(authority.clone())
> + .path_and_query(format!(
> + "/api/v2/write?org={}&bucket={}",
> + organization, bucket
> + ))
> + .build()?;
> +
> + let healthuri = http::uri::Builder::new()
> + .scheme(if https { "https" } else { "http" })
> + .authority(authority)
> + .path_and_query("/health")
> + .build()?;
> +
> + let this = Self {
> + client,
> + writeuri,
> + _healthuri: healthuri,
> + token: token.map(String::from),
> + max_body_size,
> + data: String::new(),
> + data_channel: data_rx,
> + flush_channel: flush_rx,
> + };
> +
> + let future = Box::pin(this.finish());
> + let channel = MetricsChannel {
> + data_channel: data_tx,
> + flush_channel: flush_tx,
> + };
> + Ok((future, channel))
> + }
> +
> + async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> + let new_data = utils::format_influxdb_line(&data)?;
> +
> + if self.data.len() + new_data.len() >= self.max_body_size {
> + self.flush().await?;
> + }
> +
> + self.data.push_str(&new_data);
> +
> + if self.data.len() >= self.max_body_size {
> + self.flush().await?;
> + }
> +
> + Ok(())
> + }
> +
> + pub async fn flush(&mut self) -> Result<(), Error> {
> + if self.data.is_empty() {
> + return Ok(());
> + }
> + let mut request = http::Request::builder().method("POST").uri(&self.writeuri);
> +
> + if let Some(token) = &self.token {
> + request = request.header("Authorization", format!("Token {}", token));
> + }
> +
> + let request = request.body(Body::from(self.data.split_off(0)))?;
> +
> + let res = self.client.request(request).await?;
> +
> + let status = res.status();
> + if !status.is_success() {
> + bail!("got bad status: {}", status);
> + }
> + Ok(())
> + }
> +
> + async fn finish(mut self) -> Result<(), Error> {
> + loop {
> + select! {
I wonder, don't you want to receive data & flushes in some kind of
order?
Wouldn't a single channel over an
`enum MetricsValue { Flush, Data(MetricsData) }`
make more sense?
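A rough sketch of the single-channel idea, to show why ordering then comes
for free. This is not the patch's code: it uses std's blocking mpsc channel
instead of tokio's, carries plain `String` lines instead of `MetricsData`, and
`run` is a hypothetical stand-in for the `finish()` loop that returns the
batches it would have sent.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical message type, following the enum suggested above.
/// `Data` carries an already formatted line to keep the sketch small.
pub enum MetricsValue {
    Flush,
    Data(String),
}

/// Drain the channel in arrival order and return every batch that
/// would have been sent to the server.
pub fn run(rx: Receiver<MetricsValue>) -> Vec<String> {
    let mut sent = Vec::new();
    let mut buf = String::new();

    // `iter()` ends once all senders have been dropped.
    for msg in rx.iter() {
        match msg {
            MetricsValue::Data(line) => buf.push_str(&line),
            MetricsValue::Flush => {
                if !buf.is_empty() {
                    sent.push(std::mem::take(&mut buf));
                }
            }
        }
    }

    // Final flush of whatever is still buffered.
    if !buf.is_empty() {
        sent.push(buf);
    }
    sent
}
```

With one queue, a flush can never overtake data that was queued before it,
and no separate "consume remaining data" pass is needed after the loop.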
> + res = self.flush_channel.recv().fuse() => match res {
> + Some(_) => self.flush().await?,
> + None => break, // all senders gone
> + },
> + data = self.data_channel.recv().fuse() => match data {
> + Some(data) => self.add_data(data).await?,
> + None => break, // all senders gone
> + },
> + }
> + }
> +
> + // consume remaining data in channel
> + while let Some(data) = self.data_channel.recv().await {
> + self.add_data(data).await?;
> + }
> +
> + self.flush().await?;
> +
> + Ok(())
> + }
> +}
> diff --git a/proxmox-metrics/src/influxdb/mod.rs b/proxmox-metrics/src/influxdb/mod.rs
> new file mode 100644
> index 0000000..26a715c
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/mod.rs
> @@ -0,0 +1,7 @@
> +mod http;
> +pub use self::http::*;
> +
> +mod udp;
> +pub use udp::*;
> +
> +pub mod utils;
> diff --git a/proxmox-metrics/src/influxdb/udp.rs b/proxmox-metrics/src/influxdb/udp.rs
> new file mode 100644
> index 0000000..de2b0d5
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/udp.rs
> @@ -0,0 +1,107 @@
> +use std::sync::Arc;
> +
> +use anyhow::Error;
> +use futures::{future::FutureExt, select};
> +use tokio::net::UdpSocket;
> +use tokio::sync::mpsc;
> +
> +use crate::influxdb::utils;
> +use crate::{MetricsChannel, MetricsData, MetricsServerFuture};
> +
> +pub struct InfluxDbUdp {
> + address: String,
> + conn: Option<tokio::net::UdpSocket>,
> + mtu: u16,
> + data: String,
> + data_channel: mpsc::Receiver<Arc<MetricsData>>,
> + flush_channel: mpsc::Receiver<()>,
> +}
> +
> +impl InfluxDbUdp {
> + pub fn new(host: &str, port: u16, mtu: Option<u16>) -> (MetricsServerFuture, MetricsChannel) {
> + let (data_tx, data_rx) = mpsc::channel(1024);
> + let (flush_tx, flush_rx) = mpsc::channel(1);
> +
> + let address = if host.len() > 3 && host.contains(':') && &host[0..1] != "[" {
> + format!("[{}]:{}", host, port)
Here you handle IPv6 but...
> + } else {
> + format!("{}:{}", host, port)
> + };
> +
> + let this = Self {
> + address,
> + conn: None,
> + mtu: mtu.unwrap_or(1500),
> + data: String::new(),
> + data_channel: data_rx,
> + flush_channel: flush_rx,
> + };
> +
> + let future = Box::pin(this.finish());
> +
> + let channel = MetricsChannel {
> + data_channel: data_tx,
> + flush_channel: flush_tx,
> + };
> +
> + (future, channel)
> + }
> +
> + async fn connect(&mut self) -> Result<tokio::net::UdpSocket, Error> {
> + let conn = UdpSocket::bind("0.0.0.0:0").await?;
...here you're specifically binding to an IPv4 address, which will cause Rust
to issue a `socket(AF_INET, ...)` syscall rather than the
`socket(AF_INET6, ...)` needed for IPv6.
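One possible way around this, sketched with the blocking std socket rather
than tokio's: resolve the target first and pick the wildcard bind address from
its address family. The names `wildcard_for` and `connect_udp` are made up for
this example and are not part of the patch.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};

/// Pick the wildcard bind address that matches the family of `target`.
pub fn wildcard_for(target: &SocketAddr) -> &'static str {
    match target {
        SocketAddr::V4(_) => "0.0.0.0:0",
        SocketAddr::V6(_) => "[::]:0",
    }
}

/// Resolve `host:port`, bind a socket of the matching family and connect it.
pub fn connect_udp(host: &str, port: u16) -> io::Result<UdpSocket> {
    // The (host, port) form also accepts bare IPv6 literals such as "::1".
    let target = (host, port)
        .to_socket_addrs()?
        .next()
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no address found"))?;

    let socket = UdpSocket::bind(wildcard_for(&target))?;
    socket.connect(target)?;
    Ok(socket)
}
```

Resolving via the `(host, port)` tuple would also make the manual bracket
handling in `new()` unnecessary, since no address string has to be built.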
> + let addr = self.address.clone();
> + conn.connect(addr).await?;
> + Ok(conn)
> + }
> +
> + async fn add_data(&mut self, data: Arc<MetricsData>) -> Result<(), Error> {
> + let new_data = utils::format_influxdb_line(&data)?;
> +
> + if self.data.len() + new_data.len() >= (self.mtu as usize) {
> + self.flush().await?;
> + }
> +
> + self.data.push_str(&new_data);
Is it possible for `new_data.len()` to be larger than the MTU? If so,
should this warn or something?
Otherwise the flush below might become a problem.
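To make the concern concrete, here is a minimal sketch of a buffer that
refuses lines which can never fit into one datagram. `Batch` and `AddError`
are hypothetical names for this example only; the actual sending is left out
and `add` simply hands back the datagram that would have to go out first.

```rust
/// Hypothetical batching buffer; `mtu` is the maximum payload size in bytes.
pub struct Batch {
    mtu: usize,
    data: String,
}

#[derive(Debug, PartialEq)]
pub enum AddError {
    /// A single line is larger than the MTU and cannot be sent in one datagram.
    LineTooLong { len: usize, mtu: usize },
}

impl Batch {
    pub fn new(mtu: usize) -> Self {
        Self { mtu, data: String::new() }
    }

    /// Queue `line`. Returns the datagram that must be sent before it, if any.
    pub fn add(&mut self, line: &str) -> Result<Option<String>, AddError> {
        if line.len() > self.mtu {
            return Err(AddError::LineTooLong { len: line.len(), mtu: self.mtu });
        }

        // Flush first if appending would push the buffer over the limit.
        let mut to_send = None;
        if !self.data.is_empty() && self.data.len() + line.len() > self.mtu {
            to_send = Some(std::mem::take(&mut self.data));
        }

        self.data.push_str(line);
        Ok(to_send)
    }

    /// Take whatever is buffered, e.g. for an explicit or final flush.
    pub fn take(&mut self) -> Option<String> {
        if self.data.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.data))
        }
    }
}
```

Whether an oversized line should be an error, a warning, or be sent anyway is
a policy decision; the sketch only shows where the check would sit.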
> +
> + if self.data.len() >= (self.mtu as usize) {
> + self.flush().await?;
> + }
> +
> + Ok(())
> + }
> +
> + async fn flush(&mut self) -> Result<(), Error> {
> + let conn = match self.conn.take() {
> + Some(conn) => conn,
> + None => self.connect().await?,
> + };
> +
> + conn.send(self.data.split_off(0).as_bytes()).await?;
> + self.conn = Some(conn);
> + Ok(())
> + }
> +
> + async fn finish(mut self) -> Result<(), Error> {
> + loop {
> + select! {
> + res = self.flush_channel.recv().fuse() => match res {
> + Some(_) => self.flush().await?,
> + None => break, // all senders gone
> + },
> + data = self.data_channel.recv().fuse() => match data {
> + Some(data) => self.add_data(data).await?,
> + None => break, // all senders gone
> + },
> + }
> + }
> +
> + // consume remaining data in channel
> + while let Some(data) = self.data_channel.recv().await {
> + self.add_data(data).await?;
> + }
> +
> + self.flush().await?;
> +
> + Ok(())
> + }
> +}
> diff --git a/proxmox-metrics/src/influxdb/utils.rs b/proxmox-metrics/src/influxdb/utils.rs
> new file mode 100644
> index 0000000..bf391f9
> --- /dev/null
> +++ b/proxmox-metrics/src/influxdb/utils.rs
> @@ -0,0 +1,51 @@
> +use anyhow::{bail, Error};
> +
> +use crate::MetricsData;
> +
> +pub(crate) fn format_influxdb_line(data: &MetricsData) -> Result<String, Error> {
> + if !data.values.is_object() {
> + bail!("invalid data");
> + }
> +
> + let mut line = escape_measurement(&data.measurement);
> + line.push(',');
> +
> + let tags = data.tags.iter().map(|(key, value)| {
> + format!("{}={}", escape_key(&key), escape_key(&value))
> + });
> + line.push_str(&tags.collect::<Vec<String>>().join(","));
I'm not too fond of the temporary `Vec` here and below. Maybe use
`line.extend()` with the ',' as part of the format string (",{}={}"), or
skip even the temporary format and just do
    for (key, value) in &data.tags {
        line.push(',');
        line.push_str(&escape_key(key));
        line.push('=');
        line.push_str(&escape_key(value));
    }
It's not really longer. Alternatively, more readable and without the
temporary `String` would be `write!(line, ",{}={}", ...)?` etc.
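A small sketch of the `write!` variant, assuming the same escaping rules
as `escape_key` in the patch. `push_tags` is a hypothetical helper name,
and it takes plain string pairs rather than the `HashMap` so the example
stays self-contained:

```rust
use std::fmt::Write;

/// Same escaping rules as `escape_key` in the patch.
fn escape_key(key: &str) -> String {
    key.replace(',', "\\,").replace('=', "\\=").replace(' ', "\\ ")
}

/// Hypothetical helper: append ",key=value" pairs directly to `line`
/// without collecting them into a temporary `Vec` first.
fn push_tags<'a, I>(line: &mut String, tags: I) -> std::fmt::Result
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    for (key, value) in tags {
        write!(line, ",{}={}", escape_key(key), escape_key(value))?;
    }
    Ok(())
}
```

Putting the ',' in front of each pair also means that an empty tag set
leaves no dangling comma after the measurement name.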
> +
> + line.push(' ');
> +
> + let values = data.values.as_object().unwrap().iter().map(|(key, value)| {
> + let value = if value.is_string() {
> + escape_value(&value.to_string())
^ extra space? :P
> + } else {
> + value.to_string()
> + };
> + format!("{}={}", escape_key(&key), value)
> + });
> +
> + line.push_str(&values.collect::<Vec<String>>().join(","));
> +
> + // nanosecond precision
> + line.push_str(&format!(" {}\n", data.ctime*1_000_000_000));
> + Ok(line)
> +}
> +
> +fn escape_key(key: &str) -> String {
> + let key = key.replace(',', "\\,");
> + let key = key.replace('=', "\\=");
> + let key = key.replace(' ', "\\ ");
> + key
> +}
> +
> +fn escape_measurement(measurement: &str) -> String {
> + let measurement = measurement.replace(',', "\\,");
> + let measurement = measurement.replace(' ', "\\ ");
> + measurement
> +}
> +
> +fn escape_value(value: &str) -> String {
> + format!("\"{}\"",value.replace('"', "\\\""))
> +}
> diff --git a/proxmox-metrics/src/lib.rs b/proxmox-metrics/src/lib.rs
> new file mode 100644
> index 0000000..0a76faa
> --- /dev/null
> +++ b/proxmox-metrics/src/lib.rs
> @@ -0,0 +1,92 @@
> +use std::collections::HashMap;
> +use std::pin::Pin;
> +use std::sync::Arc;
> +
> +use anyhow::{bail, format_err, Error};
> +use serde::Serialize;
> +use serde_json::Value;
> +use tokio::sync::mpsc;
> +
> +pub mod influxdb;
> +
> +#[derive(Clone)]
> +/// Structured data for the metric server
> +pub struct MetricsData {
> + /// The category of measurements
> + pub measurement: String,
> + /// A list of tags to attach to the measurements
> + pub tags: HashMap<String, String>,
> + /// The actual values to send. Only plain (not-nested) objects are supported at the moment.
> + pub values: Value,
> + /// The time of the measurement
> + pub ctime: i64,
> +}
> +
> +impl MetricsData {
> + /// Convenient helper to create from references
> + pub fn new<V: Serialize>(measurement: &str, tags: &[(&str, &str)], ctime: i64, values: V) -> Result<Self, Error> {
> + let mut new_tags = HashMap::new();
> + for (key, value) in tags {
> + new_tags.insert(key.to_string(), value.to_string());
> + }
> +
> + Ok(Self{
> + measurement: measurement.to_string(),
> + tags: new_tags,
> + values: serde_json::to_value(values)?,
> + ctime,
> + })
> + }
> +}
> +
> +pub type MetricsServerFuture =
> + Pin<Box<dyn std::future::Future<Output = Result<(), Error>> + Send + 'static>>;
> +
> +#[derive(Clone)]
> +/// A channel to send data to the metric server
> +pub struct MetricsChannel {
> + pub(crate) data_channel: mpsc::Sender<Arc<MetricsData>>,
> + pub(crate) flush_channel: mpsc::Sender<()>,
> +}
> +
> +impl MetricsChannel {
> + /// Queues the given data for the metric server. If the queue is full,
> + /// flush and try again.
> + pub async fn send_data(&self, data: Arc<MetricsData>) -> Result<(), Error> {
> + if let Err(err) = self.data_channel.try_send(data) {
> + match err {
> + mpsc::error::TrySendError::Full(data) => {
> + self.flush_channel.send(()).await?;
> + self.data_channel
> + .send(data)
> + .await
> + .map_err(|_| format_err!("error sending data"))?;
> + }
> + mpsc::error::TrySendError::Closed(_) => {
> + bail!("channel closed");
> + }
> + }
> + }
> + Ok(())
> + }
> +
> + /// Flush data to the metric server
> + pub async fn flush(&self) -> Result<(), Error> {
> + self.flush_channel.send(()).await?;
> + Ok(())
> + }
> +}
> +
> +pub async fn send_data_to_channels(values: &[Arc<MetricsData>], channels: &[MetricsChannel]) -> Vec<Result<(), Error>> {
> + let mut futures = Vec::with_capacity(channels.len());
> + for channel in channels {
> + futures.push(async move {
> + for data in values.into_iter() {
`.into_iter()` shouldn't be necessary, that's how `for` loops are
defined after all.
> + channel.send_data(data.clone()).await?
> + }
> + Ok::<(), Error>(())
> + });
> + }
> +
> + futures::future::join_all(futures).await
> +}
> --
> 2.30.2
* [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
` (2 preceding siblings ...)
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox 3/3] proxmox-metrics: implement metrics server client code Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types Dominik Csapak
` (4 subsequent siblings)
8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/api2/admin/datastore.rs | 4 ++--
src/api2/node/status.rs | 10 ++++++++--
src/api2/status.rs | 4 ++--
src/bin/proxmox-backup-proxy.rs | 2 +-
src/tools/disks/mod.rs | 21 +--------------------
5 files changed, 14 insertions(+), 27 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index b653f906..0d0b91f1 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -612,7 +612,7 @@ pub fn status(
rpcenv: &mut dyn RpcEnvironment,
) -> Result<DataStoreStatus, Error> {
let datastore = DataStore::lookup_datastore(&store)?;
- let storage = crate::tools::disks::disk_usage(&datastore.base_path())?;
+ let storage = proxmox_sys::fs::disk_usage(&datastore.base_path())?;
let (counts, gc_status) = if verbose {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let user_info = CachedUserInfo::new()?;
@@ -635,7 +635,7 @@ pub fn status(
Ok(DataStoreStatus {
total: storage.total,
used: storage.used,
- avail: storage.avail,
+ avail: storage.available,
gc_status,
counts,
})
diff --git a/src/api2/node/status.rs b/src/api2/node/status.rs
index 9559dda6..7861d3a5 100644
--- a/src/api2/node/status.rs
+++ b/src/api2/node/status.rs
@@ -9,7 +9,7 @@ use proxmox_sys::linux::procfs;
use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
use proxmox_schema::api;
-use pbs_api_types::{NODE_SCHEMA, NodePowerCommand, PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT};
+use pbs_api_types::{NODE_SCHEMA, NodePowerCommand, PRIV_SYS_AUDIT, PRIV_SYS_POWER_MANAGEMENT, StorageStatus};
use crate::api2::types::{
NodeCpuInformation, NodeStatus, NodeMemoryCounters, NodeSwapCounters, NodeInformation,
@@ -77,10 +77,16 @@ fn get_status(
uname.version()
);
+ let disk = proxmox_sys::fs::disk_usage(Path::new("/"))?;
+
Ok(NodeStatus {
memory,
swap,
- root: crate::tools::disks::disk_usage(Path::new("/"))?,
+ root: StorageStatus {
+ total: disk.total,
+ used: disk.used,
+ avail: disk.available,
+ },
uptime: procfs::read_proc_uptime()?.0 as u64,
loadavg,
kversion,
diff --git a/src/api2/status.rs b/src/api2/status.rs
index 7f50914b..48c283e6 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -110,13 +110,13 @@ pub fn datastore_status(
continue;
}
};
- let status = crate::tools::disks::disk_usage(&datastore.base_path())?;
+ let status = proxmox_sys::fs::disk_usage(&datastore.base_path())?;
let mut entry = json!({
"store": store,
"total": status.total,
"used": status.used,
- "avail": status.avail,
+ "avail": status.available,
"gc-status": datastore.last_gc_status(),
});
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 07a53687..fa79322d 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -1055,7 +1055,7 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
- match proxmox_backup::tools::disks::disk_usage(path) {
+ match proxmox_sys::fs::disk_usage(path) {
Ok(status) => {
let rrd_key = format!("{}/total", rrd_prefix);
rrd_update_gauge(&rrd_key, status.total as f64);
diff --git a/src/tools/disks/mod.rs b/src/tools/disks/mod.rs
index 867aa624..30d9568f 100644
--- a/src/tools/disks/mod.rs
+++ b/src/tools/disks/mod.rs
@@ -19,7 +19,7 @@ use proxmox_sys::linux::procfs::{MountInfo, mountinfo::Device};
use proxmox_sys::{io_bail, io_format_err};
use proxmox_schema::api;
-use pbs_api_types::{BLOCKDEVICE_NAME_REGEX, StorageStatus};
+use pbs_api_types::BLOCKDEVICE_NAME_REGEX;
mod zfs;
pub use zfs::*;
@@ -521,25 +521,6 @@ impl Disk {
}
}
-/// Returns disk usage information (total, used, avail)
-pub fn disk_usage(path: &std::path::Path) -> Result<StorageStatus, Error> {
-
- let mut stat: libc::statfs64 = unsafe { std::mem::zeroed() };
-
- use nix::NixPath;
-
- let res = path.with_nix_path(|cstr| unsafe { libc::statfs64(cstr.as_ptr(), &mut stat) })?;
- nix::errno::Errno::result(res)?;
-
- let bsize = stat.f_bsize as u64;
-
- Ok(StorageStatus{
- total: stat.f_blocks*bsize,
- used: (stat.f_blocks-stat.f_bfree)*bsize,
- avail: stat.f_bavail*bsize,
- })
-}
-
#[api()]
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all="lowercase")]
--
2.30.2
* [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
` (3 preceding siblings ...)
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 1/6] use 'disk_usage' from proxmox-sys Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class Dominik Csapak
` (3 subsequent siblings)
8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
To: pbs-devel
InfluxDbUdp and InfluxDbHttp for now
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
pbs-api-types/src/lib.rs | 2 +
pbs-api-types/src/metrics.rs | 134 +++++++++++++++++++++++++++++++++++
2 files changed, 136 insertions(+)
create mode 100644 pbs-api-types/src/metrics.rs
diff --git a/pbs-api-types/src/lib.rs b/pbs-api-types/src/lib.rs
index 0a0dd33d..09bd59c8 100644
--- a/pbs-api-types/src/lib.rs
+++ b/pbs-api-types/src/lib.rs
@@ -88,6 +88,8 @@ pub use traffic_control::*;
mod zfs;
pub use zfs::*;
+mod metrics;
+pub use metrics::*;
#[rustfmt::skip]
#[macro_use]
diff --git a/pbs-api-types/src/metrics.rs b/pbs-api-types/src/metrics.rs
new file mode 100644
index 00000000..c7e08885
--- /dev/null
+++ b/pbs-api-types/src/metrics.rs
@@ -0,0 +1,134 @@
+use serde::{Deserialize, Serialize};
+
+use crate::{DNS_NAME_OR_IP_SCHEMA, PROXMOX_SAFE_ID_FORMAT, SINGLE_LINE_COMMENT_SCHEMA};
+use proxmox_schema::{api, Schema, StringSchema, Updater};
+
+pub const METRIC_SERVER_ID_SCHEMA: Schema = StringSchema::new("Metrics Server ID.")
+ .format(&PROXMOX_SAFE_ID_FORMAT)
+ .min_length(3)
+ .max_length(32)
+ .schema();
+
+pub const INFLUXDB_BUCKET_SCHEMA: Schema = StringSchema::new("InfluxDB Bucket.")
+ .format(&PROXMOX_SAFE_ID_FORMAT)
+ .min_length(3)
+ .max_length(32)
+ .default("proxmox")
+ .schema();
+
+pub const INFLUXDB_ORGANIZATION_SCHEMA: Schema = StringSchema::new("InfluxDB Organization.")
+ .format(&PROXMOX_SAFE_ID_FORMAT)
+ .min_length(3)
+ .max_length(32)
+ .default("proxmox")
+ .schema();
+
+#[api(
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ host: {
+ schema: DNS_NAME_OR_IP_SCHEMA,
+ },
+ port: {
+ description: "The port",
+ type: u16,
+ },
+ mtu: {
+ description: "The MTU",
+ type: u16,
+ optional: true,
+ default: 1500,
+ },
+ comment: {
+ optional: true,
+ schema: SINGLE_LINE_COMMENT_SCHEMA,
+ },
+ },
+)]
+#[derive(Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// InfluxDB Server (UDP)
+pub struct InfluxDbUdp {
+ #[updater(skip)]
+ pub name: String,
+ pub host: String,
+ pub port: u16,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub mtu: Option<u16>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub comment: Option<String>,
+}
+
+#[api(
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ host: {
+ schema: DNS_NAME_OR_IP_SCHEMA,
+ },
+ https: {
+ description: "If true, HTTPS is used.",
+ type: bool,
+ optional: true,
+ default: true,
+ },
+ token: {
+ description: "The (optional) API token",
+ type: String,
+ optional: true,
+ },
+ bucket: {
+ schema: INFLUXDB_BUCKET_SCHEMA,
+ optional: true,
+ },
+ organization: {
+ schema: INFLUXDB_ORGANIZATION_SCHEMA,
+ optional: true,
+ },
+ "max-body-size": {
+ description: "The (optional) maximum body size",
+ type: usize,
+ optional: true,
+ default: 25_000_000,
+ },
+ "verify-tls": {
+ description: "If true, the certificate will be validated.",
+ type: bool,
+ optional: true,
+ default: true,
+ },
+ comment: {
+ optional: true,
+ schema: SINGLE_LINE_COMMENT_SCHEMA,
+ },
+ },
+)]
+#[derive(Serialize, Deserialize, Updater)]
+#[serde(rename_all = "kebab-case")]
+/// InfluxDB Server (HTTP(s))
+pub struct InfluxDbHttp {
+ #[updater(skip)]
+ pub name: String,
+ pub host: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ /// The (optional) port. (defaults: 80 for HTTP, 443 for HTTPS)
+ pub port: Option<u16>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub https: Option<bool>,
+ /// The Optional Token
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub token: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub bucket: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub organization: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub max_body_size: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub verify_tls: Option<bool>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub comment: Option<String>,
+}
--
2.30.2
* [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
` (4 preceding siblings ...)
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 2/6] pbs-api-types: add metrics api types Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update Dominik Csapak
` (2 subsequent siblings)
8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
To: pbs-devel
a section config like in pve
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Cargo.toml | 1 +
pbs-config/Cargo.toml | 1 +
pbs-config/src/lib.rs | 1 +
pbs-config/src/metrics.rs | 122 ++++++++++++++++++++++++++++++++++++++
4 files changed, 125 insertions(+)
create mode 100644 pbs-config/src/metrics.rs
diff --git a/Cargo.toml b/Cargo.toml
index d7ad2085..1643b628 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ pxar = { version = "0.10.1", features = [ "tokio-io" ] }
proxmox-http = { version = "0.6", features = [ "client", "http-helpers", "websocket" ] }
proxmox-io = "1"
proxmox-lang = "1"
+proxmox-metrics = "0.1"
proxmox-router = { version = "1.1", features = [ "cli" ] }
proxmox-schema = { version = "1", features = [ "api-macro" ] }
proxmox-section-config = "1"
diff --git a/pbs-config/Cargo.toml b/pbs-config/Cargo.toml
index cd14d823..4c920712 100644
--- a/pbs-config/Cargo.toml
+++ b/pbs-config/Cargo.toml
@@ -25,6 +25,7 @@ proxmox-time = "1"
proxmox-serde = "0.1"
proxmox-shared-memory = "0.2"
proxmox-sys = "0.2"
+proxmox-metrics = "0.1"
pbs-api-types = { path = "../pbs-api-types" }
pbs-buildcfg = { path = "../pbs-buildcfg" }
diff --git a/pbs-config/src/lib.rs b/pbs-config/src/lib.rs
index 118030bc..29880ab9 100644
--- a/pbs-config/src/lib.rs
+++ b/pbs-config/src/lib.rs
@@ -6,6 +6,7 @@ pub mod domains;
pub mod drive;
pub mod key_config;
pub mod media_pool;
+pub mod metrics;
pub mod network;
pub mod remote;
pub mod sync;
diff --git a/pbs-config/src/metrics.rs b/pbs-config/src/metrics.rs
new file mode 100644
index 00000000..97f0cd17
--- /dev/null
+++ b/pbs-config/src/metrics.rs
@@ -0,0 +1,122 @@
+use std::collections::HashMap;
+
+use anyhow::Error;
+use lazy_static::lazy_static;
+
+use proxmox_metrics::{influxdb, MetricsChannel, MetricsServerFuture};
+use proxmox_schema::*;
+use proxmox_section_config::{SectionConfig, SectionConfigData, SectionConfigPlugin};
+
+use pbs_api_types::{InfluxDbHttp, InfluxDbUdp, METRIC_SERVER_ID_SCHEMA};
+
+use crate::{open_backup_lockfile, BackupLockGuard};
+
+lazy_static! {
+ pub static ref CONFIG: SectionConfig = init();
+}
+
+fn init() -> SectionConfig {
+ let mut config = SectionConfig::new(&METRIC_SERVER_ID_SCHEMA);
+
+ let udp_schema = match InfluxDbUdp::API_SCHEMA {
+ Schema::Object(ref object_schema) => object_schema,
+ _ => unreachable!(),
+ };
+
+ let udp_plugin = SectionConfigPlugin::new(
+ "influxdb-udp".to_string(),
+ Some("name".to_string()),
+ udp_schema,
+ );
+ config.register_plugin(udp_plugin);
+
+ let http_schema = match InfluxDbHttp::API_SCHEMA {
+ Schema::Object(ref object_schema) => object_schema,
+ _ => unreachable!(),
+ };
+
+ let http_plugin = SectionConfigPlugin::new(
+ "influxdb-http".to_string(),
+ Some("name".to_string()),
+ http_schema,
+ );
+
+ config.register_plugin(http_plugin);
+
+ config
+}
+
+pub const METRIC_SERVER_CFG_FILENAME: &str = "/etc/proxmox-backup/metricserver.cfg";
+pub const METRIC_SERVER_CFG_LOCKFILE: &str = "/etc/proxmox-backup/.metricserver.lck";
+
+/// Get exclusive lock
+pub fn lock_config() -> Result<BackupLockGuard, Error> {
+ open_backup_lockfile(METRIC_SERVER_CFG_LOCKFILE, None, true)
+}
+
+pub fn config() -> Result<(SectionConfigData, [u8; 32]), Error> {
+ let content = proxmox_sys::fs::file_read_optional_string(METRIC_SERVER_CFG_FILENAME)?
+ .unwrap_or_else(|| "".to_string());
+
+ let digest = openssl::sha::sha256(content.as_bytes());
+ let data = CONFIG.parse(METRIC_SERVER_CFG_FILENAME, &content)?;
+ Ok((data, digest))
+}
+
+pub fn save_config(config: &SectionConfigData) -> Result<(), Error> {
+ let raw = CONFIG.write(METRIC_SERVER_CFG_FILENAME, &config)?;
+ crate::replace_backup_config(METRIC_SERVER_CFG_FILENAME, raw.as_bytes())
+}
+
+// shell completion helper
+pub fn complete_remote_name(_arg: &str, _param: &HashMap<String, String>) -> Vec<String> {
+ match config() {
+ Ok((data, _digest)) => data.sections.iter().map(|(id, _)| id.to_string()).collect(),
+ Err(_) => return vec![],
+ }
+}
+
+/// Get the metric server connections from a config
+pub fn get_metric_server_connections(
+ metric_config: SectionConfigData,
+) -> Result<(Vec<MetricsServerFuture>, Vec<MetricsChannel>, Vec<String>), Error> {
+ let mut futures = Vec::new();
+ let mut channels = Vec::new();
+ let mut names = Vec::new();
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+ {
+ let (future, sender) = influxdb::InfluxDbUdp::new(&config.host, config.port, config.mtu);
+ names.push(config.name);
+ futures.push(future);
+ channels.push(sender);
+ }
+
+ for config in
+ metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+ {
+ let port = config.port.unwrap_or_else(|| {
+ if config.https.unwrap_or(true) {
+ 443
+ } else {
+ 80
+ }
+ });
+
+ let (future, sender) = influxdb::InfluxDbHttp::new(
+ config.https.unwrap_or(true),
+ &config.host,
+ port,
+ config.organization.as_deref().unwrap_or("proxmox"),
+ config.bucket.as_deref().unwrap_or("proxmox"),
+ config.token.as_deref(),
+ config.verify_tls.unwrap_or(true),
+ config.max_body_size.unwrap_or(25_000_000),
+ )?;
+ names.push(config.name);
+ futures.push(future);
+ channels.push(sender);
+ }
+ Ok((futures, channels, names))
+}
--
2.30.2
* [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
` (5 preceding siblings ...)
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 3/6] pbs-config: add metrics config class Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
8 siblings, 0 replies; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
To: pbs-devel
that way we can reuse the stats gathered
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 213 +++++++++++++++++++++-----------
1 file changed, 141 insertions(+), 72 deletions(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index fa79322d..2700fabf 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,8 +17,11 @@ use tokio_stream::wrappers::ReceiverStream;
use serde_json::{json, Value};
use http::{Method, HeaderMap};
-use proxmox_sys::linux::socket::set_tcp_keepalive;
-use proxmox_sys::fs::CreateOptions;
+use proxmox_sys::linux::{
+ procfs::{ProcFsStat, ProcFsMemInfo, ProcFsNetDev, Loadavg},
+ socket::set_tcp_keepalive
+};
+use proxmox_sys::fs::{CreateOptions, DiskUsage};
use proxmox_lang::try_block;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
@@ -44,6 +47,7 @@ use proxmox_backup::{
Job,
},
},
+ tools::disks::BlockDevStat,
};
use pbs_buildcfg::configdir;
@@ -931,9 +935,24 @@ async fn run_stat_generator() {
loop {
let delay_target = Instant::now() + Duration::from_secs(10);
- generate_host_stats().await;
+ let (hoststats, hostdisk, datastores) = match tokio::task::spawn_blocking(|| {
+ let hoststats = collect_host_stats_sync();
+ let (hostdisk, datastores) = collect_disk_stats_sync();
+ (Arc::new(hoststats), Arc::new(hostdisk), Arc::new(datastores))
+ }).await {
+ Ok(res) => res,
+ Err(err) => {
+ log::error!("collecting host stats panicked: {}", err);
+ tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+ continue;
+ }
+ };
+
+ let rrd_future = tokio::task::spawn_blocking(move || {
+ rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
+ rrd_sync_journal();
+ });
- rrd_sync_journal();
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
@@ -941,86 +960,147 @@ async fn run_stat_generator() {
}
-async fn generate_host_stats() {
- match tokio::task::spawn_blocking(generate_host_stats_sync).await {
- Ok(()) => (),
- Err(err) => log::error!("generate_host_stats paniced: {}", err),
- }
+struct HostStats {
+ proc: Option<ProcFsStat>,
+ meminfo: Option<ProcFsMemInfo>,
+ net: Option<Vec<ProcFsNetDev>>,
+ load: Option<Loadavg>,
+}
+
+struct DiskStat {
+ name: String,
+ usage: Option<DiskUsage>,
+ dev: Option<BlockDevStat>,
}
-fn generate_host_stats_sync() {
+fn collect_host_stats_sync() -> HostStats {
use proxmox_sys::linux::procfs::{
read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
- match read_proc_stat() {
- Ok(stat) => {
- rrd_update_gauge("host/cpu", stat.cpu);
- rrd_update_gauge("host/iowait", stat.iowait_percent);
- }
+ let proc = match read_proc_stat() {
+ Ok(stat) => Some(stat),
Err(err) => {
eprintln!("read_proc_stat failed - {}", err);
+ None
}
- }
+ };
- match read_meminfo() {
- Ok(meminfo) => {
- rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
- rrd_update_gauge("host/memused", meminfo.memused as f64);
- rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
- rrd_update_gauge("host/swapused", meminfo.swapused as f64);
- }
+ let meminfo = match read_meminfo() {
+ Ok(stat) => Some(stat),
Err(err) => {
eprintln!("read_meminfo failed - {}", err);
+ None
}
- }
+ };
- match read_proc_net_dev() {
- Ok(netdev) => {
- use pbs_config::network::is_physical_nic;
- let mut netin = 0;
- let mut netout = 0;
- for item in netdev {
- if !is_physical_nic(&item.device) { continue; }
- netin += item.receive;
- netout += item.send;
- }
- rrd_update_derive("host/netin", netin as f64);
- rrd_update_derive("host/netout", netout as f64);
- }
+ let net = match read_proc_net_dev() {
+ Ok(netdev) => Some(netdev),
Err(err) => {
eprintln!("read_prox_net_dev failed - {}", err);
+ None
}
- }
+ };
- match read_loadavg() {
- Ok(loadavg) => {
- rrd_update_gauge("host/loadavg", loadavg.0 as f64);
- }
+ let load = match read_loadavg() {
+ Ok(loadavg) => Some(loadavg),
Err(err) => {
eprintln!("read_loadavg failed - {}", err);
+ None
}
+ };
+
+ HostStats {
+ proc,
+ meminfo,
+ net,
+ load,
}
+}
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
let disk_manager = DiskManage::new();
- gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+ let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+ let mut datastores = Vec::new();
match pbs_config::datastore::config() {
Ok((config, _)) => {
let datastore_list: Vec<DataStoreConfig> =
config.convert_to_typed_array("datastore").unwrap_or_default();
for config in datastore_list {
-
- let rrd_prefix = format!("datastore/{}", config.name);
let path = std::path::Path::new(&config.path);
- gather_disk_stats(disk_manager.clone(), path, &rrd_prefix);
+ datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
}
}
Err(err) => {
eprintln!("read datastore config failed - {}", err);
}
}
+
+ (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+ if let Some(stat) = &host.proc {
+ rrd_update_gauge("host/cpu", stat.cpu);
+ rrd_update_gauge("host/iowait", stat.iowait_percent);
+ }
+
+ if let Some(meminfo) = &host.meminfo {
+ rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+ rrd_update_gauge("host/memused", meminfo.memused as f64);
+ rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+ rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+ }
+
+ if let Some(netdev) = &host.net {
+ use pbs_config::network::is_physical_nic;
+ let mut netin = 0;
+ let mut netout = 0;
+ for item in netdev {
+ if !is_physical_nic(&item.device) { continue; }
+ netin += item.receive;
+ netout += item.send;
+ }
+ rrd_update_derive("host/netin", netin as f64);
+ rrd_update_derive("host/netout", netout as f64);
+ }
+
+ if let Some(loadavg) = &host.load {
+ rrd_update_gauge("host/loadavg", loadavg.0 as f64);
+ }
+
+ rrd_update_disk_stat(&hostdisk, "host");
+
+ for stat in datastores {
+ let rrd_prefix = format!("datastore/{}", stat.name);
+ rrd_update_disk_stat(&stat, &rrd_prefix);
+ }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+ if let Some(status) = &disk.usage {
+ let rrd_key = format!("{}/total", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.total as f64);
+ let rrd_key = format!("{}/used", rrd_prefix);
+ rrd_update_gauge(&rrd_key, status.used as f64);
+ }
+
+ if let Some(stat) = &disk.dev {
+ let rrd_key = format!("{}/read_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.read_ios as f64);
+ let rrd_key = format!("{}/read_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
+
+ let rrd_key = format!("{}/write_ios", rrd_prefix);
+ rrd_update_derive(&rrd_key, stat.write_ios as f64);
+ let rrd_key = format!("{}/write_bytes", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
+
+ let rrd_key = format!("{}/io_ticks", rrd_prefix);
+ rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
+ }
}
fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
@@ -1053,22 +1133,17 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
next <= now
}
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &str) {
-
- match proxmox_sys::fs::disk_usage(path) {
- Ok(status) => {
- let rrd_key = format!("{}/total", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.total as f64);
- let rrd_key = format!("{}/used", rrd_prefix);
- rrd_update_gauge(&rrd_key, status.used as f64);
- }
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+ let usage = match proxmox_sys::fs::disk_usage(path) {
+ Ok(status) => Some(status),
Err(err) => {
eprintln!("read disk_usage on {:?} failed - {}", path, err);
+ None
}
- }
+ };
- match disk_manager.find_mounted_device(path) {
- Ok(None) => {},
+ let dev = match disk_manager.find_mounted_device(path) {
+ Ok(None) => None,
Ok(Some((fs_type, device, source))) => {
let mut device_stat = None;
match fs_type.as_str() {
@@ -1090,24 +1165,18 @@ fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, rrd_prefix: &st
}
}
}
- if let Some(stat) = device_stat {
- let rrd_key = format!("{}/read_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.read_ios as f64);
- let rrd_key = format!("{}/read_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.read_sectors*512) as f64);
-
- let rrd_key = format!("{}/write_ios", rrd_prefix);
- rrd_update_derive(&rrd_key, stat.write_ios as f64);
- let rrd_key = format!("{}/write_bytes", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.write_sectors*512) as f64);
-
- let rrd_key = format!("{}/io_ticks", rrd_prefix);
- rrd_update_derive(&rrd_key, (stat.io_ticks as f64)/1000.0);
- }
+ device_stat
}
Err(err) => {
eprintln!("find_mounted_device failed - {}", err);
+ None
}
+ };
+
+ DiskStat {
+ name: name.to_string(),
+ usage,
+ dev,
}
}
--
2.30.2
* [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
` (6 preceding siblings ...)
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 4/6] backup-proxy: decouple stats gathering from rrd update Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-15 7:51 ` Wolfgang Bumiller
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
8 siblings, 1 reply; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
To: pbs-devel
and keep the data as similar as possible to pve (tags/fields)
datastores get their own 'object' type and reside in the "blockstat"
measurement
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 139 +++++++++++++++++++++++++++++++-
1 file changed, 138 insertions(+), 1 deletion(-)
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 2700fabf..fbb782dd 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -23,11 +23,13 @@ use proxmox_sys::linux::{
};
use proxmox_sys::fs::{CreateOptions, DiskUsage};
use proxmox_lang::try_block;
+use proxmox_metrics::MetricsData;
use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
use proxmox_sys::{task_log, task_warn};
use proxmox_sys::logrotate::LogRotate;
+use pbs_config::metrics::get_metric_server_connections;
use pbs_datastore::DataStore;
use proxmox_rest_server::{
@@ -948,16 +950,131 @@ async fn run_stat_generator() {
}
};
+ let hoststats2 = hoststats.clone();
+ let hostdisk2 = hostdisk.clone();
+ let datastores2 = datastores.clone();
let rrd_future = tokio::task::spawn_blocking(move || {
- rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
+ rrd_update_host_stats_sync(&hoststats2, &hostdisk2, &datastores2);
rrd_sync_journal();
});
+ let metrics_future = send_data_to_metric_servers(hoststats, hostdisk, datastores);
+
+ let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+ if let Err(err) = rrd_res {
+ log::error!("rrd update panicked: {}", err);
+ }
+ if let Err(err) = metrics_res {
+ log::error!("error during metrics sending: {}", err);
+ }
tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
}
+}
+
+async fn send_data_to_metric_servers(
+ host: Arc<HostStats>,
+ hostdisk: Arc<DiskStat>,
+ datastores: Arc<Vec<DiskStat>>,
+) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::metrics::config()?;
+ let (futures, channels, names) = get_metric_server_connections(config)?;
+
+ if futures.is_empty() {
+ return Ok(());
+ }
+
+ let names2 = names.clone();
+ let sending_handle = tokio::spawn(async move {
+ for (i, res) in future::join_all(futures).await.into_iter().enumerate() {
+ if let Err(err) = res {
+ eprintln!("ERROR '{}': {}", names2[i], err);
+ }
+ }
+ });
+
+ let ctime = proxmox_time::epoch_i64();
+ let nodename = proxmox_sys::nodename();
+
+ let mut values = Vec::new();
+
+ let mut cpuvalue = json!({});
+ if let Some(stat) = &host.proc {
+ for (key, value) in serde_json::to_value(stat)?.as_object().unwrap().iter() {
+ cpuvalue[key.clone()] = value.clone();
+ }
+ }
+
+ if let Some(loadavg) = &host.load {
+ cpuvalue["avg1"] = Value::from(loadavg.0);
+ cpuvalue["avg5"] = Value::from(loadavg.1);
+ cpuvalue["avg15"] = Value::from(loadavg.2);
+ }
+ values.push(Arc::new(MetricsData::new(
+ "cpustat",
+ &[("object", "host"), ("host", nodename)],
+ ctime,
+ cpuvalue,
+ )?));
+
+ if let Some(stat) = &host.meminfo {
+ values.push(Arc::new(MetricsData::new(
+ "memory",
+ &[("object", "host"), ("host", nodename)],
+ ctime,
+ stat,
+ )?));
+ }
+
+ if let Some(netdev) = &host.net {
+ for item in netdev {
+ values.push(Arc::new(MetricsData::new(
+ "nics",
+ &[
+ ("object", "host"),
+ ("host", nodename),
+ ("instance", &item.device),
+ ],
+ ctime,
+ item,
+ )?));
+ }
+ }
+
+ values.push(Arc::new(MetricsData::new(
+ "blockstat",
+ &[("object", "host"), ("host", nodename)],
+ ctime,
+ hostdisk.to_value(),
+ )?));
+
+ for datastore in datastores.iter() {
+ values.push(Arc::new(MetricsData::new(
+ "blockstat",
+ &[
+ ("object", "datastore"),
+ ("nodename", nodename),
+ ("datastore", &datastore.name),
+ ],
+ ctime,
+ datastore.to_value(),
+ )?));
+ }
+
+ let results = proxmox_metrics::send_data_to_channels(&values, &channels).await;
+ for (i, res) in results.into_iter().enumerate() {
+ if let Err(err) = res {
+ log::error!("error sending to {}: {}", names[i], err);
+ }
+ }
+
+ drop(channels);
+
+ sending_handle.await?;
+
+ Ok(())
}
struct HostStats {
@@ -973,6 +1090,26 @@ struct DiskStat {
dev: Option<BlockDevStat>,
}
+impl DiskStat {
+ fn to_value(&self) -> Value {
+ let mut value = json!({});
+ if let Some(usage) = &self.usage {
+ value["total"] = Value::from(usage.total);
+ value["used"] = Value::from(usage.used);
+ value["avail"] = Value::from(usage.available);
+ }
+
+ if let Some(dev) = &self.dev {
+ value["read_ios"] = Value::from(dev.read_ios);
+ value["read_bytes"] = Value::from(dev.read_sectors * 512);
+ value["write_ios"] = Value::from(dev.write_ios);
+ value["write_bytes"] = Value::from(dev.write_sectors * 512);
+ value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+ }
+ value
+ }
+}
+
fn collect_host_stats_sync() -> HostStats {
use proxmox_sys::linux::procfs::{
read_meminfo, read_proc_stat, read_proc_net_dev, read_loadavg};
--
2.30.2
* Re: [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
@ 2021-12-15 7:51 ` Wolfgang Bumiller
0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2021-12-15 7:51 UTC (permalink / raw)
To: Dominik Csapak; +Cc: pbs-devel
On Tue, Dec 14, 2021 at 01:24:11PM +0100, Dominik Csapak wrote:
> and keep the data as similar as possible to pve (tags/fields)
>
> datastores get their own 'object' type and reside in the "blockstat"
> measurement
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> src/bin/proxmox-backup-proxy.rs | 139 +++++++++++++++++++++++++++++++-
> 1 file changed, 138 insertions(+), 1 deletion(-)
>
> diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
> index 2700fabf..fbb782dd 100644
> --- a/src/bin/proxmox-backup-proxy.rs
> +++ b/src/bin/proxmox-backup-proxy.rs
> @@ -23,11 +23,13 @@ use proxmox_sys::linux::{
> };
> use proxmox_sys::fs::{CreateOptions, DiskUsage};
> use proxmox_lang::try_block;
> +use proxmox_metrics::MetricsData;
> use proxmox_router::{RpcEnvironment, RpcEnvironmentType, UserInformation};
> use proxmox_http::client::{RateLimitedStream, ShareableRateLimit};
> use proxmox_sys::{task_log, task_warn};
> use proxmox_sys::logrotate::LogRotate;
>
> +use pbs_config::metrics::get_metric_server_connections;
> use pbs_datastore::DataStore;
>
> use proxmox_rest_server::{
> @@ -948,16 +950,131 @@ async fn run_stat_generator() {
> }
> };
>
> + let hoststats2 = hoststats.clone();
> + let hostdisk2 = hostdisk.clone();
> + let datastores2 = datastores.clone();
Please use Arc::clone. Also, I'm not sure it's worth having them all as
separate Arcs; maybe just a 3-tuple in one Arc?
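A minimal sketch of that suggestion, using only the standard library. The
`HostStats` and `DiskStat` structs below are simplified stand-ins with
hypothetical fields, `Stats` and `share_stats` are made-up names, and a plain
thread stands in for `tokio::task::spawn_blocking`:

```rust
use std::sync::Arc;
use std::thread;

// Simplified stand-ins for the structs in the patch (fields are hypothetical).
struct HostStats {
    load: f64,
}

struct DiskStat {
    name: String,
}

/// All three values live in one allocation behind one reference count.
type Stats = (HostStats, DiskStat, Vec<DiskStat>);

/// Shares the same stats between a worker thread and the caller.
/// Returns a summary built by the worker and the load read by the caller.
fn share_stats(stats: Stats) -> (String, f64) {
    let stats = Arc::new(stats);

    // `Arc::clone(&stats)` makes it explicit that only the pointer is
    // cloned, not the data behind it.
    let for_worker = Arc::clone(&stats);
    let worker = thread::spawn(move || {
        let (_host, hostdisk, datastores) = &*for_worker;
        format!("{} + {} datastore(s)", hostdisk.name, datastores.len())
    });
    let summary = worker.join().expect("worker thread panicked");

    // The caller still owns its own handle to the same tuple.
    let (host, _hostdisk, _datastores) = &*stats;
    (summary, host.load)
}
```

With a single Arc there is one clone per consumer instead of three, and the
`hoststats2`/`hostdisk2`/`datastores2` shadow variables are no longer needed.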
> let rrd_future = tokio::task::spawn_blocking(move || {
> - rrd_update_host_stats_sync(&hoststats, &hostdisk, &datastores);
> + rrd_update_host_stats_sync(&hoststats2, &hostdisk2, &datastores2);
> rrd_sync_journal();
> });
>
> + let metrics_future = send_data_to_metric_servers(hoststats, hostdisk, datastores);
> +
> + let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
> + if let Err(err) = rrd_res {
> + log::error!("rrd update panicked: {}", err);
> + }
> + if let Err(err) = metrics_res {
> + log::error!("error during metrics sending: {}", err);
> + }
>
> tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
>
> }
> +}
> +
> +async fn send_data_to_metric_servers(
> + host: Arc<HostStats>,
> + hostdisk: Arc<DiskStat>,
> + datastores: Arc<Vec<DiskStat>>,
> +) -> Result<(), Error> {
> + let (config, _digest) = pbs_config::metrics::config()?;
> + let (futures, channels, names) = get_metric_server_connections(config)?;
> +
> + if futures.is_empty() {
> + return Ok(());
> + }
> +
> + let names2 = names.clone();
> + let sending_handle = tokio::spawn(async move {
> + for (i, res) in future::join_all(futures).await.into_iter().enumerate() {
> + if let Err(err) = res {
> + eprintln!("ERROR '{}': {}", names2[i], err);
> + }
> + }
> + });
> +
> + let ctime = proxmox_time::epoch_i64();
> + let nodename = proxmox_sys::nodename();
> +
> + let mut values = Vec::new();
> +
> + let mut cpuvalue = json!({});
> + if let Some(stat) = &host.proc {
> + for (key, value) in serde_json::to_value(stat)?.as_object().unwrap().iter() {
> + cpuvalue[key.clone()] = value.clone();
> + }
I may be missing something but can I not replace the entire loop with
just:
cpuvalue = serde_json::to_value(stat)?;
?
> + }
in fact:
let mut cpuvalue = match &host.proc {
Some(stat) => serde_json::to_value(stat)?,
None => json!({}),
};
> +
> + if let Some(loadavg) = &host.load {
> + cpuvalue["avg1"] = Value::from(loadavg.0);
> + cpuvalue["avg5"] = Value::from(loadavg.1);
> + cpuvalue["avg15"] = Value::from(loadavg.2);
> + }
> @@ -973,6 +1090,26 @@ struct DiskStat {
> dev: Option<BlockDevStat>,
> }
>
> +impl DiskStat {
> + fn to_value(&self) -> Value {
> + let mut value = json!({});
> + if let Some(usage) = &self.usage {
> + value["total"] = Value::from(usage.total);
> + value["used"] = Value::from(usage.used);
> + value["avail"] = Value::from(usage.available);
> + }
> +
> + if let Some(dev) = &self.dev {
> + value["read_ios"] = Value::from(dev.read_ios);
> + value["read_bytes"] = Value::from(dev.read_sectors * 512);
And bye-bye goes the hope for a generic
'serialize-by-merging-into-existing-object' helper :-(
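What such a helper might look like, as a dependency-free sketch.
`merge_fields`, `raw_fields` and the `Fields` alias are hypothetical, and a
`BTreeMap` stands in for the `serde_json` object used in the patch. It also
illustrates why `DiskStat` does not fit: `read_bytes` is derived
(`read_sectors * 512`) rather than copied from a struct field, so a plain
field-by-field merge cannot produce it.

```rust
use std::collections::BTreeMap;

/// Stand-in for a JSON object; the patch uses `serde_json::Value` instead.
type Fields = BTreeMap<String, u64>;

/// Hypothetical helper: copy every entry of `source` into `target`,
/// overwriting entries that already exist under the same key.
fn merge_fields(target: &mut Fields, source: &Fields) {
    for (key, value) in source {
        target.insert(key.clone(), *value);
    }
}

/// Simplified block device counters (a subset of the fields in the patch).
struct BlockDevStat {
    read_ios: u64,
    read_sectors: u64,
}

/// A straight field dump, which is all a generic merge could provide.
fn raw_fields(dev: &BlockDevStat) -> Fields {
    let mut fields = Fields::new();
    fields.insert("read_ios".to_string(), dev.read_ios);
    fields.insert("read_sectors".to_string(), dev.read_sectors);
    fields
}
```

Merging `raw_fields(..)` into an existing map yields `read_sectors` but no
`read_bytes`, which is why the patch builds the object by hand.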
* [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints
2021-12-14 12:24 [pbs-devel] [PATCH proxmox/proxmox-backup] add metrics server capability Dominik Csapak
` (7 preceding siblings ...)
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 5/6] proxmox-backup-proxy: send metrics to configured metrics server Dominik Csapak
@ 2021-12-14 12:24 ` Dominik Csapak
2021-12-15 7:39 ` Wolfgang Bumiller
8 siblings, 1 reply; 13+ messages in thread
From: Dominik Csapak @ 2021-12-14 12:24 UTC (permalink / raw)
To: pbs-devel
but in contrast to pve, we split the api by type of the section config,
since we cannot handle multiple types in the updater
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/api2/config/metricserver/influxdbhttp.rs | 272 +++++++++++++++++++
src/api2/config/metricserver/influxdbudp.rs | 242 +++++++++++++++++
src/api2/config/metricserver/mod.rs | 16 ++
src/api2/config/mod.rs | 2 +
4 files changed, 532 insertions(+)
create mode 100644 src/api2/config/metricserver/influxdbhttp.rs
create mode 100644 src/api2/config/metricserver/influxdbudp.rs
create mode 100644 src/api2/config/metricserver/mod.rs
diff --git a/src/api2/config/metricserver/influxdbhttp.rs b/src/api2/config/metricserver/influxdbhttp.rs
new file mode 100644
index 00000000..0763c979
--- /dev/null
+++ b/src/api2/config/metricserver/influxdbhttp.rs
@@ -0,0 +1,272 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use serde::{Deserialize, Serialize};
+use hex::FromHex;
+
+use proxmox_router::{Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+ InfluxDbHttp, InfluxDbHttpUpdater,
+ PROXMOX_CONFIG_DIGEST_SCHEMA, METRIC_SERVER_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use pbs_config::metrics;
+
+#[api(
+ input: {
+ properties: {},
+ },
+ returns: {
+ description: "List of configured InfluxDB http metric servers.",
+ type: Array,
+ items: { type: InfluxDbHttp },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// List configured InfluxDB http metric servers.
+pub fn list_influxdb_http_servers(
+ _param: Value,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<InfluxDbHttp>, Error> {
+
+ let (config, digest) = metrics::config()?;
+
+ let mut list: Vec<InfluxDbHttp> = config.convert_to_typed_array("influxdb-http")?;
+
+ // don't return token via api
+ for item in list.iter_mut() {
+ item.token = None;
+ }
+
+ rpcenv["digest"] = hex::encode(&digest).into();
+
+ Ok(list)
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ config: {
+ type: InfluxDbHttp,
+ flatten: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Create a new InfluxDB http server configuration
+pub fn create_influxdb_http_server(config: InfluxDbHttp) -> Result<(), Error> {
+
+ let _lock = metrics::lock_config()?;
+
+ let (mut metrics, _digest) = metrics::config()?;
+
+ metrics.set_data(&config.name, "influxdb-http", &config)?;
+
+ metrics::save_config(&metrics)?;
+
+ Ok(())
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Remove an InfluxDB http server configuration
+pub fn delete_influxdb_http_server(
+ name: String,
+ digest: Option<String>,
+ _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+ let _lock = metrics::lock_config()?;
+
+ let (mut metrics, expected_digest) = metrics::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = <[u8; 32]>::from_hex(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ if metrics.sections.remove(&name).is_none() {
+ bail!("name '{}' does not exist.", name);
+ }
+
+ metrics::save_config(&metrics)?;
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ },
+ },
+ returns: { type: InfluxDbHttp },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// Read the InfluxDB http server configuration
+pub fn read_influxdb_http_server(
+ name: String,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<InfluxDbHttp, Error> {
+
+ let (metrics, digest) = metrics::config()?;
+
+ let mut config: InfluxDbHttp = metrics.lookup("influxdb-http", &name)?;
+
+ config.token = None;
+
+ rpcenv["digest"] = hex::encode(&digest).into();
+
+ Ok(config)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all="kebab-case")]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+ /// Delete the port property.
+ port,
+ /// Delete the https property.
+ https,
+ /// Delete the token property.
+ token,
+ /// Delete the bucket property.
+ bucket,
+ /// Delete the organization property.
+ organization,
+ /// Delete the max_body_size property.
+ max_body_size,
+ /// Delete the verify_tls property.
+ verify_tls,
+ /// Delete the comment property.
+ comment,
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ update: {
+ type: InfluxDbHttpUpdater,
+ flatten: true,
+ },
+ delete: {
+ description: "List of properties to delete.",
+ type: Array,
+ optional: true,
+ items: {
+ type: DeletableProperty,
+ }
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ returns: { type: InfluxDbHttp },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Update an InfluxDB http server configuration
+pub fn update_influxdb_http_server(
+ name: String,
+ update: InfluxDbHttpUpdater,
+ delete: Option<Vec<DeletableProperty>>,
+ digest: Option<String>,
+ _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+ let _lock = metrics::lock_config()?;
+
+ let (mut metrics, expected_digest) = metrics::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = <[u8; 32]>::from_hex(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ let mut config: InfluxDbHttp = metrics.lookup("influxdb-http", &name)?;
+
+ if let Some(delete) = delete {
+ for delete_prop in delete {
+ match delete_prop {
+ DeletableProperty::port => { config.port = None; },
+ DeletableProperty::https => { config.https = None; },
+ DeletableProperty::token => { config.token = None; },
+ DeletableProperty::bucket => { config.bucket = None; },
+ DeletableProperty::organization => { config.organization = None; },
+ DeletableProperty::max_body_size => { config.max_body_size = None; },
+ DeletableProperty::verify_tls => { config.verify_tls = None; },
+ DeletableProperty::comment => { config.comment = None; },
+ }
+ }
+ }
+
+ if let Some(comment) = update.comment {
+ let comment = comment.trim().to_string();
+ if comment.is_empty() {
+ config.comment = None;
+ } else {
+ config.comment = Some(comment);
+ }
+ }
+
+ if let Some(host) = update.host { config.host = host; }
+
+ if update.port.is_some() { config.port = update.port; }
+ if update.https.is_some() { config.https = update.https; }
+ if update.token.is_some() { config.token = update.token; }
+ if update.bucket.is_some() { config.bucket = update.bucket; }
+ if update.organization.is_some() { config.organization = update.organization; }
+ if update.max_body_size.is_some() { config.max_body_size = update.max_body_size; }
+ if update.verify_tls.is_some() { config.verify_tls = update.verify_tls; }
+
+ metrics.set_data(&name, "influxdb-http", &config)?;
+
+ metrics::save_config(&metrics)?;
+
+ Ok(())
+}
+
+const ITEM_ROUTER: Router = Router::new()
+ .get(&API_METHOD_READ_INFLUXDB_HTTP_SERVER)
+ .put(&API_METHOD_UPDATE_INFLUXDB_HTTP_SERVER)
+ .delete(&API_METHOD_DELETE_INFLUXDB_HTTP_SERVER);
+
+pub const ROUTER: Router = Router::new()
+ .get(&API_METHOD_LIST_INFLUXDB_HTTP_SERVERS)
+ .post(&API_METHOD_CREATE_INFLUXDB_HTTP_SERVER)
+ .match_all("name", &ITEM_ROUTER);
diff --git a/src/api2/config/metricserver/influxdbudp.rs b/src/api2/config/metricserver/influxdbudp.rs
new file mode 100644
index 00000000..0936f18b
--- /dev/null
+++ b/src/api2/config/metricserver/influxdbudp.rs
@@ -0,0 +1,242 @@
+use anyhow::{bail, Error};
+use serde_json::Value;
+use serde::{Deserialize, Serialize};
+use hex::FromHex;
+
+use proxmox_router::{Router, RpcEnvironment, Permission};
+use proxmox_schema::api;
+
+use pbs_api_types::{
+ InfluxDbUdp, InfluxDbUdpUpdater,
+ PROXMOX_CONFIG_DIGEST_SCHEMA, METRIC_SERVER_ID_SCHEMA, PRIV_SYS_AUDIT, PRIV_SYS_MODIFY,
+};
+
+use pbs_config::metrics;
+
+#[api(
+ input: {
+ properties: {},
+ },
+ returns: {
+ description: "List of configured InfluxDB udp metric servers.",
+ type: Array,
+ items: { type: InfluxDbUdp },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// List configured InfluxDB udp metric servers.
+pub fn list_influxdb_udp_servers(
+ _param: Value,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<Vec<InfluxDbUdp>, Error> {
+
+ let (config, digest) = metrics::config()?;
+
+ let list = config.convert_to_typed_array("influxdb-udp")?;
+
+ rpcenv["digest"] = hex::encode(&digest).into();
+
+ Ok(list)
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ config: {
+ type: InfluxDbUdp,
+ flatten: true,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Create a new InfluxDB udp server configuration
+pub fn create_influxdb_udp_server(config: InfluxDbUdp) -> Result<(), Error> {
+
+ let _lock = metrics::lock_config()?;
+
+ let (mut metrics, _digest) = metrics::config()?;
+
+ metrics.set_data(&config.name, "influxdb-udp", &config)?;
+
+ metrics::save_config(&metrics)?;
+
+ Ok(())
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Remove an InfluxDB udp server configuration
+pub fn delete_influxdb_udp_server(
+ name: String,
+ digest: Option<String>,
+ _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+ let _lock = metrics::lock_config()?;
+
+ let (mut metrics, expected_digest) = metrics::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = <[u8; 32]>::from_hex(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ if metrics.sections.remove(&name).is_none() {
+ bail!("name '{}' does not exist.", name);
+ }
+
+ metrics::save_config(&metrics)?;
+
+ Ok(())
+}
+
+#[api(
+ input: {
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ },
+ },
+ returns: { type: InfluxDbUdp },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_AUDIT, false),
+ },
+)]
+/// Read the InfluxDB udp server configuration
+pub fn read_influxdb_udp_server(
+ name: String,
+ mut rpcenv: &mut dyn RpcEnvironment,
+) -> Result<InfluxDbUdp, Error> {
+
+ let (metrics, digest) = metrics::config()?;
+
+ let config = metrics.lookup("influxdb-udp", &name)?;
+
+ rpcenv["digest"] = hex::encode(&digest).into();
+
+ Ok(config)
+}
+
+#[api()]
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all="kebab-case")]
+#[allow(non_camel_case_types)]
+/// Deletable property name
+pub enum DeletableProperty {
+ /// Delete the mtu property.
+ mtu,
+ /// Delete the comment property.
+ comment,
+}
+
+#[api(
+ protected: true,
+ input: {
+ properties: {
+ name: {
+ schema: METRIC_SERVER_ID_SCHEMA,
+ },
+ update: {
+ type: InfluxDbUdpUpdater,
+ flatten: true,
+ },
+ delete: {
+ description: "List of properties to delete.",
+ type: Array,
+ optional: true,
+ items: {
+ type: DeletableProperty,
+ }
+ },
+ digest: {
+ optional: true,
+ schema: PROXMOX_CONFIG_DIGEST_SCHEMA,
+ },
+ },
+ },
+ returns: { type: InfluxDbUdp },
+ access: {
+ permission: &Permission::Privilege(&[], PRIV_SYS_MODIFY, false),
+ },
+)]
+/// Update an InfluxDB udp server configuration
+pub fn update_influxdb_udp_server(
+ name: String,
+ update: InfluxDbUdpUpdater,
+ delete: Option<Vec<DeletableProperty>>,
+ digest: Option<String>,
+ _rpcenv: &mut dyn RpcEnvironment,
+) -> Result<(), Error> {
+
+ let _lock = metrics::lock_config()?;
+
+ let (mut metrics, expected_digest) = metrics::config()?;
+
+ if let Some(ref digest) = digest {
+ let digest = <[u8; 32]>::from_hex(digest)?;
+ crate::tools::detect_modified_configuration_file(&digest, &expected_digest)?;
+ }
+
+ let mut config: InfluxDbUdp = metrics.lookup("influxdb-udp", &name)?;
+
+ if let Some(delete) = delete {
+ for delete_prop in delete {
+ match delete_prop {
+ DeletableProperty::mtu => { config.mtu = None; },
+ DeletableProperty::comment => { config.comment = None; },
+ }
+ }
+ }
+
+ if let Some(comment) = update.comment {
+ let comment = comment.trim().to_string();
+ if comment.is_empty() {
+ config.comment = None;
+ } else {
+ config.comment = Some(comment);
+ }
+ }
+
+ if let Some(host) = update.host { config.host = host; }
+ if let Some(port) = update.port { config.port = port; }
+
+ if update.mtu.is_some() { config.mtu = update.mtu; }
+
+ metrics.set_data(&name, "influxdb-udp", &config)?;
+
+ metrics::save_config(&metrics)?;
+
+ Ok(())
+}
+
+const ITEM_ROUTER: Router = Router::new()
+ .get(&API_METHOD_READ_INFLUXDB_UDP_SERVER)
+ .put(&API_METHOD_UPDATE_INFLUXDB_UDP_SERVER)
+ .delete(&API_METHOD_DELETE_INFLUXDB_UDP_SERVER);
+
+pub const ROUTER: Router = Router::new()
+ .get(&API_METHOD_LIST_INFLUXDB_UDP_SERVERS)
+ .post(&API_METHOD_CREATE_INFLUXDB_UDP_SERVER)
+ .match_all("name", &ITEM_ROUTER);
diff --git a/src/api2/config/metricserver/mod.rs b/src/api2/config/metricserver/mod.rs
new file mode 100644
index 00000000..cbce34f7
--- /dev/null
+++ b/src/api2/config/metricserver/mod.rs
@@ -0,0 +1,16 @@
+use proxmox_router::{Router, SubdirMap};
+use proxmox_router::list_subdirs_api_method;
+use proxmox_sys::sortable;
+
+pub mod influxdbudp;
+pub mod influxdbhttp;
+
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([
+ ("influxdb-http", &influxdbhttp::ROUTER),
+ ("influxdb-udp", &influxdbudp::ROUTER),
+]);
+
+pub const ROUTER: Router = Router::new()
+ .get(&list_subdirs_api_method!(SUBDIRS))
+ .subdirs(SUBDIRS);
diff --git a/src/api2/config/mod.rs b/src/api2/config/mod.rs
index c256ba64..5de1c28f 100644
--- a/src/api2/config/mod.rs
+++ b/src/api2/config/mod.rs
@@ -12,6 +12,7 @@ pub mod verify;
pub mod drive;
pub mod changer;
pub mod media_pool;
+pub mod metricserver;
pub mod tape_encryption_keys;
pub mod tape_backup_job;
pub mod traffic_control;
@@ -23,6 +24,7 @@ const SUBDIRS: SubdirMap = &[
("datastore", &datastore::ROUTER),
("drive", &drive::ROUTER),
("media-pool", &media_pool::ROUTER),
+ ("metricserver", &metricserver::ROUTER),
("remote", &remote::ROUTER),
("sync", &sync::ROUTER),
("tape-backup-job", &tape_backup_job::ROUTER),
--
2.30.2
* Re: [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints
2021-12-14 12:24 ` [pbs-devel] [PATCH proxmox-backup 6/6] api: add metricserver endpoints Dominik Csapak
@ 2021-12-15 7:39 ` Wolfgang Bumiller
0 siblings, 0 replies; 13+ messages in thread
From: Wolfgang Bumiller @ 2021-12-15 7:39 UTC (permalink / raw)
To: Dominik Csapak; +Cc: pbs-devel
On Tue, Dec 14, 2021 at 01:24:12PM +0100, Dominik Csapak wrote:
> but in contrast to pve, we split the api by type of the section config,
> since we cannot handle multiple types in the updater
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> +
> +#[api()]
> +#[derive(Serialize, Deserialize)]
> +#[serde(rename_all="kebab-case")]
> +#[allow(non_camel_case_types)]
^~~~~~~~~~~~~~~~~~~^
Please don't add more of this nonsense!
> +/// Deletable property name
> +pub enum DeletableProperty {
> + /// Delete the port property.
> + port,
> + /// Delete the https property.
> + https,
> + /// Delete the token property.
> + token,
> + /// Delete the bucket property.
> + bucket,
> + /// Delete the organization property.
> + organization,
> + /// Delete the max_body_size property.
> + max_body_size,
> + /// Delete the verify_tls property.
> + verify_tls,
> + /// Delete the comment property.
> + comment,
> +}
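The objection is presumably to the `#[allow(non_camel_case_types)]` override.
One way to avoid it is to give the variants regular CamelCase names and derive
the serialized names from them. In the real code
`#[serde(rename_all = "kebab-case")]` would do that renaming; the hand-written
`as_str` and `FromStr` below only stand in for it so that the sketch has no
dependencies. The kebab-case names are an assumption about the intended wire
format, and only a subset of the variants is shown.

```rust
use std::str::FromStr;

/// Deletable property name, with idiomatic CamelCase variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeletableProperty {
    /// Delete the port property.
    Port,
    /// Delete the max-body-size property.
    MaxBodySize,
    /// Delete the verify-tls property.
    VerifyTls,
    /// Delete the comment property.
    Comment,
}

impl DeletableProperty {
    /// Kebab-case name, as a kebab-case rename rule would produce it.
    pub fn as_str(self) -> &'static str {
        match self {
            DeletableProperty::Port => "port",
            DeletableProperty::MaxBodySize => "max-body-size",
            DeletableProperty::VerifyTls => "verify-tls",
            DeletableProperty::Comment => "comment",
        }
    }
}

impl FromStr for DeletableProperty {
    type Err = String;

    /// Parse the kebab-case name back into a variant.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "port" => Ok(DeletableProperty::Port),
            "max-body-size" => Ok(DeletableProperty::MaxBodySize),
            "verify-tls" => Ok(DeletableProperty::VerifyTls),
            "comment" => Ok(DeletableProperty::Comment),
            other => Err(format!("unknown property '{}'", other)),
        }
    }
}
```

The match arms in the update handler would then read
`DeletableProperty::MaxBodySize => { config.max_body_size = None; }` and so on,
with no lint override needed.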
> +#[api()]
> +#[derive(Serialize, Deserialize)]
> +#[serde(rename_all="kebab-case")]
> +#[allow(non_camel_case_types)]
No
> +/// Deletable property name
> +pub enum DeletableProperty {
> + /// Delete the mtu property.
> + mtu,
> + /// Delete the comment property.
> + comment,
> +}
> +