From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id D67121FF13A for ; Wed, 15 Apr 2026 10:47:32 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B6C1091DD; Wed, 15 Apr 2026 10:47:32 +0200 (CEST) Mime-Version: 1.0 Content-Transfer-Encoding: quoted-printable Content-Type: text/plain; charset=UTF-8 Date: Wed, 15 Apr 2026 10:47:25 +0200 Message-Id: Subject: Re: [PATCH datacenter-manager v3 06/11] metric collection: collect PDM host metrics in a new collection task To: "Lukas Wagner" , X-Mailer: aerc 0.20.0 References: <20260413085816.143591-1-l.wagner@proxmox.com> <20260413085816.143591-7-l.wagner@proxmox.com> In-Reply-To: <20260413085816.143591-7-l.wagner@proxmox.com> From: "Shannon Sterz" X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1776242768228 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.122 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [mod.rs,disk.total] Message-ID-Hash: S7OA62I3SXQPGHDUD3KIUUX6FFRFTKTH X-Message-ID-Hash: S7OA62I3SXQPGHDUD3KIUUX6FFRFTKTH X-MailFrom: s.sterz@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: On Mon Apr 13, 2026 at 10:58 AM CEST, Lukas Wagner wrote: > Then whole architecture is pretty similar to the remote metric nit: this should read "The whole architecture", not "Then whole architecture" :) > collection. We introduce a task that fetches host metrics and sends them > via a channel to the RRD task, which is responsible for persisting them > in the RRD database. > > Signed-off-by: Lukas Wagner > Reviewed-by: Arthur Bied-Charreton > Tested-by: Arthur Bied-Charreton > --- > Cargo.toml | 2 + > debian/control | 2 + > server/Cargo.toml | 2 + > .../local_collection_task.rs | 199 ++++++++++++++++++ > server/src/metric_collection/mod.rs | 21 +- > server/src/metric_collection/rrd_task.rs | 185 ++++++++++++++++ > 6 files changed, 406 insertions(+), 5 deletions(-) > create mode 100644 server/src/metric_collection/local_collection_task.rs > > diff --git a/Cargo.toml b/Cargo.toml > index ec2aa3dc..b708ee98 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -39,6 +39,7 @@ proxmox-auth-api =3D "1.0.5" > proxmox-base64 =3D "1" > proxmox-client =3D "1" > proxmox-daemon =3D "1" > +proxmox-disks =3D "0.2" > proxmox-docgen =3D "1" > proxmox-http =3D { version =3D "1.0.4", features =3D [ "client", "http-h= elpers", "websocket" ] } # see below > proxmox-human-byte =3D "1" > @@ -47,6 +48,7 @@ proxmox-ldap =3D { version =3D "1.1", features =3D ["sy= nc"] } > proxmox-lang =3D "1.1" > 
proxmox-log =3D "1" > proxmox-login =3D "1.0.2" > +proxmox-procfs =3D "0.1" > proxmox-rest-server =3D "1" > # some use "cli", some use "cli" and "server", pbs-config uses nothing > proxmox-router =3D { version =3D "3.0.0", default-features =3D false } > diff --git a/debian/control b/debian/control > index 4ddc9efc..9101e8cd 100644 > --- a/debian/control > +++ b/debian/control > @@ -52,6 +52,7 @@ Build-Depends: debhelper-compat (=3D 13), > librust-proxmox-config-digest-1+default-dev, > librust-proxmox-config-digest-1+openssl-dev, > librust-proxmox-daemon-1+default-dev, > + librust-proxmox-disks-0.1+default-dev, > librust-proxmox-dns-api-1+default-dev, > librust-proxmox-dns-api-1+impl-dev, > librust-proxmox-docgen-1+default-dev, > @@ -72,6 +73,7 @@ Build-Depends: debhelper-compat (=3D 13), > librust-proxmox-network-api-1+impl-dev, > librust-proxmox-node-status-1+api-dev, > librust-proxmox-openid-1+default-dev (>=3D 1.0.2-~~), > + librust-proxmox-procfs-0.1+default-dev, > librust-proxmox-product-config-1+default-dev, > librust-proxmox-rest-server-1+default-dev, > librust-proxmox-rest-server-1+templates-dev, > diff --git a/server/Cargo.toml b/server/Cargo.toml > index 6969549f..65170864 100644 > --- a/server/Cargo.toml > +++ b/server/Cargo.toml > @@ -40,6 +40,7 @@ proxmox-async.workspace =3D true > proxmox-auth-api =3D { workspace =3D true, features =3D [ "api", "ticket= ", "pam-authenticator", "password-authenticator" ] } > proxmox-base64.workspace =3D true > proxmox-daemon.workspace =3D true > +proxmox-disks.workspace =3D true > proxmox-docgen.workspace =3D true > proxmox-http =3D { workspace =3D true, features =3D [ "client-trait", "p= roxmox-async" ] } # pbs-client doesn't use these > proxmox-lang.workspace =3D true > @@ -47,6 +48,7 @@ proxmox-ldap.workspace =3D true > proxmox-log.workspace =3D true > proxmox-login.workspace =3D true > proxmox-openid.workspace =3D true > +proxmox-procfs.workspace =3D true > proxmox-rest-server =3D { workspace =3D true, features =3D 
[ "templates"= ] } > proxmox-router =3D { workspace =3D true, features =3D [ "cli", "server"]= } > proxmox-rrd.workspace =3D true > diff --git a/server/src/metric_collection/local_collection_task.rs b/serv= er/src/metric_collection/local_collection_task.rs > new file mode 100644 > index 00000000..034b51a3 > --- /dev/null > +++ b/server/src/metric_collection/local_collection_task.rs > @@ -0,0 +1,199 @@ > +use std::sync::Mutex; > +use std::time::Instant; > +use std::{collections::HashMap, time::Duration}; > + > +use anyhow::{Context, Error}; > +use tokio::{sync::mpsc::Sender, time::MissedTickBehavior}; > + > +use proxmox_disks::Disks; > +use proxmox_log::{debug, error}; > +use proxmox_network_api::IpLink; > +use proxmox_procfs::pressure::{PressureData, Resource}; > +use proxmox_sys::fs; > +use proxmox_sys::linux::procfs; > + > +use super::rrd_task::RrdStoreRequest; > + > +const HOST_METRIC_COLLECTION_INTERVAL: Duration =3D Duration::from_secs(= 10); > + > +/// Task which periodically collects metrics from the PDM host and store= s > +/// them in the local metrics database. > +pub(super) struct LocalMetricCollectionTask { > + metric_data_tx: Sender, > +} > + > +impl LocalMetricCollectionTask { > + /// Create a new metric collection task. > + pub(super) fn new(metric_data_tx: Sender) -> Self { > + Self { metric_data_tx } > + } > + > + /// Run the metric collection task. > + /// > + /// This function never returns. > + pub(super) async fn run(&mut self) { > + let mut timer =3D tokio::time::interval(HOST_METRIC_COLLECTION_I= NTERVAL); > + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); > + > + loop { > + timer.tick().await; > + self.handle_tick().await; > + } > + } > + > + /// Handle a timer tick. 
> + async fn handle_tick(&mut self) { > + let stats =3D match tokio::task::spawn_blocking(collect_host_met= rics).await { > + Ok(stats) =3D> stats, > + Err(err) =3D> { > + error!("join error while collecting host stats: {err}"); > + return; > + } > + }; > + > + let _ =3D self > + .metric_data_tx > + .send(RrdStoreRequest::Host { > + timestamp: proxmox_time::epoch_i64(), > + metrics: Box::new(stats), > + }) > + .await; > + } > +} > + > +/// Container type for various metrics of a PDM host. > +pub(super) struct PdmHostMetrics { > + /// CPU statistics from `/proc/stat`. > + pub proc: Option, > + /// Memory statistics from `/proc/meminfo`. > + pub meminfo: Option, > + /// System load stats from `/proc/loadavg`. > + pub load: Option, > + /// Aggregated network device traffic for all physical NICs. > + pub netstats: Option, > + /// Block device stats for the root disk. > + pub root_blockdev_stat: Option, > + /// File system usage for the root disk. > + pub root_filesystem_info: Option, > + /// CPU pressure stall information for the host. > + pub cpu_pressure: Option, > + /// CPU pressure stall information for the host. > + pub memory_pressure: Option, > + /// IO pressure stall information for the host. > + pub io_pressure: Option, > +} > + > +/// Aggregated network device traffic for all physical NICs. > +pub(super) struct NetDevStats { > + /// Aggregate inbound traffic over all physical NICs in bytes. > + pub netin: u64, > + /// Aggregate outbound traffic over all physical NICs in bytes. 
> + pub netout: u64, > +} > + > +fn collect_host_metrics() -> PdmHostMetrics { > + let proc =3D procfs::read_proc_stat() > + .inspect_err(|err| error!("failed to read '/proc/stat': {err:#}"= )) > + .ok(); > + > + let meminfo =3D procfs::read_meminfo() > + .inspect_err(|err| error!("failed to read '/proc/meminfo': {err:= #}")) > + .ok(); > + > + let cpu_pressure =3D PressureData::read_system(Resource::Cpu) > + .inspect_err(|err| error!("failed to read CPU pressure stall inf= ormation: {err:#}")) > + .ok(); > + > + let memory_pressure =3D PressureData::read_system(Resource::Memory) > + .inspect_err(|err| error!("failed to read memory pressure stall = information: {err:#}")) > + .ok(); > + > + let io_pressure =3D PressureData::read_system(Resource::Io) > + .inspect_err(|err| error!("failed to read IO pressure stall info= rmation: {err:#}")) > + .ok(); > + > + let load =3D procfs::read_loadavg() > + .inspect_err(|err| error!("failed to read '/proc/loadavg': {err:= #}")) > + .ok(); > + > + let root_blockdev_stat =3D Disks::new() > + .blockdev_stat_for_path("/") > + .inspect_err(|err| error!("failed to collect blockdev statistics= for '/': {err:#}")) > + .ok(); > + > + let root_filesystem_info =3D proxmox_sys::fs::fs_info("/") > + .inspect_err(|err| { > + error!("failed to query filesystem usage for '/': {err:#}"); > + }) > + .ok(); > + > + let netstats =3D collect_netdev_metrics() > + .inspect_err(|err| { > + error!("failed to collect network device statistics: {err:#}= "); > + }) > + .ok(); > + > + PdmHostMetrics { > + proc, > + meminfo, > + load, > + netstats, > + root_blockdev_stat, > + root_filesystem_info, > + cpu_pressure, > + memory_pressure, > + io_pressure, > + } > +} > + > +struct NetdevCacheEntry { > + interfaces: HashMap, > + timestamp: Instant, > +} > + > +const NETWORK_INTERFACE_CACHE_MAX_AGE: Duration =3D Duration::from_secs(= 300); > +static NETWORK_INTERFACE_CACHE: Mutex> =3D Mute= x::new(None); > + > +fn collect_netdev_metrics() -> Result { > + let 
net_devs =3D procfs::read_proc_net_dev()?; > + > + let mut cache =3D NETWORK_INTERFACE_CACHE.lock().unwrap(); > + > + let now =3D Instant::now(); > + > + let needs_refresh =3D match cache.as_ref() { > + Some(entry) =3D> now.duration_since(entry.timestamp) > NETWORK_I= NTERFACE_CACHE_MAX_AGE, > + None =3D> true, > + }; > + > + if needs_refresh { > + cache.replace({ > + debug!("updating cached network devices"); > + > + let interfaces =3D proxmox_network_api::get_network_interfac= es() > + .context("failed to enumerate network devices")?; > + > + NetdevCacheEntry { > + interfaces, > + timestamp: now, > + } > + }); > + } > + > + // unwrap: at this point we *know* that the Option is Some > + let ip_links =3D cache.as_ref().unwrap(); > + > + let mut netin =3D 0; > + let mut netout =3D 0; > + > + for net_dev in net_devs { > + if let Some(ip_link) =3D ip_links.interfaces.get(&net_dev.device= ) { > + if ip_link.is_physical() { > + netin +=3D net_dev.receive; > + netout +=3D net_dev.send; > + } > + } > + } > + > + Ok(NetDevStats { netin, netout }) > +} > diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_coll= ection/mod.rs > index 3cd58148..8a945fab 100644 > --- a/server/src/metric_collection/mod.rs > +++ b/server/src/metric_collection/mod.rs > @@ -10,6 +10,7 @@ use tokio::sync::oneshot; > use pdm_api_types::RemoteMetricCollectionStatus; > use pdm_buildcfg::PDM_STATE_DIR_M; > > +mod local_collection_task; > mod remote_collection_task; > pub mod rrd_cache; > mod rrd_task; > @@ -19,6 +20,8 @@ pub mod top_entities; > use remote_collection_task::{ControlMsg, RemoteMetricCollectionTask}; > use rrd_cache::RrdCache; > > +use crate::metric_collection::local_collection_task::LocalMetricCollecti= onTask; > + > const RRD_CACHE_BASEDIR: &str =3D concat!(PDM_STATE_DIR_M!(), "/rrdb"); > > static CONTROL_MESSAGE_TX: OnceLock> =3D OnceLock::ne= w(); > @@ -39,14 +42,22 @@ pub fn init() -> Result<(), Error> { > pub fn start_task() -> Result<(), Error> { > let 
(metric_data_tx, metric_data_rx) =3D mpsc::channel(128); > > + let cache =3D rrd_cache::get_cache(); > + tokio::spawn(async move { > + let rrd_task_future =3D pin!(rrd_task::store_in_rrd_task(cache, = metric_data_rx)); > + let abort_future =3D pin!(proxmox_daemon::shutdown_future()); > + futures::future::select(rrd_task_future, abort_future).await; > + }); > + > let (trigger_collection_tx, trigger_collection_rx) =3D mpsc::channel= (128); > if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() { > bail!("control message sender already set"); > } > > + let metric_data_tx_clone =3D metric_data_tx.clone(); > tokio::spawn(async move { > let metric_collection_task_future =3D pin!(async move { > - match RemoteMetricCollectionTask::new(metric_data_tx, trigge= r_collection_rx) { > + match RemoteMetricCollectionTask::new(metric_data_tx_clone, = trigger_collection_rx) { > Ok(mut task) =3D> task.run().await, > Err(err) =3D> log::error!("could not start metric collec= tion task: {err}"), > } > @@ -56,12 +67,12 @@ pub fn start_task() -> Result<(), Error> { > futures::future::select(metric_collection_task_future, abort_fut= ure).await; > }); > > - let cache =3D rrd_cache::get_cache(); > - > tokio::spawn(async move { > - let rrd_task_future =3D pin!(rrd_task::store_in_rrd_task(cache, = metric_data_rx)); > + let metric_collection_task_future =3D > + pin!(async move { LocalMetricCollectionTask::new(metric_data= _tx).run().await }); > + > let abort_future =3D pin!(proxmox_daemon::shutdown_future()); > - futures::future::select(rrd_task_future, abort_future).await; > + futures::future::select(metric_collection_task_future, abort_fut= ure).await; > }); > > Ok(()) > diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric= _collection/rrd_task.rs > index 29137858..4cf18679 100644 > --- a/server/src/metric_collection/rrd_task.rs > +++ b/server/src/metric_collection/rrd_task.rs > @@ -8,6 +8,7 @@ use proxmox_rrd::rrd::DataSourceType; > use 
pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; > use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDa= taType}; > > +use super::local_collection_task::PdmHostMetrics; > use super::rrd_cache::RrdCache; > > /// Store request for the RRD task. > @@ -45,6 +46,16 @@ pub(super) enum RrdStoreRequest { > /// Statistics. > stats: CollectionStats, > }, > + /// Store PDM host metrics. > + Host { > + /// Timestamp at which the metrics were collected (UNIX epoch). > + timestamp: i64, > + > + /// Metric data for this PDM host. > + // Boxed to avoid a clippy warning regarding large size differen= ces between > + // enum variants. > + metrics: Box, > + }, > } > > /// Result for a [`RrdStoreRequest`]. > @@ -117,6 +128,9 @@ pub(super) async fn store_in_rrd_task( > RrdStoreRequest::CollectionStats { timestamp, stats } = =3D> { > store_stats(&cache_clone, &stats, timestamp) > } > + RrdStoreRequest::Host { timestamp, metrics } =3D> { > + store_pdm_host_metrics(&cache_clone, timestamp, &met= rics) > + } > }; > }) > .await; > @@ -194,6 +208,177 @@ fn store_stats(cache: &RrdCache, stats: &Collection= Stats, timestamp: i64) { > ); > } > > +fn store_pdm_host_metrics(cache: &RrdCache, timestamp: i64, metrics: &Pd= mHostMetrics) { > + if let Some(proc) =3D &metrics.proc { > + cache.update_value( > + "nodes/localhost/cpu-current", > + proc.cpu, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/cpu-iowait", > + proc.iowait_percent, > + timestamp, > + DataSourceType::Gauge, > + ); > + } > + > + if let Some(load) =3D &metrics.load { > + cache.update_value( > + "nodes/localhost/cpu-avg1", > + load.0, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/cpu-avg5", > + load.1, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/cpu-avg15", > + load.2, > + timestamp, > + DataSourceType::Gauge, > + ); > + } > + > + if let Some(cpu_pressure) =3D 
&metrics.cpu_pressure { > + cache.update_value( > + "nodes/localhost/cpu-pressure-some-avg10", > + cpu_pressure.some.average_10, > + timestamp, > + DataSourceType::Gauge, > + ); > + > + // NOTE: On a system level, 'full' CPU pressure is undefined and= reported as 0, > + // so it does not make sense to store it. > + // https://docs.kernel.org/accounting/psi.html#pressure-interfac= e > + } > + > + if let Some(meminfo) =3D &metrics.meminfo { > + cache.update_value( > + "nodes/localhost/mem-total", > + meminfo.memtotal as f64, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/mem-used", > + meminfo.memused as f64, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/swap-total", > + meminfo.swaptotal as f64, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/swap-used", > + meminfo.swapused as f64, > + timestamp, > + DataSourceType::Gauge, > + ); > + } > + > + if let Some(memory_pressure) =3D &metrics.memory_pressure { > + cache.update_value( > + "nodes/localhost/mem-pressure-some-avg10", > + memory_pressure.some.average_10, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/mem-pressure-full-avg10", > + memory_pressure.full.average_10, > + timestamp, > + DataSourceType::Gauge, > + ); > + } > + > + if let Some(netstats) =3D &metrics.netstats { > + cache.update_value( > + "nodes/localhost/net-in", > + netstats.netin as f64, > + timestamp, > + DataSourceType::Derive, > + ); > + cache.update_value( > + "nodes/localhost/net-out", > + netstats.netout as f64, > + timestamp, > + DataSourceType::Derive, > + ); > + } > + > + if let Some(disk) =3D &metrics.root_filesystem_info { > + cache.update_value( > + "nodes/localhost/disk-total", > + disk.total as f64, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/disk-used", > + disk.used as f64, > + timestamp, > + 
DataSourceType::Gauge, > + ); > + } > + > + if let Some(stat) =3D &metrics.root_blockdev_stat { > + cache.update_value( > + "nodes/localhost/disk-read-iops", > + stat.read_ios as f64, > + timestamp, > + DataSourceType::Derive, > + ); > + cache.update_value( > + "nodes/localhost/disk-write-iops", > + stat.write_ios as f64, > + timestamp, > + DataSourceType::Derive, > + ); > + cache.update_value( > + "nodes/localhost/disk-read", > + (stat.read_sectors * 512) as f64, > + timestamp, > + DataSourceType::Derive, > + ); > + cache.update_value( > + "nodes/localhost/disk-write", > + (stat.write_sectors * 512) as f64, > + timestamp, > + DataSourceType::Derive, > + ); > + cache.update_value( > + "nodes/localhost/disk-io-ticks", > + (stat.io_ticks as f64) / 1000.0, > + timestamp, > + DataSourceType::Derive, > + ); > + } > + > + if let Some(io_pressure) =3D &metrics.io_pressure { > + cache.update_value( > + "nodes/localhost/io-pressure-some-avg10", > + io_pressure.some.average_10, > + timestamp, > + DataSourceType::Gauge, > + ); > + cache.update_value( > + "nodes/localhost/io-pressure-full-avg10", > + io_pressure.full.average_10, > + timestamp, > + DataSourceType::Gauge, > + ); > + } > +} > + > #[cfg(test)] > mod tests { > use proxmox_rrd_api_types::{RrdMode, RrdTimeframe};