public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint
@ 2024-10-11 10:51 Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module Lukas Wagner
                   ` (12 more replies)
  0 siblings, 13 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC
  To: pbs-devel

This patch series adds a new metric endpoint at /status/metrics with the same
metric format as PVE's /cluster/metrics/export.

Patches 1 to 9 are preparatory patches, including
  - moving code around to improve the overall structure
  - style/doc comment improvements

These changes should not result in any functional changes.

Patches 10 to 12 store the results from the stat collection loop in a cache
(using proxmox-shared-cache) in /run/proxmox-backup/metrics. Same as in PVE,
we cache the last 180 stat results. With one collection run every 10 seconds,
this gives a total of 30 minutes of history.
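
To illustrate the retention arithmetic, here is a minimal sketch of a bounded
history buffer. It is not the proxmox-shared-cache API and not the code from
this series: `Sample`, `History` and `history_window` are hypothetical names,
and the buffer lives in memory instead of under /run/proxmox-backup/metrics.
Only the 10-second interval and the 180-entry limit are taken from the series.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Interval of the stat collection loop (10 seconds in this series).
const COLLECTION_INTERVAL: Duration = Duration::from_secs(10);
/// Number of stat results to keep, as stated in the cover letter.
const HISTORY_LEN: usize = 180;

/// Hypothetical stand-in for one cached stat result.
#[derive(Debug, Clone, PartialEq)]
struct Sample {
    timestamp: i64,
    cpu: f64,
}

/// Hypothetical in-memory history; NOT the proxmox-shared-cache API.
struct History {
    samples: VecDeque<Sample>,
    capacity: usize,
}

impl History {
    fn new(capacity: usize) -> Self {
        Self {
            samples: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Append a sample, dropping the oldest one once the buffer is full.
    fn push(&mut self, sample: Sample) {
        if self.samples.len() == self.capacity {
            self.samples.pop_front();
        }
        self.samples.push_back(sample);
    }

    /// Return all samples with a timestamp strictly newer than `start`.
    fn since(&self, start: i64) -> Vec<&Sample> {
        self.samples.iter().filter(|s| s.timestamp > start).collect()
    }
}

/// Time span covered by a full history: 180 * 10 s = 30 min.
fn history_window() -> Duration {
    COLLECTION_INTERVAL * HISTORY_LEN as u32
}
```

A caller would create the buffer with `History::new(HISTORY_LEN)`, call `push`
once per collection run, and use `since` to fetch only the samples newer than
a given timestamp.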

Finally, patch 13 adds the actual endpoint for fetching the metric data.

proxmox-backup:

Lukas Wagner (13):
  proxy: server: move rrd stat/metric server to separate module
  metric collection: add doc comments for public functions
  metric collection: move rrd_cache to new metric_collection module
  metric_collection: split out push metric part
  metric collection: rrd: move rrd update function to rrd module
  metric collection: rrd: restrict function visibility
  metric collection: rrd: remove rrd prefix from some function names
  metric collection: drop std::path prefix where not needed
  metric collection: move impl block for DiskStats to metric_server
    module
  pbs-api-types: add types for the new metrics endpoint
  metric collection: initialize metric cache on startup
  metric collection: put metrics in a cache
  api: add /status/metrics API

 Cargo.toml                                    |   2 +
 pbs-api-types/src/metrics.rs                  |  66 +++
 src/api2/mod.rs                               |   1 +
 src/api2/node/rrd.rs                          |   2 +-
 src/api2/status.rs                            |  15 +-
 src/bin/proxmox-backup-proxy.rs               | 419 +-----------------
 src/lib.rs                                    |   2 -
 src/server/metric_collection/metric_server.rs | 156 +++++++
 src/server/metric_collection/mod.rs           | 235 ++++++++++
 src/server/metric_collection/pull_metrics.rs  | 185 ++++++++
 .../metric_collection/rrd.rs}                 |  86 +++-
 src/server/mod.rs                             |   2 +
 12 files changed, 740 insertions(+), 431 deletions(-)
 create mode 100644 src/server/metric_collection/metric_server.rs
 create mode 100644 src/server/metric_collection/mod.rs
 create mode 100644 src/server/metric_collection/pull_metrics.rs
 rename src/{rrd_cache.rs => server/metric_collection/rrd.rs} (60%)


Summary over all repositories:
  12 files changed, 740 insertions(+), 431 deletions(-)

-- 
Generated by git-murpp 0.7.3


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel



* [pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 02/13] metric collection: add doc comments for public functions Lukas Wagner
                   ` (11 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC
  To: pbs-devel

With the upcoming pull-metric system/metric caching, the rrd stat
collection and metric server code should go into a separate module.

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/bin/proxmox-backup-proxy.rs     | 419 +--------------------------
 src/server/metric_collection/mod.rs | 426 ++++++++++++++++++++++++++++
 src/server/mod.rs                   |   2 +
 3 files changed, 433 insertions(+), 414 deletions(-)
 create mode 100644 src/server/metric_collection/mod.rs

diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 041f3aff..859f5b0f 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -17,10 +17,8 @@ use serde_json::{json, Value};
 
 use proxmox_lang::try_block;
 use proxmox_log::init_logger;
-use proxmox_metrics::MetricsData;
 use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
-use proxmox_sys::fs::{CreateOptions, FileSystemInformation};
-use proxmox_sys::linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat};
+use proxmox_sys::fs::CreateOptions;
 use proxmox_sys::logrotate::LogRotate;
 
 use pbs_datastore::DataStore;
@@ -30,15 +28,11 @@ use proxmox_rest_server::{
     RestEnvironment, RestServer, WorkerTask,
 };
 
-use proxmox_backup::rrd_cache::{
-    initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge,
-};
 use proxmox_backup::{
     server::{
         auth::check_pbs_auth,
         jobstate::{self, Job},
     },
-    tools::disks::BlockDevStat,
     traffic_control_cache::{SharedRateLimit, TRAFFIC_CONTROL_CACHE},
 };
 
@@ -51,11 +45,8 @@ use pbs_api_types::{
 };
 
 use proxmox_backup::auth_helpers::*;
-use proxmox_backup::server;
-use proxmox_backup::tools::{
-    disks::{zfs_dataset_stats, DiskManage},
-    PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
-};
+use proxmox_backup::server::{self, metric_collection};
+use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
 
 use proxmox_backup::api2::pull::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
@@ -186,9 +177,7 @@ async fn run() -> Result<(), Error> {
 
     proxmox_backup::auth_helpers::setup_auth_context(false);
     proxmox_backup::server::notifications::init()?;
-
-    let rrd_cache = initialize_rrd_cache()?;
-    rrd_cache.apply_journal()?;
+    metric_collection::init()?;
 
     let mut indexpath = PathBuf::from(pbs_buildcfg::JS_DIR);
     indexpath.push("index.hbs");
@@ -356,7 +345,7 @@ async fn run() -> Result<(), Error> {
     });
 
     start_task_scheduler();
-    start_stat_generator();
+    metric_collection::start_collection_task();
     start_traffic_control_updater();
 
     server.await?;
@@ -389,14 +378,6 @@ fn make_tls_acceptor() -> Result<SslAcceptor, Error> {
     acceptor.build()
 }
 
-fn start_stat_generator() {
-    tokio::spawn(async {
-        let abort_future = pin!(proxmox_daemon::shutdown_future());
-        let future = pin!(run_stat_generator());
-        futures::future::select(future, abort_future).await;
-    });
-}
-
 fn start_task_scheduler() {
     tokio::spawn(async {
         let abort_future = pin!(proxmox_daemon::shutdown_future());
@@ -867,349 +848,6 @@ async fn command_reopen_auth_logfiles() -> Result<(), Error> {
     }
 }
 
-async fn run_stat_generator() {
-    loop {
-        let delay_target = Instant::now() + Duration::from_secs(10);
-
-        let stats_future = tokio::task::spawn_blocking(|| {
-            let hoststats = collect_host_stats_sync();
-            let (hostdisk, datastores) = collect_disk_stats_sync();
-            Arc::new((hoststats, hostdisk, datastores))
-        });
-        let stats = match stats_future.await {
-            Ok(res) => res,
-            Err(err) => {
-                log::error!("collecting host stats panicked: {err}");
-                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-                continue;
-            }
-        };
-
-        let rrd_future = tokio::task::spawn_blocking({
-            let stats = Arc::clone(&stats);
-            move || {
-                rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
-                rrd_sync_journal();
-            }
-        });
-
-        let metrics_future = send_data_to_metric_servers(stats);
-
-        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
-        if let Err(err) = rrd_res {
-            log::error!("rrd update panicked: {err}");
-        }
-        if let Err(err) = metrics_res {
-            log::error!("error during metrics sending: {err}");
-        }
-
-        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-    }
-}
-
-async fn send_data_to_metric_servers(
-    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
-    let (config, _digest) = pbs_config::metrics::config()?;
-    let channel_list = get_metric_server_connections(config)?;
-
-    if channel_list.is_empty() {
-        return Ok(());
-    }
-
-    let ctime = proxmox_time::epoch_i64();
-    let nodename = proxmox_sys::nodename();
-
-    let mut values = Vec::new();
-
-    let mut cpuvalue = match &stats.0.proc {
-        Some(stat) => serde_json::to_value(stat)?,
-        None => json!({}),
-    };
-
-    if let Some(loadavg) = &stats.0.load {
-        cpuvalue["avg1"] = Value::from(loadavg.0);
-        cpuvalue["avg5"] = Value::from(loadavg.1);
-        cpuvalue["avg15"] = Value::from(loadavg.2);
-    }
-
-    values.push(Arc::new(
-        MetricsData::new("cpustat", ctime, cpuvalue)?
-            .tag("object", "host")
-            .tag("host", nodename),
-    ));
-
-    if let Some(stat) = &stats.0.meminfo {
-        values.push(Arc::new(
-            MetricsData::new("memory", ctime, stat)?
-                .tag("object", "host")
-                .tag("host", nodename),
-        ));
-    }
-
-    if let Some(netdev) = &stats.0.net {
-        for item in netdev {
-            values.push(Arc::new(
-                MetricsData::new("nics", ctime, item)?
-                    .tag("object", "host")
-                    .tag("host", nodename)
-                    .tag("instance", item.device.clone()),
-            ));
-        }
-    }
-
-    values.push(Arc::new(
-        MetricsData::new("blockstat", ctime, stats.1.to_value())?
-            .tag("object", "host")
-            .tag("host", nodename),
-    ));
-
-    for datastore in stats.2.iter() {
-        values.push(Arc::new(
-            MetricsData::new("blockstat", ctime, datastore.to_value())?
-                .tag("object", "host")
-                .tag("host", nodename)
-                .tag("datastore", datastore.name.clone()),
-        ));
-    }
-
-    // we must have a concrete functions, because the inferred lifetime from a
-    // closure is not general enough for the tokio::spawn call we are in here...
-    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
-        &item.0
-    }
-
-    let results =
-        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
-    for (res, name) in results
-        .into_iter()
-        .zip(channel_list.iter().map(|(_, name)| name))
-    {
-        if let Err(err) = res {
-            log::error!("error sending into channel of {name}: {err}");
-        }
-    }
-
-    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
-        if let Err(err) = channel.join().await {
-            log::error!("error sending to metric server {name}: {err}");
-        }
-    }))
-    .await;
-
-    Ok(())
-}
-
-/// Get the metric server connections from a config
-pub fn get_metric_server_connections(
-    metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
-    let mut res = Vec::new();
-
-    for config in
-        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
-    {
-        if !config.enable {
-            continue;
-        }
-        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
-        res.push((future, config.name));
-    }
-
-    for config in
-        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
-    {
-        if !config.enable {
-            continue;
-        }
-        let future = proxmox_metrics::influxdb_http(
-            &config.url,
-            config.organization.as_deref().unwrap_or("proxmox"),
-            config.bucket.as_deref().unwrap_or("proxmox"),
-            config.token.as_deref(),
-            config.verify_tls.unwrap_or(true),
-            config.max_body_size.unwrap_or(25_000_000),
-        )?;
-        res.push((future, config.name));
-    }
-    Ok(res)
-}
-
-struct HostStats {
-    proc: Option<ProcFsStat>,
-    meminfo: Option<ProcFsMemInfo>,
-    net: Option<Vec<ProcFsNetDev>>,
-    load: Option<Loadavg>,
-}
-
-struct DiskStat {
-    name: String,
-    usage: Option<FileSystemInformation>,
-    dev: Option<BlockDevStat>,
-}
-
-impl DiskStat {
-    fn to_value(&self) -> Value {
-        let mut value = json!({});
-        if let Some(usage) = &self.usage {
-            value["total"] = Value::from(usage.total);
-            value["used"] = Value::from(usage.used);
-            value["avail"] = Value::from(usage.available);
-        }
-
-        if let Some(dev) = &self.dev {
-            value["read_ios"] = Value::from(dev.read_ios);
-            value["read_bytes"] = Value::from(dev.read_sectors * 512);
-            value["write_ios"] = Value::from(dev.write_ios);
-            value["write_bytes"] = Value::from(dev.write_sectors * 512);
-            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
-        }
-        value
-    }
-}
-
-fn collect_host_stats_sync() -> HostStats {
-    use proxmox_sys::linux::procfs::{
-        read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
-    };
-
-    let proc = match read_proc_stat() {
-        Ok(stat) => Some(stat),
-        Err(err) => {
-            eprintln!("read_proc_stat failed - {err}");
-            None
-        }
-    };
-
-    let meminfo = match read_meminfo() {
-        Ok(stat) => Some(stat),
-        Err(err) => {
-            eprintln!("read_meminfo failed - {err}");
-            None
-        }
-    };
-
-    let net = match read_proc_net_dev() {
-        Ok(netdev) => Some(netdev),
-        Err(err) => {
-            eprintln!("read_prox_net_dev failed - {err}");
-            None
-        }
-    };
-
-    let load = match read_loadavg() {
-        Ok(loadavg) => Some(loadavg),
-        Err(err) => {
-            eprintln!("read_loadavg failed - {err}");
-            None
-        }
-    };
-
-    HostStats {
-        proc,
-        meminfo,
-        net,
-        load,
-    }
-}
-
-fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
-    let disk_manager = DiskManage::new();
-
-    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
-
-    let mut datastores = Vec::new();
-    match pbs_config::datastore::config() {
-        Ok((config, _)) => {
-            let datastore_list: Vec<DataStoreConfig> = config
-                .convert_to_typed_array("datastore")
-                .unwrap_or_default();
-
-            for config in datastore_list {
-                if config
-                    .get_maintenance_mode()
-                    .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
-                {
-                    continue;
-                }
-                let path = std::path::Path::new(&config.path);
-                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
-            }
-        }
-        Err(err) => {
-            eprintln!("read datastore config failed - {err}");
-        }
-    }
-
-    (root, datastores)
-}
-
-fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
-    if let Some(stat) = &host.proc {
-        rrd_update_gauge("host/cpu", stat.cpu);
-        rrd_update_gauge("host/iowait", stat.iowait_percent);
-    }
-
-    if let Some(meminfo) = &host.meminfo {
-        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-        rrd_update_gauge("host/memused", meminfo.memused as f64);
-        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
-    }
-
-    if let Some(netdev) = &host.net {
-        use pbs_config::network::is_physical_nic;
-        let mut netin = 0;
-        let mut netout = 0;
-        for item in netdev {
-            if !is_physical_nic(&item.device) {
-                continue;
-            }
-            netin += item.receive;
-            netout += item.send;
-        }
-        rrd_update_derive("host/netin", netin as f64);
-        rrd_update_derive("host/netout", netout as f64);
-    }
-
-    if let Some(loadavg) = &host.load {
-        rrd_update_gauge("host/loadavg", loadavg.0);
-    }
-
-    rrd_update_disk_stat(hostdisk, "host");
-
-    for stat in datastores {
-        let rrd_prefix = format!("datastore/{}", stat.name);
-        rrd_update_disk_stat(stat, &rrd_prefix);
-    }
-}
-
-fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
-    if let Some(status) = &disk.usage {
-        let rrd_key = format!("{}/total", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.total as f64);
-        let rrd_key = format!("{}/used", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.used as f64);
-        let rrd_key = format!("{}/available", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.available as f64);
-    }
-
-    if let Some(stat) = &disk.dev {
-        let rrd_key = format!("{}/read_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.read_ios as f64);
-        let rrd_key = format!("{}/read_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
-
-        let rrd_key = format!("{}/write_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.write_ios as f64);
-        let rrd_key = format!("{}/write_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
-
-        let rrd_key = format!("{}/io_ticks", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
-    }
-}
-
 fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     let event: CalendarEvent = match event_str.parse() {
         Ok(event) => event,
@@ -1240,53 +878,6 @@ fn check_schedule(worker_type: &str, event_str: &str, id: &str) -> bool {
     next <= now
 }
 
-fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
-    let usage = match proxmox_sys::fs::fs_info(path) {
-        Ok(status) => Some(status),
-        Err(err) => {
-            eprintln!("read fs info on {path:?} failed - {err}");
-            None
-        }
-    };
-
-    let dev = match disk_manager.find_mounted_device(path) {
-        Ok(None) => None,
-        Ok(Some((fs_type, device, source))) => {
-            let mut device_stat = None;
-            match (fs_type.as_str(), source) {
-                ("zfs", Some(source)) => match source.into_string() {
-                    Ok(dataset) => match zfs_dataset_stats(&dataset) {
-                        Ok(stat) => device_stat = Some(stat),
-                        Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
-                    },
-                    Err(source) => {
-                        eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
-                    }
-                },
-                _ => {
-                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
-                        match disk.read_stat() {
-                            Ok(stat) => device_stat = stat,
-                            Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
-                        }
-                    }
-                }
-            }
-            device_stat
-        }
-        Err(err) => {
-            eprintln!("find_mounted_device failed - {err}");
-            None
-        }
-    };
-
-    DiskStat {
-        name: name.to_string(),
-        usage,
-        dev,
-    }
-}
-
 // Rate Limiter lookup
 async fn run_traffic_control_updater() {
     loop {
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
new file mode 100644
index 00000000..e220f51a
--- /dev/null
+++ b/src/server/metric_collection/mod.rs
@@ -0,0 +1,426 @@
+use std::{
+    path::Path,
+    pin::pin,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use anyhow::Error;
+use pbs_api_types::{DataStoreConfig, Operation};
+use serde_json::{json, Value};
+use tokio::join;
+
+use proxmox_metrics::MetricsData;
+use proxmox_sys::{
+    fs::FileSystemInformation,
+    linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
+};
+
+use crate::{
+    rrd_cache::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge},
+    tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage},
+};
+
+pub fn init() -> Result<(), Error> {
+    let rrd_cache = initialize_rrd_cache()?;
+    rrd_cache.apply_journal()?;
+    Ok(())
+}
+
+pub fn start_collection_task() {
+    tokio::spawn(async {
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        let future = pin!(run_stat_generator());
+        futures::future::select(future, abort_future).await;
+    });
+}
+
+async fn run_stat_generator() {
+    loop {
+        let delay_target = Instant::now() + Duration::from_secs(10);
+
+        let stats_future = tokio::task::spawn_blocking(|| {
+            let hoststats = collect_host_stats_sync();
+            let (hostdisk, datastores) = collect_disk_stats_sync();
+            Arc::new((hoststats, hostdisk, datastores))
+        });
+        let stats = match stats_future.await {
+            Ok(res) => res,
+            Err(err) => {
+                log::error!("collecting host stats panicked: {err}");
+                tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+                continue;
+            }
+        };
+
+        let rrd_future = tokio::task::spawn_blocking({
+            let stats = Arc::clone(&stats);
+            move || {
+                rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+                rrd_sync_journal();
+            }
+        });
+
+        let metrics_future = send_data_to_metric_servers(stats);
+
+        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+        if let Err(err) = rrd_res {
+            log::error!("rrd update panicked: {err}");
+        }
+        if let Err(err) = metrics_res {
+            log::error!("error during metrics sending: {err}");
+        }
+
+        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+    }
+}
+
+async fn send_data_to_metric_servers(
+    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+    let (config, _digest) = pbs_config::metrics::config()?;
+    let channel_list = get_metric_server_connections(config)?;
+
+    if channel_list.is_empty() {
+        return Ok(());
+    }
+
+    let ctime = proxmox_time::epoch_i64();
+    let nodename = proxmox_sys::nodename();
+
+    let mut values = Vec::new();
+
+    let mut cpuvalue = match &stats.0.proc {
+        Some(stat) => serde_json::to_value(stat)?,
+        None => json!({}),
+    };
+
+    if let Some(loadavg) = &stats.0.load {
+        cpuvalue["avg1"] = Value::from(loadavg.0);
+        cpuvalue["avg5"] = Value::from(loadavg.1);
+        cpuvalue["avg15"] = Value::from(loadavg.2);
+    }
+
+    values.push(Arc::new(
+        MetricsData::new("cpustat", ctime, cpuvalue)?
+            .tag("object", "host")
+            .tag("host", nodename),
+    ));
+
+    if let Some(stat) = &stats.0.meminfo {
+        values.push(Arc::new(
+            MetricsData::new("memory", ctime, stat)?
+                .tag("object", "host")
+                .tag("host", nodename),
+        ));
+    }
+
+    if let Some(netdev) = &stats.0.net {
+        for item in netdev {
+            values.push(Arc::new(
+                MetricsData::new("nics", ctime, item)?
+                    .tag("object", "host")
+                    .tag("host", nodename)
+                    .tag("instance", item.device.clone()),
+            ));
+        }
+    }
+
+    values.push(Arc::new(
+        MetricsData::new("blockstat", ctime, stats.1.to_value())?
+            .tag("object", "host")
+            .tag("host", nodename),
+    ));
+
+    for datastore in stats.2.iter() {
+        values.push(Arc::new(
+            MetricsData::new("blockstat", ctime, datastore.to_value())?
+                .tag("object", "host")
+                .tag("host", nodename)
+                .tag("datastore", datastore.name.clone()),
+        ));
+    }
+
+    // we must have a concrete functions, because the inferred lifetime from a
+    // closure is not general enough for the tokio::spawn call we are in here...
+    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+        &item.0
+    }
+
+    let results =
+        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+    for (res, name) in results
+        .into_iter()
+        .zip(channel_list.iter().map(|(_, name)| name))
+    {
+        if let Err(err) = res {
+            log::error!("error sending into channel of {name}: {err}");
+        }
+    }
+
+    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+        if let Err(err) = channel.join().await {
+            log::error!("error sending to metric server {name}: {err}");
+        }
+    }))
+    .await;
+
+    Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+    metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+    let mut res = Vec::new();
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+    {
+        if !config.enable {
+            continue;
+        }
+        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+        res.push((future, config.name));
+    }
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+    {
+        if !config.enable {
+            continue;
+        }
+        let future = proxmox_metrics::influxdb_http(
+            &config.url,
+            config.organization.as_deref().unwrap_or("proxmox"),
+            config.bucket.as_deref().unwrap_or("proxmox"),
+            config.token.as_deref(),
+            config.verify_tls.unwrap_or(true),
+            config.max_body_size.unwrap_or(25_000_000),
+        )?;
+        res.push((future, config.name));
+    }
+    Ok(res)
+}
+
+struct HostStats {
+    proc: Option<ProcFsStat>,
+    meminfo: Option<ProcFsMemInfo>,
+    net: Option<Vec<ProcFsNetDev>>,
+    load: Option<Loadavg>,
+}
+
+struct DiskStat {
+    name: String,
+    usage: Option<FileSystemInformation>,
+    dev: Option<BlockDevStat>,
+}
+
+impl DiskStat {
+    fn to_value(&self) -> Value {
+        let mut value = json!({});
+        if let Some(usage) = &self.usage {
+            value["total"] = Value::from(usage.total);
+            value["used"] = Value::from(usage.used);
+            value["avail"] = Value::from(usage.available);
+        }
+
+        if let Some(dev) = &self.dev {
+            value["read_ios"] = Value::from(dev.read_ios);
+            value["read_bytes"] = Value::from(dev.read_sectors * 512);
+            value["write_ios"] = Value::from(dev.write_ios);
+            value["write_bytes"] = Value::from(dev.write_sectors * 512);
+            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+        }
+        value
+    }
+}
+
+fn collect_host_stats_sync() -> HostStats {
+    use proxmox_sys::linux::procfs::{
+        read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
+    };
+
+    let proc = match read_proc_stat() {
+        Ok(stat) => Some(stat),
+        Err(err) => {
+            eprintln!("read_proc_stat failed - {err}");
+            None
+        }
+    };
+
+    let meminfo = match read_meminfo() {
+        Ok(stat) => Some(stat),
+        Err(err) => {
+            eprintln!("read_meminfo failed - {err}");
+            None
+        }
+    };
+
+    let net = match read_proc_net_dev() {
+        Ok(netdev) => Some(netdev),
+        Err(err) => {
+            eprintln!("read_prox_net_dev failed - {err}");
+            None
+        }
+    };
+
+    let load = match read_loadavg() {
+        Ok(loadavg) => Some(loadavg),
+        Err(err) => {
+            eprintln!("read_loadavg failed - {err}");
+            None
+        }
+    };
+
+    HostStats {
+        proc,
+        meminfo,
+        net,
+        load,
+    }
+}
+
+fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
+    let disk_manager = DiskManage::new();
+
+    let root = gather_disk_stats(disk_manager.clone(), Path::new("/"), "host");
+
+    let mut datastores = Vec::new();
+    match pbs_config::datastore::config() {
+        Ok((config, _)) => {
+            let datastore_list: Vec<DataStoreConfig> = config
+                .convert_to_typed_array("datastore")
+                .unwrap_or_default();
+
+            for config in datastore_list {
+                if config
+                    .get_maintenance_mode()
+                    .map_or(false, |mode| mode.check(Some(Operation::Read)).is_err())
+                {
+                    continue;
+                }
+                let path = std::path::Path::new(&config.path);
+                datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
+            }
+        }
+        Err(err) => {
+            eprintln!("read datastore config failed - {err}");
+        }
+    }
+
+    (root, datastores)
+}
+
+fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
+    if let Some(stat) = &host.proc {
+        rrd_update_gauge("host/cpu", stat.cpu);
+        rrd_update_gauge("host/iowait", stat.iowait_percent);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+        rrd_update_gauge("host/memused", meminfo.memused as f64);
+        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+        use pbs_config::network::is_physical_nic;
+        let mut netin = 0;
+        let mut netout = 0;
+        for item in netdev {
+            if !is_physical_nic(&item.device) {
+                continue;
+            }
+            netin += item.receive;
+            netout += item.send;
+        }
+        rrd_update_derive("host/netin", netin as f64);
+        rrd_update_derive("host/netout", netout as f64);
+    }
+
+    if let Some(loadavg) = &host.load {
+        rrd_update_gauge("host/loadavg", loadavg.0);
+    }
+
+    rrd_update_disk_stat(hostdisk, "host");
+
+    for stat in datastores {
+        let rrd_prefix = format!("datastore/{}", stat.name);
+        rrd_update_disk_stat(stat, &rrd_prefix);
+    }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+    if let Some(status) = &disk.usage {
+        let rrd_key = format!("{}/total", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.total as f64);
+        let rrd_key = format!("{}/used", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.used as f64);
+        let rrd_key = format!("{}/available", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.available as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        let rrd_key = format!("{}/read_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.read_ios as f64);
+        let rrd_key = format!("{}/read_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
+
+        let rrd_key = format!("{}/write_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.write_ios as f64);
+        let rrd_key = format!("{}/write_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
+
+        let rrd_key = format!("{}/io_ticks", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
+    }
+}
+
+fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
+    let usage = match proxmox_sys::fs::fs_info(path) {
+        Ok(status) => Some(status),
+        Err(err) => {
+            eprintln!("read fs info on {path:?} failed - {err}");
+            None
+        }
+    };
+
+    let dev = match disk_manager.find_mounted_device(path) {
+        Ok(None) => None,
+        Ok(Some((fs_type, device, source))) => {
+            let mut device_stat = None;
+            match (fs_type.as_str(), source) {
+                ("zfs", Some(source)) => match source.into_string() {
+                    Ok(dataset) => match zfs_dataset_stats(&dataset) {
+                        Ok(stat) => device_stat = Some(stat),
+                        Err(err) => eprintln!("zfs_dataset_stats({dataset:?}) failed - {err}"),
+                    },
+                    Err(source) => {
+                        eprintln!("zfs_pool_stats({source:?}) failed - invalid characters")
+                    }
+                },
+                _ => {
+                    if let Ok(disk) = disk_manager.clone().disk_by_dev_num(device.into_dev_t()) {
+                        match disk.read_stat() {
+                            Ok(stat) => device_stat = stat,
+                            Err(err) => eprintln!("disk.read_stat {path:?} failed - {err}"),
+                        }
+                    }
+                }
+            }
+            device_stat
+        }
+        Err(err) => {
+            eprintln!("find_mounted_device failed - {err}");
+            None
+        }
+    };
+
+    DiskStat {
+        name: name.to_string(),
+        usage,
+        dev,
+    }
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7f845e5b..ff92a821 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -33,6 +33,8 @@ pub use report::*;
 
 pub mod auth;
 
+pub mod metric_collection;
+
 pub(crate) mod pull;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
-- 
2.39.5
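
Note on the units in rrd_update_disk_stat() above: the sector counters are
multiplied by 512 and io_ticks is divided by 1000. The sketch below shows
only those conversions. It assumes the counters come from the Linux block
layer statistics, where sectors are counted in 512-byte units and io_ticks
is in milliseconds. `disk_rrd_points` and its parameters are hypothetical
names and not part of this patch.

```rust
/// Size of one sector as counted by the Linux block layer statistics
/// (512 bytes, independent of the device's real sector size).
const SECTOR_SIZE: u64 = 512;

/// Builds `(rrd_key, value)` pairs for the I/O counters of one disk.
///
/// Hypothetical helper for illustration; the patch calls
/// `rrd_update_derive` directly instead of returning a list.
fn disk_rrd_points(
    rrd_prefix: &str,
    read_sectors: u64,
    write_sectors: u64,
    io_ticks_ms: u64,
) -> Vec<(String, f64)> {
    vec![
        (
            format!("{rrd_prefix}/read_bytes"),
            (read_sectors * SECTOR_SIZE) as f64,
        ),
        (
            format!("{rrd_prefix}/write_bytes"),
            (write_sectors * SECTOR_SIZE) as f64,
        ),
        // io_ticks is reported in milliseconds; the RRD stores seconds.
        (format!("{rrd_prefix}/io_ticks"), io_ticks_ms as f64 / 1000.0),
    ]
}
```

With the prefix "datastore/store1", the keys match the "datastore/{name}/..."
layout used in the patch.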



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel



* [pbs-devel] [PATCH proxmox-backup 02/13] metric collection: add doc comments for public functions
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 03/13] metric collection: move rrd_cache to new metric_collection module Lukas Wagner
                   ` (10 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/mod.rs | 8 ++++++++
 1 file changed, 8 insertions(+)
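
The doc comment for start_collection_task() describes a loop that collects
stats at a fixed interval until the daemon shuts down. A minimal sketch of
that control flow follows. It is not the code in this patch: it swaps the
tokio task and shutdown future for a blocking loop and an std channel so
that it stays self-contained. `run_collection_loop` is a hypothetical name.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Calls `collect` once per `interval` until a shutdown message arrives
/// on `shutdown` (or the sending side is dropped).
///
/// Hypothetical stand-in for the tokio task: it blocks the calling
/// thread instead of awaiting a shutdown future.
fn run_collection_loop<F: FnMut()>(interval: Duration, shutdown: &Receiver<()>, mut collect: F) {
    loop {
        collect();
        match shutdown.recv_timeout(interval) {
            // Interval elapsed without a shutdown request: collect again.
            Err(RecvTimeoutError::Timeout) => continue,
            // Shutdown requested or sender gone: stop collecting.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}
```

Waiting on the shutdown channel with a timeout serves as both the sleep
and the abort check, so a shutdown request does not have to wait for the
rest of the interval.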

diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index e220f51a..1ca13222 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -21,12 +21,20 @@ use crate::{
     tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage},
 };
 
+/// Initialize the metric collection subsystem.
+///
+/// Any datapoints in the RRD journal will be committed.
 pub fn init() -> Result<(), Error> {
     let rrd_cache = initialize_rrd_cache()?;
     rrd_cache.apply_journal()?;
     Ok(())
 }
 
+/// Spawns a tokio task for regular metric collection.
+///
+/// Every 10 seconds, host and disk stats will be collected and
+///   - stored in the RRD
+///   - sent to any configured metric servers
 pub fn start_collection_task() {
     tokio::spawn(async {
         let abort_future = pin!(proxmox_daemon::shutdown_future());
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 03/13] metric collection: move rrd_cache to new metric_collection module
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 01/13] proxy: server: move rrd stat/metric server to separate module Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 02/13] metric collection: add doc comments for public functions Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 04/13] metric_collection: split out push metric part Lukas Wagner
                   ` (9 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/api2/node/rrd.rs                                  | 2 +-
 src/api2/status.rs                                    | 2 +-
 src/lib.rs                                            | 2 --
 src/server/metric_collection/mod.rs                   | 9 +++++----
 src/{rrd_cache.rs => server/metric_collection/rrd.rs} | 0
 5 files changed, 7 insertions(+), 8 deletions(-)
 rename src/{rrd_cache.rs => server/metric_collection/rrd.rs} (100%)

diff --git a/src/api2/node/rrd.rs b/src/api2/node/rrd.rs
index 95a2816e..35260382 100644
--- a/src/api2/node/rrd.rs
+++ b/src/api2/node/rrd.rs
@@ -8,7 +8,7 @@ use proxmox_schema::api;
 
 use pbs_api_types::{NODE_SCHEMA, PRIV_SYS_AUDIT};
 
-use crate::rrd_cache::extract_rrd_data;
+use crate::server::metric_collection::rrd::extract_rrd_data;
 
 pub fn create_value_from_rrd(
     basedir: &str,
diff --git a/src/api2/status.rs b/src/api2/status.rs
index 0b0b9444..e46fc1ae 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -15,7 +15,7 @@ use pbs_api_types::{
 use pbs_config::CachedUserInfo;
 use pbs_datastore::DataStore;
 
-use crate::rrd_cache::extract_rrd_data;
+use crate::server::metric_collection::rrd::extract_rrd_data;
 use crate::tools::statistics::linear_regression;
 
 use crate::backup::can_access_any_namespace;
diff --git a/src/lib.rs b/src/lib.rs
index c89884c8..8633378c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,8 +31,6 @@ pub mod acme;
 
 pub mod client_helpers;
 
-pub mod rrd_cache;
-
 pub mod traffic_control_cache;
 
 /// Get the server's certificate info (from `proxy.pem`).
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 1ca13222..5102227b 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -16,10 +16,11 @@ use proxmox_sys::{
     linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
 };
 
-use crate::{
-    rrd_cache::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge},
-    tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage},
-};
+use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
+
+use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge};
+
+pub mod rrd;
 
 /// Initialize the metric collection subsystem.
 ///
diff --git a/src/rrd_cache.rs b/src/server/metric_collection/rrd.rs
similarity index 100%
rename from src/rrd_cache.rs
rename to src/server/metric_collection/rrd.rs
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 04/13] metric_collection: split out push metric part
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (2 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 03/13] metric collection: move rrd_cache to new metric_collection module Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 05/13] metric collection: rrd: move rrd update function to rrd module Lukas Wagner
                   ` (8 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/metric_server.rs | 136 ++++++++++++++++++
 src/server/metric_collection/mod.rs           | 132 +----------------
 2 files changed, 138 insertions(+), 130 deletions(-)
 create mode 100644 src/server/metric_collection/metric_server.rs
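
Two details of send_data_to_metric_servers() may be easier to follow in
isolation: the named function used in place of a closure to project out
the channel, and the zip of send results with server names for error
reporting. The sketch below uses a hypothetical `Channel` type as a
stand-in for proxmox_metrics::Metrics. `channel_of` and `failed_sends` are
illustrative names as well. Like the moved code, it assumes results are
returned in the same order as the channels.

```rust
/// Stand-in for `proxmox_metrics::Metrics`; hypothetical, for illustration only.
struct Channel {
    id: u32,
}

/// Named function instead of a closure: its signature is generic over the
/// lifetime of the borrow, so the returned reference is tied to the input
/// reference for any caller.
fn channel_of(item: &(Channel, String)) -> &Channel {
    &item.0
}

/// Pairs each send result with the name of the server it belongs to and
/// returns one message per failed send.
fn failed_sends(results: Vec<Result<(), String>>, channels: &[(Channel, String)]) -> Vec<String> {
    results
        .into_iter()
        .zip(channels.iter().map(|(_, name)| name))
        .filter_map(|(res, name)| res.err().map(|err| format!("{name}: {err}")))
        .collect()
}
```

The moved code logs each failure with log::error! instead of collecting
messages; returning a Vec here just keeps the sketch easy to check.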

diff --git a/src/server/metric_collection/metric_server.rs b/src/server/metric_collection/metric_server.rs
new file mode 100644
index 00000000..cc9f736a
--- /dev/null
+++ b/src/server/metric_collection/metric_server.rs
@@ -0,0 +1,136 @@
+use std::sync::Arc;
+
+use anyhow::Error;
+use serde_json::{json, Value};
+
+use proxmox_metrics::MetricsData;
+
+use super::{DiskStat, HostStats};
+
+pub async fn send_data_to_metric_servers(
+    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
+) -> Result<(), Error> {
+    let (config, _digest) = pbs_config::metrics::config()?;
+    let channel_list = get_metric_server_connections(config)?;
+
+    if channel_list.is_empty() {
+        return Ok(());
+    }
+
+    let ctime = proxmox_time::epoch_i64();
+    let nodename = proxmox_sys::nodename();
+
+    let mut values = Vec::new();
+
+    let mut cpuvalue = match &stats.0.proc {
+        Some(stat) => serde_json::to_value(stat)?,
+        None => json!({}),
+    };
+
+    if let Some(loadavg) = &stats.0.load {
+        cpuvalue["avg1"] = Value::from(loadavg.0);
+        cpuvalue["avg5"] = Value::from(loadavg.1);
+        cpuvalue["avg15"] = Value::from(loadavg.2);
+    }
+
+    values.push(Arc::new(
+        MetricsData::new("cpustat", ctime, cpuvalue)?
+            .tag("object", "host")
+            .tag("host", nodename),
+    ));
+
+    if let Some(stat) = &stats.0.meminfo {
+        values.push(Arc::new(
+            MetricsData::new("memory", ctime, stat)?
+                .tag("object", "host")
+                .tag("host", nodename),
+        ));
+    }
+
+    if let Some(netdev) = &stats.0.net {
+        for item in netdev {
+            values.push(Arc::new(
+                MetricsData::new("nics", ctime, item)?
+                    .tag("object", "host")
+                    .tag("host", nodename)
+                    .tag("instance", item.device.clone()),
+            ));
+        }
+    }
+
+    values.push(Arc::new(
+        MetricsData::new("blockstat", ctime, stats.1.to_value())?
+            .tag("object", "host")
+            .tag("host", nodename),
+    ));
+
+    for datastore in stats.2.iter() {
+        values.push(Arc::new(
+            MetricsData::new("blockstat", ctime, datastore.to_value())?
+                .tag("object", "host")
+                .tag("host", nodename)
+                .tag("datastore", datastore.name.clone()),
+        ));
+    }
+
+    // we must have a concrete function, because the inferred lifetime from a
+    // closure is not general enough for the tokio::spawn call we are in here...
+    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
+        &item.0
+    }
+
+    let results =
+        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
+    for (res, name) in results
+        .into_iter()
+        .zip(channel_list.iter().map(|(_, name)| name))
+    {
+        if let Err(err) = res {
+            log::error!("error sending into channel of {name}: {err}");
+        }
+    }
+
+    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
+        if let Err(err) = channel.join().await {
+            log::error!("error sending to metric server {name}: {err}");
+        }
+    }))
+    .await;
+
+    Ok(())
+}
+
+/// Get the metric server connections from a config
+fn get_metric_server_connections(
+    metric_config: proxmox_section_config::SectionConfigData,
+) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
+    let mut res = Vec::new();
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
+    {
+        if !config.enable {
+            continue;
+        }
+        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
+        res.push((future, config.name));
+    }
+
+    for config in
+        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
+    {
+        if !config.enable {
+            continue;
+        }
+        let future = proxmox_metrics::influxdb_http(
+            &config.url,
+            config.organization.as_deref().unwrap_or("proxmox"),
+            config.bucket.as_deref().unwrap_or("proxmox"),
+            config.token.as_deref(),
+            config.verify_tls.unwrap_or(true),
+            config.max_body_size.unwrap_or(25_000_000),
+        )?;
+        res.push((future, config.name));
+    }
+    Ok(res)
+}
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 5102227b..5a516564 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -10,7 +10,6 @@ use pbs_api_types::{DataStoreConfig, Operation};
 use serde_json::{json, Value};
 use tokio::join;
 
-use proxmox_metrics::MetricsData;
 use proxmox_sys::{
     fs::FileSystemInformation,
     linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
@@ -20,6 +19,7 @@ use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
 
 use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge};
 
+mod metric_server;
 pub mod rrd;
 
 /// Initialize the metric collection subsystem.
@@ -70,7 +70,7 @@ async fn run_stat_generator() {
             }
         });
 
-        let metrics_future = send_data_to_metric_servers(stats);
+        let metrics_future = metric_server::send_data_to_metric_servers(stats);
 
         let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
         if let Err(err) = rrd_res {
@@ -84,134 +84,6 @@ async fn run_stat_generator() {
     }
 }
 
-async fn send_data_to_metric_servers(
-    stats: Arc<(HostStats, DiskStat, Vec<DiskStat>)>,
-) -> Result<(), Error> {
-    let (config, _digest) = pbs_config::metrics::config()?;
-    let channel_list = get_metric_server_connections(config)?;
-
-    if channel_list.is_empty() {
-        return Ok(());
-    }
-
-    let ctime = proxmox_time::epoch_i64();
-    let nodename = proxmox_sys::nodename();
-
-    let mut values = Vec::new();
-
-    let mut cpuvalue = match &stats.0.proc {
-        Some(stat) => serde_json::to_value(stat)?,
-        None => json!({}),
-    };
-
-    if let Some(loadavg) = &stats.0.load {
-        cpuvalue["avg1"] = Value::from(loadavg.0);
-        cpuvalue["avg5"] = Value::from(loadavg.1);
-        cpuvalue["avg15"] = Value::from(loadavg.2);
-    }
-
-    values.push(Arc::new(
-        MetricsData::new("cpustat", ctime, cpuvalue)?
-            .tag("object", "host")
-            .tag("host", nodename),
-    ));
-
-    if let Some(stat) = &stats.0.meminfo {
-        values.push(Arc::new(
-            MetricsData::new("memory", ctime, stat)?
-                .tag("object", "host")
-                .tag("host", nodename),
-        ));
-    }
-
-    if let Some(netdev) = &stats.0.net {
-        for item in netdev {
-            values.push(Arc::new(
-                MetricsData::new("nics", ctime, item)?
-                    .tag("object", "host")
-                    .tag("host", nodename)
-                    .tag("instance", item.device.clone()),
-            ));
-        }
-    }
-
-    values.push(Arc::new(
-        MetricsData::new("blockstat", ctime, stats.1.to_value())?
-            .tag("object", "host")
-            .tag("host", nodename),
-    ));
-
-    for datastore in stats.2.iter() {
-        values.push(Arc::new(
-            MetricsData::new("blockstat", ctime, datastore.to_value())?
-                .tag("object", "host")
-                .tag("host", nodename)
-                .tag("datastore", datastore.name.clone()),
-        ));
-    }
-
-    // we must have a concrete functions, because the inferred lifetime from a
-    // closure is not general enough for the tokio::spawn call we are in here...
-    fn map_fn(item: &(proxmox_metrics::Metrics, String)) -> &proxmox_metrics::Metrics {
-        &item.0
-    }
-
-    let results =
-        proxmox_metrics::send_data_to_channels(&values, channel_list.iter().map(map_fn)).await;
-    for (res, name) in results
-        .into_iter()
-        .zip(channel_list.iter().map(|(_, name)| name))
-    {
-        if let Err(err) = res {
-            log::error!("error sending into channel of {name}: {err}");
-        }
-    }
-
-    futures::future::join_all(channel_list.into_iter().map(|(channel, name)| async move {
-        if let Err(err) = channel.join().await {
-            log::error!("error sending to metric server {name}: {err}");
-        }
-    }))
-    .await;
-
-    Ok(())
-}
-
-/// Get the metric server connections from a config
-fn get_metric_server_connections(
-    metric_config: proxmox_section_config::SectionConfigData,
-) -> Result<Vec<(proxmox_metrics::Metrics, String)>, Error> {
-    let mut res = Vec::new();
-
-    for config in
-        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbUdp>("influxdb-udp")?
-    {
-        if !config.enable {
-            continue;
-        }
-        let future = proxmox_metrics::influxdb_udp(&config.host, config.mtu);
-        res.push((future, config.name));
-    }
-
-    for config in
-        metric_config.convert_to_typed_array::<pbs_api_types::InfluxDbHttp>("influxdb-http")?
-    {
-        if !config.enable {
-            continue;
-        }
-        let future = proxmox_metrics::influxdb_http(
-            &config.url,
-            config.organization.as_deref().unwrap_or("proxmox"),
-            config.bucket.as_deref().unwrap_or("proxmox"),
-            config.token.as_deref(),
-            config.verify_tls.unwrap_or(true),
-            config.max_body_size.unwrap_or(25_000_000),
-        )?;
-        res.push((future, config.name));
-    }
-    Ok(res)
-}
-
 struct HostStats {
     proc: Option<ProcFsStat>,
     meminfo: Option<ProcFsMemInfo>,
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 05/13] metric collection: rrd: move rrd update function to rrd module
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (3 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 04/13] metric_collection: split out push metric part Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 06/13] metric collection: rrd: restrict function visibility Lukas Wagner
                   ` (7 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/mod.rs | 70 +---------------------------
 src/server/metric_collection/rrd.rs | 72 +++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 68 deletions(-)
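
The moved function records some values via rrd_update_gauge() and others
(network and disk counters) via rrd_update_derive(). The sketch below
illustrates the difference, assuming the derive data source follows the
usual RRD meaning: the stored value is the rate of change between two
consecutive counter samples, whereas a gauge is stored as is. `Sample` and
`derive_rate` are hypothetical names. The real computation lives in the
RRD implementation and may handle more edge cases.

```rust
/// One sample of a monotonically increasing counter.
#[derive(Clone, Copy)]
struct Sample {
    /// Time of the sample in seconds since the epoch.
    time: f64,
    /// Raw counter value, e.g. total bytes received.
    value: f64,
}

/// Rate of change between two samples, in units per second.
///
/// Returns `None` if no time has passed, since the rate is undefined then.
/// Illustrative only, not the RRD implementation itself.
fn derive_rate(prev: Sample, cur: Sample) -> Option<f64> {
    let elapsed = cur.time - prev.time;
    if elapsed <= 0.0 {
        return None;
    }
    Some((cur.value - prev.value) / elapsed)
}
```

Under that assumption, passing the summed receive/send byte counters to
rrd_update_derive("host/netin", ...) yields a bytes-per-second series.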

diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 5a516564..2c4c93a8 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -17,7 +17,7 @@ use proxmox_sys::{
 
 use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
 
-use rrd::{initialize_rrd_cache, rrd_sync_journal, rrd_update_derive, rrd_update_gauge};
+use rrd::{initialize_rrd_cache, rrd_sync_journal};
 
 mod metric_server;
 pub mod rrd;
@@ -65,7 +65,7 @@ async fn run_stat_generator() {
         let rrd_future = tokio::task::spawn_blocking({
             let stats = Arc::clone(&stats);
             move || {
-                rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
+                rrd::rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
                 rrd_sync_journal();
             }
         });
@@ -193,72 +193,6 @@ fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
     (root, datastores)
 }
 
-fn rrd_update_host_stats_sync(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
-    if let Some(stat) = &host.proc {
-        rrd_update_gauge("host/cpu", stat.cpu);
-        rrd_update_gauge("host/iowait", stat.iowait_percent);
-    }
-
-    if let Some(meminfo) = &host.meminfo {
-        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-        rrd_update_gauge("host/memused", meminfo.memused as f64);
-        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
-    }
-
-    if let Some(netdev) = &host.net {
-        use pbs_config::network::is_physical_nic;
-        let mut netin = 0;
-        let mut netout = 0;
-        for item in netdev {
-            if !is_physical_nic(&item.device) {
-                continue;
-            }
-            netin += item.receive;
-            netout += item.send;
-        }
-        rrd_update_derive("host/netin", netin as f64);
-        rrd_update_derive("host/netout", netout as f64);
-    }
-
-    if let Some(loadavg) = &host.load {
-        rrd_update_gauge("host/loadavg", loadavg.0);
-    }
-
-    rrd_update_disk_stat(hostdisk, "host");
-
-    for stat in datastores {
-        let rrd_prefix = format!("datastore/{}", stat.name);
-        rrd_update_disk_stat(stat, &rrd_prefix);
-    }
-}
-
-fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
-    if let Some(status) = &disk.usage {
-        let rrd_key = format!("{}/total", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.total as f64);
-        let rrd_key = format!("{}/used", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.used as f64);
-        let rrd_key = format!("{}/available", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.available as f64);
-    }
-
-    if let Some(stat) = &disk.dev {
-        let rrd_key = format!("{}/read_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.read_ios as f64);
-        let rrd_key = format!("{}/read_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
-
-        let rrd_key = format!("{}/write_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.write_ios as f64);
-        let rrd_key = format!("{}/write_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
-
-        let rrd_key = format!("{}/io_ticks", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
-    }
-}
-
 fn gather_disk_stats(disk_manager: Arc<DiskManage>, path: &Path, name: &str) -> DiskStat {
     let usage = match proxmox_sys::fs::fs_info(path) {
         Ok(status) => Some(status),
diff --git a/src/server/metric_collection/rrd.rs b/src/server/metric_collection/rrd.rs
index 32e0b5c5..910c57f2 100644
--- a/src/server/metric_collection/rrd.rs
+++ b/src/server/metric_collection/rrd.rs
@@ -16,6 +16,8 @@ use proxmox_sys::fs::CreateOptions;
 use pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M;
 use proxmox_rrd_api_types::{RrdMode, RrdTimeframe};
 
+use super::{DiskStat, HostStats};
+
 const RRD_CACHE_BASEDIR: &str = concat!(PROXMOX_BACKUP_STATE_DIR_M!(), "/rrdb");
 
 static RRD_CACHE: OnceCell<Cache> = OnceCell::new();
@@ -145,3 +147,73 @@ pub fn rrd_update_derive(name: &str, value: f64) {
         }
     }
 }
+
+pub(super) fn rrd_update_host_stats_sync(
+    host: &HostStats,
+    hostdisk: &DiskStat,
+    datastores: &[DiskStat],
+) {
+    if let Some(stat) = &host.proc {
+        rrd_update_gauge("host/cpu", stat.cpu);
+        rrd_update_gauge("host/iowait", stat.iowait_percent);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
+        rrd_update_gauge("host/memused", meminfo.memused as f64);
+        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+        use pbs_config::network::is_physical_nic;
+        let mut netin = 0;
+        let mut netout = 0;
+        for item in netdev {
+            if !is_physical_nic(&item.device) {
+                continue;
+            }
+            netin += item.receive;
+            netout += item.send;
+        }
+        rrd_update_derive("host/netin", netin as f64);
+        rrd_update_derive("host/netout", netout as f64);
+    }
+
+    if let Some(loadavg) = &host.load {
+        rrd_update_gauge("host/loadavg", loadavg.0);
+    }
+
+    rrd_update_disk_stat(hostdisk, "host");
+
+    for stat in datastores {
+        let rrd_prefix = format!("datastore/{}", stat.name);
+        rrd_update_disk_stat(stat, &rrd_prefix);
+    }
+}
+
+fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+    if let Some(status) = &disk.usage {
+        let rrd_key = format!("{}/total", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.total as f64);
+        let rrd_key = format!("{}/used", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.used as f64);
+        let rrd_key = format!("{}/available", rrd_prefix);
+        rrd_update_gauge(&rrd_key, status.available as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        let rrd_key = format!("{}/read_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.read_ios as f64);
+        let rrd_key = format!("{}/read_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
+
+        let rrd_key = format!("{}/write_ios", rrd_prefix);
+        rrd_update_derive(&rrd_key, stat.write_ios as f64);
+        let rrd_key = format!("{}/write_bytes", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
+
+        let rrd_key = format!("{}/io_ticks", rrd_prefix);
+        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
+    }
+}
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 06/13] metric collection: rrd: restrict function visibility
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (4 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 05/13] metric collection: rrd: move rrd update function to rrd module Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 07/13] metric collection: rrd: remove rrd prefix from some function names Lukas Wagner
                   ` (6 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/rrd.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
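
For reference, a small sketch of the three visibility levels this patch
ends up with. The function names are taken from the diff below, but the
bodies and return types are placeholders, and `collect_once` is a
hypothetical caller in the parent module.

```rust
mod metric_collection {
    pub mod rrd {
        /// Private: only callable from inside `rrd`.
        fn get_rrd_cache() -> &'static str {
            "cache"
        }

        /// `pub(super)`: callable from `metric_collection`, but not from
        /// outside of it.
        pub(super) fn rrd_sync_journal() -> String {
            format!("synced {}", get_rrd_cache())
        }

        /// `pub`: still reachable from elsewhere in the crate, such as the
        /// API handlers.
        pub fn extract_rrd_data() -> String {
            format!("data from {}", get_rrd_cache())
        }
    }

    /// The parent module may call the `pub(super)` function.
    pub fn collect_once() -> String {
        rrd::rrd_sync_journal()
    }
}
```

Calling metric_collection::rrd::rrd_sync_journal() from outside of
metric_collection would be rejected by the compiler.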

diff --git a/src/server/metric_collection/rrd.rs b/src/server/metric_collection/rrd.rs
index 910c57f2..77264b2c 100644
--- a/src/server/metric_collection/rrd.rs
+++ b/src/server/metric_collection/rrd.rs
@@ -23,7 +23,7 @@ const RRD_CACHE_BASEDIR: &str = concat!(PROXMOX_BACKUP_STATE_DIR_M!(), "/rrdb");
 static RRD_CACHE: OnceCell<Cache> = OnceCell::new();
 
 /// Get the RRD cache instance
-pub fn get_rrd_cache() -> Result<&'static Cache, Error> {
+fn get_rrd_cache() -> Result<&'static Cache, Error> {
     RRD_CACHE
         .get()
         .ok_or_else(|| format_err!("RRD cache not initialized!"))
@@ -32,7 +32,7 @@ pub fn get_rrd_cache() -> Result<&'static Cache, Error> {
 /// Initialize the RRD cache instance
 ///
 /// Note: Only a single process must do this (proxmox-backup-proxy)
-pub fn initialize_rrd_cache() -> Result<&'static Cache, Error> {
+pub(super) fn initialize_rrd_cache() -> Result<&'static Cache, Error> {
     let backup_user = pbs_config::backup_user()?;
 
     let file_options = CreateOptions::new()
@@ -121,7 +121,7 @@ pub fn extract_rrd_data(
 }
 
 /// Sync/Flush the RRD journal
-pub fn rrd_sync_journal() {
+pub(super) fn rrd_sync_journal() {
     if let Ok(rrd_cache) = get_rrd_cache() {
         if let Err(err) = rrd_cache.sync_journal() {
             log::error!("rrd_sync_journal failed - {}", err);
@@ -129,7 +129,7 @@ pub fn rrd_sync_journal() {
     }
 }
 /// Update RRD Gauge values
-pub fn rrd_update_gauge(name: &str, value: f64) {
+fn rrd_update_gauge(name: &str, value: f64) {
     if let Ok(rrd_cache) = get_rrd_cache() {
         let now = proxmox_time::epoch_f64();
         if let Err(err) = rrd_cache.update_value(name, now, value, DataSourceType::Gauge) {
@@ -139,7 +139,7 @@ pub fn rrd_update_gauge(name: &str, value: f64) {
 }
 
 /// Update RRD Derive values
-pub fn rrd_update_derive(name: &str, value: f64) {
+fn rrd_update_derive(name: &str, value: f64) {
     if let Ok(rrd_cache) = get_rrd_cache() {
         let now = proxmox_time::epoch_f64();
         if let Err(err) = rrd_cache.update_value(name, now, value, DataSourceType::Derive) {
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 07/13] metric collection: rrd: remove rrd prefix from some function names
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (5 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 06/13] metric collection: rrd: restrict function visibility Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 08/13] metric collection: drop std::path prefix where not needed Lukas Wagner
                   ` (5 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

We have proper namespaces, so these are a bit redundant.

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/mod.rs |  8 ++--
 src/server/metric_collection/rrd.rs | 64 ++++++++++++++---------------
 2 files changed, 33 insertions(+), 39 deletions(-)
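
A short sketch of why the prefix is redundant: the module path already
carries the context at each call site, and short names in different
modules do not clash. The bodies are placeholders, and
`metric_server::init` and `startup_log` are hypothetical and only here for
contrast.

```rust
mod rrd {
    /// Called `initialize_rrd_cache` before the rename.
    pub fn init() -> &'static str {
        "rrd: cache initialized"
    }

    /// Called `rrd_sync_journal` before the rename.
    pub fn sync_journal() -> &'static str {
        "rrd: journal synced"
    }
}

mod metric_server {
    /// Hypothetical function, only here to show that a second module can
    /// use the same short name without a clash.
    pub fn init() -> &'static str {
        "metric_server: ready"
    }
}

/// Call sites stay unambiguous because the module path carries the context.
fn startup_log() -> Vec<&'static str> {
    vec![rrd::init(), metric_server::init(), rrd::sync_journal()]
}
```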

diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 2c4c93a8..216d46ff 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -17,8 +17,6 @@ use proxmox_sys::{
 
 use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
 
-use rrd::{initialize_rrd_cache, rrd_sync_journal};
-
 mod metric_server;
 pub mod rrd;
 
@@ -26,7 +24,7 @@ pub mod rrd;
 ///
 /// Any datapoints in the RRD journal will be committed.
 pub fn init() -> Result<(), Error> {
-    let rrd_cache = initialize_rrd_cache()?;
+    let rrd_cache = rrd::init()?;
     rrd_cache.apply_journal()?;
     Ok(())
 }
@@ -65,8 +63,8 @@ async fn run_stat_generator() {
         let rrd_future = tokio::task::spawn_blocking({
             let stats = Arc::clone(&stats);
             move || {
-                rrd::rrd_update_host_stats_sync(&stats.0, &stats.1, &stats.2);
-                rrd_sync_journal();
+                rrd::update_metrics(&stats.0, &stats.1, &stats.2);
+                rrd::sync_journal();
             }
         });
 
diff --git a/src/server/metric_collection/rrd.rs b/src/server/metric_collection/rrd.rs
index 77264b2c..ed39cc94 100644
--- a/src/server/metric_collection/rrd.rs
+++ b/src/server/metric_collection/rrd.rs
@@ -23,7 +23,7 @@ const RRD_CACHE_BASEDIR: &str = concat!(PROXMOX_BACKUP_STATE_DIR_M!(), "/rrdb");
 static RRD_CACHE: OnceCell<Cache> = OnceCell::new();
 
 /// Get the RRD cache instance
-fn get_rrd_cache() -> Result<&'static Cache, Error> {
+fn get_cache() -> Result<&'static Cache, Error> {
     RRD_CACHE
         .get()
         .ok_or_else(|| format_err!("RRD cache not initialized!"))
@@ -32,7 +32,7 @@ fn get_rrd_cache() -> Result<&'static Cache, Error> {
 /// Initialize the RRD cache instance
 ///
 /// Note: Only a single process must do this (proxmox-backup-proxy)
-pub(super) fn initialize_rrd_cache() -> Result<&'static Cache, Error> {
+pub(super) fn init() -> Result<&'static Cache, Error> {
     let backup_user = pbs_config::backup_user()?;
 
     let file_options = CreateOptions::new()
@@ -115,22 +115,22 @@ pub fn extract_rrd_data(
         RrdMode::Average => AggregationFn::Average,
     };
 
-    let rrd_cache = get_rrd_cache()?;
+    let rrd_cache = get_cache()?;
 
     rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end))
 }
 
 /// Sync/Flush the RRD journal
-pub(super) fn rrd_sync_journal() {
-    if let Ok(rrd_cache) = get_rrd_cache() {
+pub(super) fn sync_journal() {
+    if let Ok(rrd_cache) = get_cache() {
         if let Err(err) = rrd_cache.sync_journal() {
             log::error!("rrd_sync_journal failed - {}", err);
         }
     }
 }
 /// Update RRD Gauge values
-fn rrd_update_gauge(name: &str, value: f64) {
-    if let Ok(rrd_cache) = get_rrd_cache() {
+fn update_gauge(name: &str, value: f64) {
+    if let Ok(rrd_cache) = get_cache() {
         let now = proxmox_time::epoch_f64();
         if let Err(err) = rrd_cache.update_value(name, now, value, DataSourceType::Gauge) {
             log::error!("rrd::update_value '{}' failed - {}", name, err);
@@ -139,8 +139,8 @@ fn rrd_update_gauge(name: &str, value: f64) {
 }
 
 /// Update RRD Derive values
-fn rrd_update_derive(name: &str, value: f64) {
-    if let Ok(rrd_cache) = get_rrd_cache() {
+fn update_derive(name: &str, value: f64) {
+    if let Ok(rrd_cache) = get_cache() {
         let now = proxmox_time::epoch_f64();
         if let Err(err) = rrd_cache.update_value(name, now, value, DataSourceType::Derive) {
             log::error!("rrd::update_value '{}' failed - {}", name, err);
@@ -148,21 +148,17 @@ fn rrd_update_derive(name: &str, value: f64) {
     }
 }
 
-pub(super) fn rrd_update_host_stats_sync(
-    host: &HostStats,
-    hostdisk: &DiskStat,
-    datastores: &[DiskStat],
-) {
+pub(super) fn update_metrics(host: &HostStats, hostdisk: &DiskStat, datastores: &[DiskStat]) {
     if let Some(stat) = &host.proc {
-        rrd_update_gauge("host/cpu", stat.cpu);
-        rrd_update_gauge("host/iowait", stat.iowait_percent);
+        update_gauge("host/cpu", stat.cpu);
+        update_gauge("host/iowait", stat.iowait_percent);
     }
 
     if let Some(meminfo) = &host.meminfo {
-        rrd_update_gauge("host/memtotal", meminfo.memtotal as f64);
-        rrd_update_gauge("host/memused", meminfo.memused as f64);
-        rrd_update_gauge("host/swaptotal", meminfo.swaptotal as f64);
-        rrd_update_gauge("host/swapused", meminfo.swapused as f64);
+        update_gauge("host/memtotal", meminfo.memtotal as f64);
+        update_gauge("host/memused", meminfo.memused as f64);
+        update_gauge("host/swaptotal", meminfo.swaptotal as f64);
+        update_gauge("host/swapused", meminfo.swapused as f64);
     }
 
     if let Some(netdev) = &host.net {
@@ -176,44 +172,44 @@ pub(super) fn rrd_update_host_stats_sync(
             netin += item.receive;
             netout += item.send;
         }
-        rrd_update_derive("host/netin", netin as f64);
-        rrd_update_derive("host/netout", netout as f64);
+        update_derive("host/netin", netin as f64);
+        update_derive("host/netout", netout as f64);
     }
 
     if let Some(loadavg) = &host.load {
-        rrd_update_gauge("host/loadavg", loadavg.0);
+        update_gauge("host/loadavg", loadavg.0);
     }
 
-    rrd_update_disk_stat(hostdisk, "host");
+    update_disk_metrics(hostdisk, "host");
 
     for stat in datastores {
         let rrd_prefix = format!("datastore/{}", stat.name);
-        rrd_update_disk_stat(stat, &rrd_prefix);
+        update_disk_metrics(stat, &rrd_prefix);
     }
 }
 
-fn rrd_update_disk_stat(disk: &DiskStat, rrd_prefix: &str) {
+fn update_disk_metrics(disk: &DiskStat, rrd_prefix: &str) {
     if let Some(status) = &disk.usage {
         let rrd_key = format!("{}/total", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.total as f64);
+        update_gauge(&rrd_key, status.total as f64);
         let rrd_key = format!("{}/used", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.used as f64);
+        update_gauge(&rrd_key, status.used as f64);
         let rrd_key = format!("{}/available", rrd_prefix);
-        rrd_update_gauge(&rrd_key, status.available as f64);
+        update_gauge(&rrd_key, status.available as f64);
     }
 
     if let Some(stat) = &disk.dev {
         let rrd_key = format!("{}/read_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.read_ios as f64);
+        update_derive(&rrd_key, stat.read_ios as f64);
         let rrd_key = format!("{}/read_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
+        update_derive(&rrd_key, (stat.read_sectors * 512) as f64);
 
         let rrd_key = format!("{}/write_ios", rrd_prefix);
-        rrd_update_derive(&rrd_key, stat.write_ios as f64);
+        update_derive(&rrd_key, stat.write_ios as f64);
         let rrd_key = format!("{}/write_bytes", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
+        update_derive(&rrd_key, (stat.write_sectors * 512) as f64);
 
         let rrd_key = format!("{}/io_ticks", rrd_prefix);
-        rrd_update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
+        update_derive(&rrd_key, (stat.io_ticks as f64) / 1000.0);
     }
 }
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 08/13] metric collection: drop std::path prefix where not needed
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (6 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 07/13] metric collection: rrd: remove rrd prefix from some function names Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 09/13] metric collection: move impl block for DiskStats to metric_server module Lukas Wagner
                   ` (4 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 216d46ff..dd5bca70 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -179,7 +179,7 @@ fn collect_disk_stats_sync() -> (DiskStat, Vec<DiskStat>) {
                 {
                     continue;
                 }
-                let path = std::path::Path::new(&config.path);
+                let path = Path::new(&config.path);
                 datastores.push(gather_disk_stats(disk_manager.clone(), path, &config.name));
             }
         }
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 09/13] metric collection: move impl block for DiskStats to metric_server module
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (7 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 08/13] metric collection: drop std::path prefix where not needed Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 10/13] pbs-api-types: add types for the new metrics endpoint Lukas Wagner
                   ` (3 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

It is only needed there and could be considered an implementation detail
of how this module works.

No functional changes intended.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/metric_server.rs | 20 ++++++++++++++++++
 src/server/metric_collection/mod.rs           | 21 -------------------
 2 files changed, 20 insertions(+), 21 deletions(-)
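
Note for readers of the moved `to_value` block: it divides `io_ticks` with integer arithmetic (`dev.io_ticks / 1000`), while the RRD path in rrd.rs uses `(stat.io_ticks as f64) / 1000.0`. The std-only sketch below only illustrates that difference. The helper names are hypothetical, and `u64` is an assumed type for the `io_ticks` field.

```rust
/// Integer division, mirroring `dev.io_ticks / 1000` in `DiskStat::to_value`.
/// Hypothetical helper; `u64` is an assumption about the field type.
fn io_ticks_secs_truncated(io_ticks_ms: u64) -> u64 {
    // Any sub-second remainder is dropped.
    io_ticks_ms / 1000
}

/// Floating-point division, mirroring `(stat.io_ticks as f64) / 1000.0`
/// in the RRD update path.
fn io_ticks_secs_fractional(io_ticks_ms: u64) -> f64 {
    (io_ticks_ms as f64) / 1000.0
}
```

Since the patch only moves the code, the truncating behaviour is unchanged by it.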

diff --git a/src/server/metric_collection/metric_server.rs b/src/server/metric_collection/metric_server.rs
index cc9f736a..ba20628a 100644
--- a/src/server/metric_collection/metric_server.rs
+++ b/src/server/metric_collection/metric_server.rs
@@ -134,3 +134,23 @@ fn get_metric_server_connections(
     }
     Ok(res)
 }
+
+impl DiskStat {
+    fn to_value(&self) -> Value {
+        let mut value = json!({});
+        if let Some(usage) = &self.usage {
+            value["total"] = Value::from(usage.total);
+            value["used"] = Value::from(usage.used);
+            value["avail"] = Value::from(usage.available);
+        }
+
+        if let Some(dev) = &self.dev {
+            value["read_ios"] = Value::from(dev.read_ios);
+            value["read_bytes"] = Value::from(dev.read_sectors * 512);
+            value["write_ios"] = Value::from(dev.write_ios);
+            value["write_bytes"] = Value::from(dev.write_sectors * 512);
+            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
+        }
+        value
+    }
+}
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index dd5bca70..8278e001 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -7,7 +7,6 @@ use std::{
 
 use anyhow::Error;
 use pbs_api_types::{DataStoreConfig, Operation};
-use serde_json::{json, Value};
 use tokio::join;
 
 use proxmox_sys::{
@@ -95,26 +94,6 @@ struct DiskStat {
     dev: Option<BlockDevStat>,
 }
 
-impl DiskStat {
-    fn to_value(&self) -> Value {
-        let mut value = json!({});
-        if let Some(usage) = &self.usage {
-            value["total"] = Value::from(usage.total);
-            value["used"] = Value::from(usage.used);
-            value["avail"] = Value::from(usage.available);
-        }
-
-        if let Some(dev) = &self.dev {
-            value["read_ios"] = Value::from(dev.read_ios);
-            value["read_bytes"] = Value::from(dev.read_sectors * 512);
-            value["write_ios"] = Value::from(dev.write_ios);
-            value["write_bytes"] = Value::from(dev.write_sectors * 512);
-            value["io_ticks"] = Value::from(dev.io_ticks / 1000);
-        }
-        value
-    }
-}
-
 fn collect_host_stats_sync() -> HostStats {
     use proxmox_sys::linux::procfs::{
         read_loadavg, read_meminfo, read_proc_net_dev, read_proc_stat,
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 10/13] pbs-api-types: add types for the new metrics endpoint
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (8 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 09/13] metric collection: move impl block for DiskStats to metric_server module Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 11/13] metric collection: initialize metric cache on startup Lukas Wagner
                   ` (2 subsequent siblings)
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 pbs-api-types/src/metrics.rs | 66 ++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)
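
As an illustration of the string form that `#[serde(rename_all = "lowercase")]` combined with the two `serde_plain` derive macros should give `MetricDataType`, here is a hand-written, std-only equivalent. It is a sketch, not the code generated by the macros; the `String` error type is an assumption made for brevity.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MetricDataType {
    Gauge,
    Counter,
    Derive,
}

impl fmt::Display for MetricDataType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Lowercase variant names, matching `rename_all = "lowercase"`.
        let s = match self {
            MetricDataType::Gauge => "gauge",
            MetricDataType::Counter => "counter",
            MetricDataType::Derive => "derive",
        };
        f.write_str(s)
    }
}

impl FromStr for MetricDataType {
    // Assumption: a plain `String` error, chosen only for this sketch.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "gauge" => Ok(MetricDataType::Gauge),
            "counter" => Ok(MetricDataType::Counter),
            "derive" => Ok(MetricDataType::Derive),
            other => Err(format!("unknown metric type '{other}'")),
        }
    }
}
```

The lowercase names are the same strings that appear in the `type` field of the serialized data points.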

diff --git a/pbs-api-types/src/metrics.rs b/pbs-api-types/src/metrics.rs
index 23421035..26266529 100644
--- a/pbs-api-types/src/metrics.rs
+++ b/pbs-api-types/src/metrics.rs
@@ -187,3 +187,69 @@ pub struct MetricServerInfo {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub comment: Option<String>,
 }
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[api(
+    properties: {
+        data: {
+            type: Array,
+            items: {
+                type: MetricDataPoint,
+            }
+        }
+    }
+)]
+/// Return type for the metric API endpoint
+pub struct Metrics {
+    /// List of metric data points, sorted by timestamp
+    pub data: Vec<MetricDataPoint>,
+}
+
+#[api(
+    properties: {
+        id: {
+            type: String,
+        },
+        metric: {
+            type: String,
+        },
+        timestamp: {
+            type: Integer,
+        },
+    },
+)]
+/// Metric data point
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct MetricDataPoint {
+    /// Unique identifier for this metric object, for instance 'node/<nodename>'
+    /// or 'qemu/<vmid>'.
+    pub id: String,
+
+    /// Name of the metric.
+    pub metric: String,
+
+    /// Time at which this metric was observed
+    pub timestamp: i64,
+
+    #[serde(rename = "type")]
+    pub ty: MetricDataType,
+
+    /// Metric value.
+    pub value: f64,
+}
+
+#[api]
+/// Type of the metric.
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
+#[serde(rename_all = "lowercase")]
+pub enum MetricDataType {
+    /// gauge.
+    Gauge,
+    /// counter.
+    Counter,
+    /// derive.
+    Derive,
+}
+
+serde_plain::derive_display_from_serialize!(MetricDataType);
+serde_plain::derive_fromstr_from_deserialize!(MetricDataType);
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 11/13] metric collection: initialize metric cache on startup
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (9 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 10/13] pbs-api-types: add types for the new metrics endpoint Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 12/13] metric collection: put metrics in a cache Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 13/13] api: add /status/metrics API Lukas Wagner
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 Cargo.toml                                   |  2 ++
 src/server/metric_collection/mod.rs          | 10 ++++--
 src/server/metric_collection/pull_metrics.rs | 35 ++++++++++++++++++++
 3 files changed, 45 insertions(+), 2 deletions(-)
 create mode 100644 src/server/metric_collection/pull_metrics.rs
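
The number of cached generations follows from the two constants introduced here: 30 minutes of history at one collection every 10 seconds. A std-only sketch of that arithmetic is shown below. `stored_generations_u32` is a hypothetical helper that uses a checked conversion in place of the `as u32` cast from the patch.

```rust
use std::time::Duration;

const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(10);
const METRIC_CACHE_TIME: Duration = Duration::from_secs(30 * 60);

// 1800 s / 10 s; both `from_secs` and `as_secs` are `const fn`,
// so this is evaluated at compile time.
const STORED_METRIC_GENERATIONS: u64 =
    METRIC_CACHE_TIME.as_secs() / METRIC_COLLECTION_INTERVAL.as_secs();

/// Hypothetical helper: narrow to `u32` with a check instead of `as u32`.
fn stored_generations_u32() -> u32 {
    u32::try_from(STORED_METRIC_GENERATIONS).expect("generation count fits in u32")
}
```

With the values from the patch this yields 180 generations, matching the figure given in the cover letter.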

diff --git a/Cargo.toml b/Cargo.toml
index 2536550c..c531607f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -81,6 +81,7 @@ proxmox-rrd-api-types = "1.0.2"
 proxmox-schema = "3"
 proxmox-section-config = "2"
 proxmox-serde = "0.1.1"
+proxmox-shared-cache = "0.1"
 proxmox-shared-memory = "0.3.0"
 proxmox-sortable-macro = "0.1.2"
 proxmox-subscription = { version = "0.4.2", features = [ "api-types" ] }
@@ -225,6 +226,7 @@ proxmox-router = { workspace = true, features = [ "cli", "server"] }
 proxmox-schema = { workspace = true, features = [ "api-macro" ] }
 proxmox-section-config.workspace = true
 proxmox-serde = { workspace = true, features = [ "serde_json" ] }
+proxmox-shared-cache.workspace = true
 proxmox-shared-memory.workspace = true
 proxmox-sortable-macro.workspace = true
 proxmox-subscription.workspace = true
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 8278e001..3be73c22 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -6,9 +6,9 @@ use std::{
 };
 
 use anyhow::Error;
-use pbs_api_types::{DataStoreConfig, Operation};
 use tokio::join;
 
+use pbs_api_types::{DataStoreConfig, Operation};
 use proxmox_sys::{
     fs::FileSystemInformation,
     linux::procfs::{Loadavg, ProcFsMemInfo, ProcFsNetDev, ProcFsStat},
@@ -17,14 +17,20 @@ use proxmox_sys::{
 use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
 
 mod metric_server;
+mod pull_metrics;
 pub mod rrd;
 
+const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(10);
+
 /// Initialize the metric collection subsystem.
 ///
 /// Any datapoints in the RRD journal will be committed.
 pub fn init() -> Result<(), Error> {
     let rrd_cache = rrd::init()?;
     rrd_cache.apply_journal()?;
+
+    pull_metrics::init()?;
+
     Ok(())
 }
 
@@ -43,7 +49,7 @@ pub fn start_collection_task() {
 
 async fn run_stat_generator() {
     loop {
-        let delay_target = Instant::now() + Duration::from_secs(10);
+        let delay_target = Instant::now() + METRIC_COLLECTION_INTERVAL;
 
         let stats_future = tokio::task::spawn_blocking(|| {
             let hoststats = collect_host_stats_sync();
diff --git a/src/server/metric_collection/pull_metrics.rs b/src/server/metric_collection/pull_metrics.rs
new file mode 100644
index 00000000..707cb27c
--- /dev/null
+++ b/src/server/metric_collection/pull_metrics.rs
@@ -0,0 +1,35 @@
+use std::{path::Path, sync::OnceLock, time::Duration};
+
+use anyhow::{format_err, Error};
+
+use nix::sys::stat::Mode;
+use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR;
+use proxmox_shared_cache::SharedCache;
+use proxmox_sys::fs::CreateOptions;
+
+use super::METRIC_COLLECTION_INTERVAL;
+
+const METRIC_CACHE_TIME: Duration = Duration::from_secs(30 * 60);
+const STORED_METRIC_GENERATIONS: u64 =
+    METRIC_CACHE_TIME.as_secs() / METRIC_COLLECTION_INTERVAL.as_secs();
+
+static METRIC_CACHE: OnceLock<SharedCache> = OnceLock::new();
+
+/// Initialize the metric cache.
+pub(super) fn init() -> Result<(), Error> {
+    let backup_user = pbs_config::backup_user()?;
+    let file_opts = CreateOptions::new()
+        .owner(backup_user.uid)
+        .group(backup_user.gid)
+        .perm(Mode::from_bits_truncate(0o660));
+
+    let cache_location = Path::new(PROXMOX_BACKUP_RUN_DIR).join("metrics");
+
+    let cache = SharedCache::new(cache_location, file_opts, STORED_METRIC_GENERATIONS as u32)?;
+
+    METRIC_CACHE
+        .set(cache)
+        .map_err(|_e| format_err!("metric cache already initialized"))?;
+
+    Ok(())
+}
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 12/13] metric collection: put metrics in a cache
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (10 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 11/13] metric collection: initialize metric cache on startup Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 13/13] api: add /status/metrics API Lukas Wagner
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

Any pull-metric API endpoint can later access the cache to
retrieve metric data for a limited time (30 minutes).

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/server/metric_collection/mod.rs          |  13 ++-
 src/server/metric_collection/pull_metrics.rs | 107 ++++++++++++++++++-
 2 files changed, 118 insertions(+), 2 deletions(-)
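
To illustrate how one collection run becomes one cached generation, here is a simplified, std-only sketch of the `MetricDataPoints` accumulator. Every point added to a generation shares the generation's timestamp. The serde derives and the shared cache are left out, the `Counter` variant is omitted, and `add_disk_io` and `sample_generation` are hypothetical helpers with made-up input values.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum MetricDataType {
    Gauge,
    Derive,
}

#[derive(Clone, Debug, PartialEq)]
struct MetricDataPoint {
    id: String,
    metric: String,
    timestamp: i64,
    ty: MetricDataType,
    value: f64,
}

struct MetricDataPoints {
    timestamp: i64,
    datapoints: Vec<MetricDataPoint>,
}

impl MetricDataPoints {
    fn new(timestamp: i64) -> Self {
        Self { timestamp, datapoints: Vec::new() }
    }

    /// Append a point stamped with the generation's timestamp.
    fn add(&mut self, ty: MetricDataType, id: &str, metric: &str, value: f64) {
        self.datapoints.push(MetricDataPoint {
            id: id.into(),
            metric: metric.into(),
            timestamp: self.timestamp,
            ty,
            value,
        });
    }
}

/// Hypothetical helper: sector counts are converted to bytes (512 per sector).
fn add_disk_io(points: &mut MetricDataPoints, id: &str, read_sectors: u64, write_sectors: u64) {
    points.add(MetricDataType::Derive, id, "disk_read", (read_sectors * 512) as f64);
    points.add(MetricDataType::Derive, id, "disk_write", (write_sectors * 512) as f64);
}

/// Hypothetical helper: build one generation from made-up sample values.
fn sample_generation(timestamp: i64) -> MetricDataPoints {
    let mut points = MetricDataPoints::new(timestamp);
    points.add(MetricDataType::Gauge, "host", "cpu_current", 0.25);
    add_disk_io(&mut points, "datastore/foo", 2, 4);
    points
}
```

In the patch itself, the finished generation is handed to the shared cache via `get_cache()?.set(...)`. That step is not modelled here.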

diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index 3be73c22..e6e04c5b 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -72,16 +72,27 @@ async fn run_stat_generator() {
                 rrd::sync_journal();
             }
         });
+        let pull_metric_future = tokio::task::spawn_blocking({
+            let stats = Arc::clone(&stats);
+            move || {
+                pull_metrics::update_metrics(&stats.0, &stats.1, &stats.2)?;
+                Ok::<(), Error>(())
+            }
+        });
 
         let metrics_future = metric_server::send_data_to_metric_servers(stats);
 
-        let (rrd_res, metrics_res) = join!(rrd_future, metrics_future);
+        let (rrd_res, metrics_res, pull_metrics_res) =
+            join!(rrd_future, metrics_future, pull_metric_future);
         if let Err(err) = rrd_res {
             log::error!("rrd update panicked: {err}");
         }
         if let Err(err) = metrics_res {
             log::error!("error during metrics sending: {err}");
         }
+        if let Err(err) = pull_metrics_res {
+            log::error!("error caching pull-style metrics: {err}");
+        }
 
         tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
     }
diff --git a/src/server/metric_collection/pull_metrics.rs b/src/server/metric_collection/pull_metrics.rs
index 707cb27c..f4b506cf 100644
--- a/src/server/metric_collection/pull_metrics.rs
+++ b/src/server/metric_collection/pull_metrics.rs
@@ -3,11 +3,16 @@ use std::{path::Path, sync::OnceLock, time::Duration};
 use anyhow::{format_err, Error};
 
 use nix::sys::stat::Mode;
+use pbs_api_types::{
+    MetricDataPoint,
+    MetricDataType::{self, Derive, Gauge},
+};
 use pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR;
 use proxmox_shared_cache::SharedCache;
 use proxmox_sys::fs::CreateOptions;
+use serde::{Deserialize, Serialize};
 
-use super::METRIC_COLLECTION_INTERVAL;
+use super::{DiskStat, HostStats, METRIC_COLLECTION_INTERVAL};
 
 const METRIC_CACHE_TIME: Duration = Duration::from_secs(30 * 60);
 const STORED_METRIC_GENERATIONS: u64 =
@@ -33,3 +38,103 @@ pub(super) fn init() -> Result<(), Error> {
 
     Ok(())
 }
+
+/// Convert `DiskStat` `HostStat` into a universal metric data point and cache
+/// them for a later retrieval.
+pub(super) fn update_metrics(
+    host: &HostStats,
+    hostdisk: &DiskStat,
+    datastores: &[DiskStat],
+) -> Result<(), Error> {
+    let mut points = MetricDataPoints::new(proxmox_time::epoch_i64());
+
+    // Using the same metric names as in PVE's new /cluster/metrics/export endpoint
+    if let Some(stat) = &host.proc {
+        points.add(Gauge, "host", "cpu_current", stat.cpu);
+        points.add(Gauge, "host", "cpu_iowait", stat.iowait_percent);
+    }
+
+    if let Some(loadavg) = &host.load {
+        points.add(Gauge, "host", "cpu_avg1", loadavg.0);
+        points.add(Gauge, "host", "cpu_avg5", loadavg.1);
+        points.add(Gauge, "host", "cpu_avg15", loadavg.2);
+    }
+
+    if let Some(meminfo) = &host.meminfo {
+        points.add(Gauge, "host", "mem_total", meminfo.memtotal as f64);
+        points.add(Gauge, "host", "mem_used", meminfo.memused as f64);
+        points.add(Gauge, "host", "swap_total", meminfo.swaptotal as f64);
+        points.add(Gauge, "host", "swap_used", meminfo.swapused as f64);
+    }
+
+    if let Some(netdev) = &host.net {
+        use pbs_config::network::is_physical_nic;
+        let mut netin = 0;
+        let mut netout = 0;
+        for item in netdev {
+            if !is_physical_nic(&item.device) {
+                continue;
+            }
+            netin += item.receive;
+            netout += item.send;
+        }
+        points.add(Derive, "host", "net_in", netin as f64);
+        points.add(Derive, "host", "net_out", netout as f64);
+    }
+
+    update_disk_metrics(&mut points, hostdisk, "host");
+
+    for stat in datastores {
+        let id = format!("datastore/{}", stat.name);
+        update_disk_metrics(&mut points, stat, &id);
+    }
+
+    get_cache()?.set(&points, Duration::from_secs(2))?;
+
+    Ok(())
+}
+
+fn get_cache() -> Result<&'static SharedCache, Error> {
+    // Not using get_or_init here since initialization can fail.
+    METRIC_CACHE
+        .get()
+        .ok_or_else(|| format_err!("metric cache not initialized"))
+}
+
+fn update_disk_metrics(points: &mut MetricDataPoints, disk: &DiskStat, id: &str) {
+    if let Some(status) = &disk.usage {
+        points.add(Gauge, id, "disk_total", status.total as f64);
+        points.add(Gauge, id, "disk_used", status.used as f64);
+        points.add(Gauge, id, "disk_available", status.available as f64);
+    }
+
+    if let Some(stat) = &disk.dev {
+        points.add(Derive, id, "disk_read", (stat.read_sectors * 512) as f64);
+        points.add(Derive, id, "disk_write", (stat.write_sectors * 512) as f64);
+    }
+}
+
+#[derive(Serialize, Deserialize)]
+struct MetricDataPoints {
+    timestamp: i64,
+    datapoints: Vec<MetricDataPoint>,
+}
+
+impl MetricDataPoints {
+    fn new(timestamp: i64) -> Self {
+        Self {
+            datapoints: Vec::new(),
+            timestamp,
+        }
+    }
+
+    fn add(&mut self, ty: MetricDataType, id: &str, metric: &str, value: f64) {
+        self.datapoints.push(MetricDataPoint {
+            id: id.into(),
+            metric: metric.into(),
+            timestamp: self.timestamp,
+            ty,
+            value,
+        })
+    }
+}
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup 13/13] api: add /status/metrics API
  2024-10-11 10:51 [pbs-devel] [PATCH proxmox-backup 00/13] add metric endpoint Lukas Wagner
                   ` (11 preceding siblings ...)
  2024-10-11 10:51 ` [pbs-devel] [PATCH proxmox-backup 12/13] metric collection: put metrics in a cache Lukas Wagner
@ 2024-10-11 10:51 ` Lukas Wagner
  12 siblings, 0 replies; 14+ messages in thread
From: Lukas Wagner @ 2024-10-11 10:51 UTC (permalink / raw)
  To: pbs-devel

This endpoint is modelled exactly after the one in PVE (where it
is available under /cluster/metrics/export).

The returned data format is quite simple: an array of
metric records, each including a value, a metric name, an id to identify
the object (e.g. datastore/foo, host), a timestamp and a type
('gauge', 'derive', ...). The type property makes the format
self-describing and aids the metric collector in choosing a
representation for storing the metric data.

[
    ...
    {
        "metric": "cpu_avg1",
        "value": 0.12,
        "timestamp": 170053205,
        "id": "host",
        "type": "gauge"
    },
    ...
]

In terms of permissions, the new endpoint requires Sys.Audit
on /system/status for metrics of the 'host' object,
and Datastore.Audit on /datastore/{store} for 'datastore/{store}'
metric objects.

Via the 'history' and 'start-time' parameters one can query
the last 30 minutes of metric history. If these parameters
are not provided, only the most recent metric generation
is returned.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 src/api2/mod.rs                              |  1 +
 src/api2/status.rs                           | 13 ++++--
 src/server/metric_collection/mod.rs          |  4 +-
 src/server/metric_collection/pull_metrics.rs | 45 ++++++++++++++++++++
 4 files changed, 57 insertions(+), 6 deletions(-)
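
The history lookup in `get_all_metrics` can be sketched without the shared cache as follows. This is a simplified std-only model: `Generation`, `generations_to_fetch` and `collect_since` are hypothetical names, the cache is replaced by a slice, and it is an assumption that generations arrive newest first.

```rust
const INTERVAL_SECS: i64 = 10;
const STORED_GENERATIONS: i64 = 180;

struct Generation {
    timestamp: i64,
    /// (metric name, value) pairs; a simplified stand-in for `MetricDataPoint`.
    datapoints: Vec<(String, f64)>,
}

/// Number of generations to request, or `None` if `start_time` is in the future.
fn generations_to_fetch(now: i64, start_time: i64) -> Option<u32> {
    let delta = now - start_time;
    if delta < 0 {
        return None;
    }
    Some((delta / INTERVAL_SECS).clamp(0, STORED_GENERATIONS) as u32)
}

/// Collect all points newer than `start_time`, sorted by timestamp.
/// Assumption: `newest_first` models what the cache would hand back.
fn collect_since(newest_first: &[Generation], now: i64, start_time: i64) -> Vec<(i64, String, f64)> {
    let Some(count) = generations_to_fetch(now, start_time) else {
        return Vec::new();
    };
    let mut points: Vec<(i64, String, f64)> = newest_first
        .iter()
        .take(count as usize)
        .filter(|g| g.timestamp > start_time)
        .flat_map(|g| g.datapoints.iter().map(move |(m, v)| (g.timestamp, m.clone(), *v)))
        .collect();
    points.sort_by_key(|p| p.0);
    points
}
```

One detail the sketch makes visible: when `now - start_time` is smaller than one collection interval, the computed count is 0. What `get_last(0)` returns in that case depends on proxmox-shared-cache, which is not modelled here.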

diff --git a/src/api2/mod.rs b/src/api2/mod.rs
index a83e4c20..f485ae53 100644
--- a/src/api2/mod.rs
+++ b/src/api2/mod.rs
@@ -9,6 +9,7 @@ pub mod admin;
 pub mod backup;
 pub mod config;
 pub mod helpers;
+pub mod metrics;
 pub mod node;
 pub mod ping;
 pub mod pull;
diff --git a/src/api2/status.rs b/src/api2/status.rs
index e46fc1ae..f217a31d 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -7,6 +7,7 @@ use proxmox_router::list_subdirs_api_method;
 use proxmox_router::{ApiMethod, Permission, Router, RpcEnvironment, SubdirMap};
 use proxmox_rrd_api_types::{RrdMode, RrdTimeframe};
 use proxmox_schema::api;
+use proxmox_sortable_macro::sortable;
 
 use pbs_api_types::{
     Authid, DataStoreStatusListItem, Operation, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
@@ -137,10 +138,14 @@ pub async fn datastore_status(
     Ok(list)
 }
 
-const SUBDIRS: SubdirMap = &[(
-    "datastore-usage",
-    &Router::new().get(&API_METHOD_DATASTORE_STATUS),
-)];
+#[sortable]
+const SUBDIRS: SubdirMap = &sorted!([
+    (
+        "datastore-usage",
+        &Router::new().get(&API_METHOD_DATASTORE_STATUS),
+    ),
+    ("metrics", &super::metrics::ROUTER),
+]);
 
 pub const ROUTER: Router = Router::new()
     .get(&list_subdirs_api_method!(SUBDIRS))
diff --git a/src/server/metric_collection/mod.rs b/src/server/metric_collection/mod.rs
index e6e04c5b..3cbd7425 100644
--- a/src/server/metric_collection/mod.rs
+++ b/src/server/metric_collection/mod.rs
@@ -17,8 +17,8 @@ use proxmox_sys::{
 use crate::tools::disks::{zfs_dataset_stats, BlockDevStat, DiskManage};
 
 mod metric_server;
-mod pull_metrics;
-pub mod rrd;
+pub(crate) mod pull_metrics;
+pub(crate) mod rrd;
 
 const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(10);
 
diff --git a/src/server/metric_collection/pull_metrics.rs b/src/server/metric_collection/pull_metrics.rs
index f4b506cf..1b5f3777 100644
--- a/src/server/metric_collection/pull_metrics.rs
+++ b/src/server/metric_collection/pull_metrics.rs
@@ -39,6 +39,51 @@ pub(super) fn init() -> Result<(), Error> {
     Ok(())
 }
 
+/// Return most recent metrics
+///
+/// If the metric collection loop has not produced any metrics yet, an empty
+/// `Vec` is returned. Returns an error if the cache could not be accessed.
+pub fn get_most_recent_metrics() -> Result<Vec<MetricDataPoint>, Error> {
+    let cached_datapoints: Option<MetricDataPoints> = get_cache()?.get()?;
+    let mut points = cached_datapoints.map(|r| r.datapoints).unwrap_or_default();
+
+    points.sort_unstable_by_key(|p| p.timestamp);
+
+    Ok(points)
+}
+
+/// Return all cached metrics with a `timestamp > start_time`
+///
+/// If the metric collection loop has not produced any metrics yet, an empty
+/// `Vec` is returned. Returns an error if the cache could not be accessed.
+pub fn get_all_metrics(start_time: i64) -> Result<Vec<MetricDataPoint>, Error> {
+    let now = proxmox_time::epoch_i64();
+
+    let delta = now - start_time;
+
+    if delta < 0 {
+        // start-time in the future, no metrics for you
+        return Ok(Vec::new());
+    }
+
+    let generations = delta / (METRIC_COLLECTION_INTERVAL.as_secs() as i64);
+    let generations = generations.clamp(0, STORED_METRIC_GENERATIONS as i64);
+
+    let cached_datapoints: Vec<MetricDataPoints> = get_cache()?.get_last(generations as u32)?;
+
+    let mut points = Vec::new();
+
+    for gen in cached_datapoints {
+        if gen.timestamp > start_time {
+            points.extend(gen.datapoints);
+        }
+    }
+
+    points.sort_unstable_by_key(|p| p.timestamp);
+
+    Ok(points)
+}
+
 /// Convert `DiskStat` `HostStat` into a universal metric data point and cache
 /// them for a later retrieval.
 pub(super) fn update_metrics(
-- 
2.39.5




