From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 3E7781FF13F for ; Thu, 12 Mar 2026 14:53:18 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 8DECC17521; Thu, 12 Mar 2026 14:53:13 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Date: Thu, 12 Mar 2026 14:52:22 +0100 Message-ID: <20260312135229.420729-22-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com> References: <20260312135229.420729-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1773323524662 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.047 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_MSPIKE_H2 0.001 Average reputation (+2) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: PJV33J7GXN2CBWRVW5J5E6KNZ5MS2TB6 X-Message-ID-Hash: PJV33J7GXN2CBWRVW5J5E6KNZ5MS2TB6 X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: The whole architecture is pretty similar to the remote metric 
collection. We introduce a task that fetches host metrics and sends them via a channel to the RRD task, which is responsible for persisting them in the RRD database. Signed-off-by: Lukas Wagner --- Cargo.toml | 2 + debian/control | 1 + server/Cargo.toml | 2 + .../local_collection_task.rs | 199 ++++++++++++++++++ server/src/metric_collection/mod.rs | 21 +- server/src/metric_collection/rrd_task.rs | 185 ++++++++++++++++ 6 files changed, 405 insertions(+), 5 deletions(-) create mode 100644 server/src/metric_collection/local_collection_task.rs diff --git a/Cargo.toml b/Cargo.toml index 1adb8a0a..91741ea1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ proxmox-auth-api = "1.0.5" proxmox-base64 = "1" proxmox-client = "1" proxmox-daemon = "1" +proxmox-disks = "0.1" proxmox-docgen = "1" proxmox-http = { version = "1.0.4", features = [ "client", "http-helpers", "websocket" ] } # see below proxmox-human-byte = "1" @@ -47,6 +48,7 @@ proxmox-ldap = { version = "1.1", features = ["sync"] } proxmox-lang = "1.1" proxmox-log = "1" proxmox-login = "1.0.2" +proxmox-procfs = "0.1" proxmox-rest-server = "1" # some use "cli", some use "cli" and "server", pbs-config uses nothing proxmox-router = { version = "3.0.0", default-features = false } diff --git a/debian/control b/debian/control index 4ddc9efc..c61e8795 100644 --- a/debian/control +++ b/debian/control @@ -52,6 +52,7 @@ Build-Depends: debhelper-compat (= 13), librust-proxmox-config-digest-1+default-dev, librust-proxmox-config-digest-1+openssl-dev, librust-proxmox-daemon-1+default-dev, + librust-proxmox-disks-0.1+default-dev, librust-proxmox-dns-api-1+default-dev, librust-proxmox-dns-api-1+impl-dev, librust-proxmox-docgen-1+default-dev, diff --git a/server/Cargo.toml b/server/Cargo.toml index 6969549f..65170864 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -40,6 +40,7 @@ proxmox-async.workspace = true proxmox-auth-api = { workspace = true, features = [ "api", "ticket", "pam-authenticator", 
"password-authenticator" ] } proxmox-base64.workspace = true proxmox-daemon.workspace = true +proxmox-disks.workspace = true proxmox-docgen.workspace = true proxmox-http = { workspace = true, features = [ "client-trait", "proxmox-async" ] } # pbs-client doesn't use these proxmox-lang.workspace = true @@ -47,6 +48,7 @@ proxmox-ldap.workspace = true proxmox-log.workspace = true proxmox-login.workspace = true proxmox-openid.workspace = true +proxmox-procfs.workspace = true proxmox-rest-server = { workspace = true, features = [ "templates" ] } proxmox-router = { workspace = true, features = [ "cli", "server"] } proxmox-rrd.workspace = true diff --git a/server/src/metric_collection/local_collection_task.rs b/server/src/metric_collection/local_collection_task.rs new file mode 100644 index 00000000..a70b3d96 --- /dev/null +++ b/server/src/metric_collection/local_collection_task.rs @@ -0,0 +1,199 @@ +use std::sync::Mutex; +use std::time::Instant; +use std::{collections::HashMap, time::Duration}; + +use anyhow::{Context, Error}; +use tokio::{sync::mpsc::Sender, time::MissedTickBehavior}; + +use proxmox_disks::DiskManage; +use proxmox_log::{debug, error}; +use proxmox_network_api::IpLink; +use proxmox_procfs::pressure::{PressureData, Resource}; +use proxmox_sys::fs; +use proxmox_sys::linux::procfs; + +use super::rrd_task::RrdStoreRequest; + +const HOST_METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(10); + +/// Task which periodically collects metrics from the PDM host and stores +/// them in the local metrics database. +pub(super) struct LocalMetricCollectionTask { + metric_data_tx: Sender, +} + +impl LocalMetricCollectionTask { + /// Create a new metric collection task. + pub(super) fn new(metric_data_tx: Sender) -> Self { + Self { metric_data_tx } + } + + /// Run the metric collection task. + /// + /// This function never returns. 
+ pub(super) async fn run(&mut self) { + let mut timer = tokio::time::interval(HOST_METRIC_COLLECTION_INTERVAL); + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + timer.tick().await; + self.handle_tick().await; + } + } + + /// Handle a timer tick. + async fn handle_tick(&mut self) { + let stats = match tokio::task::spawn_blocking(collect_host_metrics).await { + Ok(stats) => stats, + Err(err) => { + error!("join error while collecting host stats: {err}"); + return; + } + }; + + let _ = self + .metric_data_tx + .send(RrdStoreRequest::Host { + timestamp: proxmox_time::epoch_i64(), + metrics: Box::new(stats), + }) + .await; + } +} + +/// Container type for various metrics of a PDM host. +pub(super) struct PdmHostMetrics { + /// CPU statistics from `/proc/stat`. + pub proc: Option, + /// Memory statistics from `/proc/meminfo`. + pub meminfo: Option, + /// System load stats from `/proc/loadavg`. + pub load: Option, + /// Aggregated network device traffic for all physical NICs. + pub netstats: Option, + /// Block device stats for the root disk. + pub root_blockdev_stat: Option, + /// File system usage for the root disk. + pub root_filesystem_info: Option, + /// CPU pressure stall information for the host. + pub cpu_pressure: Option, + /// Memory pressure stall information for the host. + pub memory_pressure: Option, + /// IO pressure stall information for the host. + pub io_pressure: Option, +} + +/// Aggregated network device traffic for all physical NICs. +pub(super) struct NetDevStats { + /// Aggregate inbound traffic over all physical NICs in bytes. + pub netin: u64, + /// Aggregate outbound traffic over all physical NICs in bytes. 
+ pub netout: u64, +} + +fn collect_host_metrics() -> PdmHostMetrics { + let proc = procfs::read_proc_stat() + .inspect_err(|err| error!("failed to read '/proc/stat': {err:#}")) + .ok(); + + let meminfo = procfs::read_meminfo() + .inspect_err(|err| error!("failed to read '/proc/meminfo': {err:#}")) + .ok(); + + let cpu_pressure = PressureData::read_system(Resource::Cpu) + .inspect_err(|err| error!("failed to read CPU pressure stall information: {err:#}")) + .ok(); + + let memory_pressure = PressureData::read_system(Resource::Memory) + .inspect_err(|err| error!("failed to read memory pressure stall information: {err:#}")) + .ok(); + + let io_pressure = PressureData::read_system(Resource::Io) + .inspect_err(|err| error!("failed to read IO pressure stall information: {err:#}")) + .ok(); + + let load = procfs::read_loadavg() + .inspect_err(|err| error!("failed to read '/proc/loadavg': {err:#}")) + .ok(); + + let root_blockdev_stat = DiskManage::new() + .blockdev_stat_for_path("/") + .inspect_err(|err| error!("failed to collect blockdev statistics for '/': {err:#}")) + .ok(); + + let root_filesystem_info = proxmox_sys::fs::fs_info("/") + .inspect_err(|err| { + error!("failed to query filesystem usage for '/': {err:#}"); + }) + .ok(); + + let netstats = collect_netdev_metrics() + .inspect_err(|err| { + error!("failed to collect network device statistics: {err:#}"); + }) + .ok(); + + PdmHostMetrics { + proc, + meminfo, + load, + netstats, + root_blockdev_stat, + root_filesystem_info, + cpu_pressure, + memory_pressure, + io_pressure, + } +} + +struct NetdevCacheEntry { + interfaces: HashMap, + timestamp: Instant, +} + +const NETWORK_INTERFACE_CACHE_MAX_AGE: Duration = Duration::from_secs(300); +static NETWORK_INTERFACE_CACHE: Mutex> = Mutex::new(None); + +fn collect_netdev_metrics() -> Result { + let net_devs = procfs::read_proc_net_dev()?; + + let mut cache = NETWORK_INTERFACE_CACHE.lock().unwrap(); + + let now = Instant::now(); + + let needs_refresh = match 
cache.as_ref() { + Some(entry) => now.duration_since(entry.timestamp) > NETWORK_INTERFACE_CACHE_MAX_AGE, + None => true, + }; + + if needs_refresh { + cache.replace({ + debug!("updating cached network devices"); + + let interfaces = proxmox_network_api::get_network_interfaces() + .context("failed to enumerate network devices")?; + + NetdevCacheEntry { + interfaces, + timestamp: now, + } + }); + } + + // unwrap: at this point we *know* that the Option is Some + let ip_links = cache.as_ref().unwrap(); + + let mut netin = 0; + let mut netout = 0; + + for net_dev in net_devs { + if let Some(ip_link) = ip_links.interfaces.get(&net_dev.device) { + if ip_link.is_physical() { + netin += net_dev.receive; + netout += net_dev.send; + } + } + } + + Ok(NetDevStats { netin, netout }) +} diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs index 3cd58148..8a945fab 100644 --- a/server/src/metric_collection/mod.rs +++ b/server/src/metric_collection/mod.rs @@ -10,6 +10,7 @@ use tokio::sync::oneshot; use pdm_api_types::RemoteMetricCollectionStatus; use pdm_buildcfg::PDM_STATE_DIR_M; +mod local_collection_task; mod remote_collection_task; pub mod rrd_cache; mod rrd_task; @@ -19,6 +20,8 @@ pub mod top_entities; use remote_collection_task::{ControlMsg, RemoteMetricCollectionTask}; use rrd_cache::RrdCache; +use crate::metric_collection::local_collection_task::LocalMetricCollectionTask; + const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb"); static CONTROL_MESSAGE_TX: OnceLock> = OnceLock::new(); @@ -39,14 +42,22 @@ pub fn init() -> Result<(), Error> { pub fn start_task() -> Result<(), Error> { let (metric_data_tx, metric_data_rx) = mpsc::channel(128); + let cache = rrd_cache::get_cache(); + tokio::spawn(async move { + let rrd_task_future = pin!(rrd_task::store_in_rrd_task(cache, metric_data_rx)); + let abort_future = pin!(proxmox_daemon::shutdown_future()); + futures::future::select(rrd_task_future, abort_future).await; + }); + let 
(trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128); if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() { bail!("control message sender already set"); } + let metric_data_tx_clone = metric_data_tx.clone(); tokio::spawn(async move { let metric_collection_task_future = pin!(async move { - match RemoteMetricCollectionTask::new(metric_data_tx, trigger_collection_rx) { + match RemoteMetricCollectionTask::new(metric_data_tx_clone, trigger_collection_rx) { Ok(mut task) => task.run().await, Err(err) => log::error!("could not start metric collection task: {err}"), } @@ -56,12 +67,12 @@ pub fn start_task() -> Result<(), Error> { futures::future::select(metric_collection_task_future, abort_future).await; }); - let cache = rrd_cache::get_cache(); - tokio::spawn(async move { - let rrd_task_future = pin!(rrd_task::store_in_rrd_task(cache, metric_data_rx)); + let metric_collection_task_future = + pin!(async move { LocalMetricCollectionTask::new(metric_data_tx).run().await }); + let abort_future = pin!(proxmox_daemon::shutdown_future()); - futures::future::select(rrd_task_future, abort_future).await; + futures::future::select(metric_collection_task_future, abort_future).await; }); Ok(()) diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs index 29137858..4cf18679 100644 --- a/server/src/metric_collection/rrd_task.rs +++ b/server/src/metric_collection/rrd_task.rs @@ -8,6 +8,7 @@ use proxmox_rrd::rrd::DataSourceType; use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType}; +use super::local_collection_task::PdmHostMetrics; use super::rrd_cache::RrdCache; /// Store request for the RRD task. @@ -45,6 +46,16 @@ pub(super) enum RrdStoreRequest { /// Statistics. stats: CollectionStats, }, + /// Store PDM host metrics. + Host { + /// Timestamp at which the metrics were collected (UNIX epoch). 
+ timestamp: i64, + + /// Metric data for this PDM host. + // Boxed to avoid a clippy warning regarding large size differences between + // enum variants. + metrics: Box, + }, } /// Result for a [`RrdStoreRequest`]. @@ -117,6 +128,9 @@ pub(super) async fn store_in_rrd_task( RrdStoreRequest::CollectionStats { timestamp, stats } => { store_stats(&cache_clone, &stats, timestamp) } + RrdStoreRequest::Host { timestamp, metrics } => { + store_pdm_host_metrics(&cache_clone, timestamp, &metrics) + } }; }) .await; @@ -194,6 +208,177 @@ fn store_stats(cache: &RrdCache, stats: &CollectionStats, timestamp: i64) { ); } +fn store_pdm_host_metrics(cache: &RrdCache, timestamp: i64, metrics: &PdmHostMetrics) { + if let Some(proc) = &metrics.proc { + cache.update_value( + "nodes/localhost/cpu-current", + proc.cpu, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/cpu-iowait", + proc.iowait_percent, + timestamp, + DataSourceType::Gauge, + ); + } + + if let Some(load) = &metrics.load { + cache.update_value( + "nodes/localhost/cpu-avg1", + load.0, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/cpu-avg5", + load.1, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/cpu-avg15", + load.2, + timestamp, + DataSourceType::Gauge, + ); + } + + if let Some(cpu_pressure) = &metrics.cpu_pressure { + cache.update_value( + "nodes/localhost/cpu-pressure-some-avg10", + cpu_pressure.some.average_10, + timestamp, + DataSourceType::Gauge, + ); + + // NOTE: On a system level, 'full' CPU pressure is undefined and reported as 0, + // so it does not make sense to store it. 
+ // https://docs.kernel.org/accounting/psi.html#pressure-interface + } + + if let Some(meminfo) = &metrics.meminfo { + cache.update_value( + "nodes/localhost/mem-total", + meminfo.memtotal as f64, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/mem-used", + meminfo.memused as f64, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/swap-total", + meminfo.swaptotal as f64, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/swap-used", + meminfo.swapused as f64, + timestamp, + DataSourceType::Gauge, + ); + } + + if let Some(memory_pressure) = &metrics.memory_pressure { + cache.update_value( + "nodes/localhost/mem-pressure-some-avg10", + memory_pressure.some.average_10, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/mem-pressure-full-avg10", + memory_pressure.full.average_10, + timestamp, + DataSourceType::Gauge, + ); + } + + if let Some(netstats) = &metrics.netstats { + cache.update_value( + "nodes/localhost/net-in", + netstats.netin as f64, + timestamp, + DataSourceType::Derive, + ); + cache.update_value( + "nodes/localhost/net-out", + netstats.netout as f64, + timestamp, + DataSourceType::Derive, + ); + } + + if let Some(disk) = &metrics.root_filesystem_info { + cache.update_value( + "nodes/localhost/disk-total", + disk.total as f64, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/disk-used", + disk.used as f64, + timestamp, + DataSourceType::Gauge, + ); + } + + if let Some(stat) = &metrics.root_blockdev_stat { + cache.update_value( + "nodes/localhost/disk-read-iops", + stat.read_ios as f64, + timestamp, + DataSourceType::Derive, + ); + cache.update_value( + "nodes/localhost/disk-write-iops", + stat.write_ios as f64, + timestamp, + DataSourceType::Derive, + ); + cache.update_value( + "nodes/localhost/disk-read", + (stat.read_sectors * 512) as f64, + timestamp, + DataSourceType::Derive, + ); + 
cache.update_value( + "nodes/localhost/disk-write", + (stat.write_sectors * 512) as f64, + timestamp, + DataSourceType::Derive, + ); + cache.update_value( + "nodes/localhost/disk-io-ticks", + (stat.io_ticks as f64) / 1000.0, + timestamp, + DataSourceType::Derive, + ); + } + + if let Some(io_pressure) = &metrics.io_pressure { + cache.update_value( + "nodes/localhost/io-pressure-some-avg10", + io_pressure.some.average_10, + timestamp, + DataSourceType::Gauge, + ); + cache.update_value( + "nodes/localhost/io-pressure-full-avg10", + io_pressure.full.average_10, + timestamp, + DataSourceType::Gauge, + ); + } +} + #[cfg(test)] mod tests { use proxmox_rrd_api_types::{RrdMode, RrdTimeframe}; -- 2.47.3