From: Lukas Wagner <>
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 07/28] metric collection: rework metric poll task
Date: Fri, 14 Feb 2025 14:06:32 +0100
Message-ID: <>
In-Reply-To: <>
Metric collection is mostly limited by network latency, not disk IO or
CPU. This means we should be able to get significant speedups by polling
remotes concurrently in separately spawned tokio tasks.
To avoid load/IO spikes, we limit the number of concurrent connections
using a semaphore.
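A rough sketch of this bounded fan-out, for illustration only: it swaps
tokio tasks and `tokio::sync::Semaphore` for std threads and a small
Mutex/Condvar semaphore so that it runs without external crates.
`fetch_all`, `Semaphore` and `Permit` are hypothetical names, not PDM
code. As in the patch, a permit is acquired before spawning and is
released when the worker finishes.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore (std has none); stands in for tokio's Semaphore.
struct Semaphore {
    permits: Mutex<usize>,
    freed: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), freed: Condvar::new() }
    }

    /// Block until a permit is free and return a guard owning it
    /// (analogous to `Arc::clone(&semaphore).acquire_owned()` in the patch).
    fn acquire_owned(self: Arc<Self>) -> Permit {
        {
            let mut free = self.permits.lock().unwrap();
            while *free == 0 {
                free = self.freed.wait(free).unwrap();
            }
            *free -= 1;
        }
        Permit(self)
    }
}

/// Gives its permit back to the semaphore when dropped.
struct Permit(Arc<Semaphore>);

impl Drop for Permit {
    fn drop(&mut self) {
        *self.0.permits.lock().unwrap() += 1;
        self.0.freed.notify_one();
    }
}

/// Hypothetical helper: run `fetch` for each remote on its own thread, with at
/// most `limit` running at once. Returns (remote, timestamp) pairs in input
/// order plus the peak number of fetches observed in flight.
fn fetch_all<F>(remotes: &[String], limit: usize, fetch: F) -> (Vec<(String, i64)>, usize)
where
    F: Fn(&str) -> i64 + Send + Sync + 'static,
{
    assert!(limit > 0, "a limit of 0 would block forever");
    let semaphore = Arc::new(Semaphore::new(limit));
    let fetch = Arc::new(fetch);
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();

    for name in remotes {
        // Acquire before spawning, so at most `limit` workers exist at a time.
        let permit = Arc::clone(&semaphore).acquire_owned();
        let fetch = Arc::clone(&fetch);
        let in_flight = Arc::clone(&in_flight);
        let peak = Arc::clone(&peak);
        let name = name.clone();
        handles.push(thread::spawn(move || {
            let _permit = permit; // released when this closure returns
            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            let ts = (*fetch)(name.as_str());
            in_flight.fetch_sub(1, Ordering::SeqCst);
            (name, ts)
        }));
    }

    let results: Vec<(String, i64)> = handles
        .into_iter()
        .map(|h| h.join().expect("fetch thread panicked"))
        .collect();
    (results, peak.load(Ordering::SeqCst))
}
```

Because the permit is taken in the spawning loop rather than inside the
worker, the number of live workers (not just active connections) stays
bounded, which is also how `fetch_remotes` behaves.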
Each concurrent task which fetches a single remote waits a random amount
of time before actually connecting to the remote. The upper and lower
bounds for this random delay can be configured in the metric collection
settings. The main aim of this mechanism is to reduce load spikes.
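The delay selection could look roughly like the following sketch. It
assumes delays in milliseconds and caps `min` to `max` the same way
`sleep_for_random_millis` does; `pick_delay_millis`, `random_sample` and
`random_delay` are hypothetical helpers, and the std `RandomState` trick
is only a crate-free stand-in for `rand::thread_rng()`, which the patch
actually uses.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::Duration;

/// Map `sample` into the inclusive range [min, max]. If `min > max`, `min`
/// is capped to `max`, mirroring the warning path in the patch.
/// Modulo reduction is slightly biased, which is fine for spreading load.
fn pick_delay_millis(min: u64, max: u64, sample: u64) -> u64 {
    let min = min.min(max);
    // u128 so that the full range [0, u64::MAX] does not overflow.
    let span = u128::from(max - min) + 1;
    min + (u128::from(sample) % span) as u64
}

/// Crate-free pseudo-random sample: each RandomState gets fresh keys.
/// Not a real RNG; only good enough for jitter in this sketch.
fn random_sample() -> u64 {
    RandomState::new().build_hasher().finish()
}

/// Random delay in [min_ms, max_ms] that a worker would sleep before connecting.
fn random_delay(min_ms: u64, max_ms: u64) -> Duration {
    Duration::from_millis(pick_delay_millis(min_ms, max_ms, random_sample()))
}
```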
Furthermore, each time the main collection interval timer
fires, a random delay is introduced before actually starting the
collection process.
This is useful due to how we set up the timer: we configure
it to fire at aligned points in time. For instance, if the
collection interval is set to 60s, the timer fires at
minute boundaries. Using this random offset, we can avoid
triggering at the same time as other timers (cron, systemd).
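To make the alignment concrete, here is a sketch of the arithmetic,
assuming alignment is computed on UNIX epoch seconds and that a point in
time lying exactly on a boundary schedules the following one.
`next_aligned_secs` and `secs_until_collection` are hypothetical; the
real `task_utils::next_aligned_instant` is not part of this patch.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for task_utils::next_aligned_instant: the next
/// multiple of `interval` seconds since the epoch strictly after `now`.
fn next_aligned_secs(now: u64, interval: u64) -> u64 {
    assert!(interval > 0, "interval must be non-zero");
    (now / interval + 1) * interval
}

/// Seconds from `now` until collection actually starts: the wait for the
/// aligned tick plus the random interval offset (already drawn, in seconds).
fn secs_until_collection(now: u64, interval: u64, offset: u64) -> u64 {
    next_aligned_secs(now, interval) - now + offset
}

fn now_unix_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}
```

With a 60s interval, a daemon looking at the clock at 125s waits 55s for
the boundary at 180s and then the drawn offset on top, so with an offset
of 7s collection starts 62s later instead of exactly on the minute.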
Additionally, we add a mechanism to trigger metric collection manually.
This is achieved by using `tokio::select!` on both the timer's `tick`
method and the `recv` method of an `mpsc` channel, which is used to send
control messages to the metric collection task.
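The "timer tick or control message, whichever comes first" loop can be
approximated without tokio using `std::sync::mpsc::Receiver::recv_timeout`,
shown below as a hedged sketch rather than the patch's implementation.
`run` and `Event` are hypothetical; `ControlMsg` mirrors the enum added
in this patch. Unlike the patch, the sketch stops once all senders are
gone or after `max_events` events, which keeps it testable.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Mirrors the ControlMsg enum from the patch.
#[derive(Debug, Clone, PartialEq)]
enum ControlMsg {
    CollectSingleRemote(String),
    CollectAllRemotes,
}

/// What the loop decided to do; stands in for the fetch_remotes calls.
#[derive(Debug, PartialEq)]
enum Event {
    Tick,
    Control(ControlMsg),
}

/// Wait for whichever comes first: the next tick or a control message.
fn run(rx: &Receiver<ControlMsg>, interval: Duration, max_events: usize) -> Vec<Event> {
    let mut events = Vec::new();
    let mut next_tick = Instant::now() + interval;
    while events.len() < max_events {
        let timeout = next_tick.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            // A control message is handled right away and does not move the timer.
            Ok(msg) => events.push(Event::Control(msg)),
            Err(RecvTimeoutError::Timeout) => {
                events.push(Event::Tick);
                // If the loop fell behind, the following ticks fire back-to-back.
                next_tick += interval;
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    events
}
```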
For better code structure, the collection task is split into a separate
module.
Signed-off-by: Lukas Wagner <>
Changes since v1:
- get_settings_or_default function: use unwrap_or_default instead
of match
- Add `rand` crate dependency to d/control
- Refactor sleep helpers, consolidate into a single
parameterised function
- Get rid of .unwrap when spawning the metric collection task
- Let trigger_metric_collection_for_remote take a String instead of
Cargo.toml | 1 +
debian/control | 1 +
server/Cargo.toml | 1 +
server/src/bin/ | 2 +-
.../src/metric_collection/ | 300 ++++++++++++++++++
server/src/metric_collection/ | 123 +++----
6 files changed, 347 insertions(+), 81 deletions(-)
create mode 100644 server/src/metric_collection/
diff --git a/Cargo.toml b/Cargo.toml
index 4f3b1d03..7dd60a53 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -108,6 +108,7 @@ once_cell = "1.3.1"
openssl = "0.10.40"
percent-encoding = "2.1"
pin-project-lite = "0.2"
+rand = "0.8"
regex = "1.5.5"
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.1"
diff --git a/debian/control b/debian/control
index 399ad86d..1f7ad17c 100644
--- a/debian/control
+++ b/debian/control
@@ -88,6 +88,7 @@ Build-Depends: cargo:native,
+ librust-rand-0.8+default-dev,
librust-regex-1+default-dev (>= 1.5.5-~~),
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 7b0058e1..c8308f2c 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -24,6 +24,7 @@ nix.workspace = true
once_cell.workspace = true
openssl.workspace = true
percent-encoding.workspace = true
+rand.workspace = true
serde.workspace = true
serde_json.workspace = true
syslog.workspace = true
diff --git a/server/src/bin/ b/server/src/bin/
index a79094d5..6e85e523 100644
--- a/server/src/bin/
+++ b/server/src/bin/
@@ -286,7 +286,7 @@ async fn run(debug: bool) -> Result<(), Error> {
- metric_collection::start_task();
+ metric_collection::start_task()?;
diff --git a/server/src/metric_collection/ b/server/src/metric_collection/
new file mode 100644
index 00000000..b55c8e92
--- /dev/null
+++ b/server/src/metric_collection/
@@ -0,0 +1,300 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+use anyhow::Error;
+use rand::Rng;
+use tokio::{
+ sync::{
+ mpsc::{Receiver, Sender},
+ OwnedSemaphorePermit, Semaphore,
+ },
+ time::Interval,
+};
+use proxmox_section_config::typed::SectionConfigData;
+use pdm_api_types::{
+ remotes::{Remote, RemoteType},
+ CollectionSettings,
+};
+use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
+use crate::{connection, task_utils};
+use super::rrd_task::RrdStoreRequest;
+/// Control messages for the metric collection task.
+pub(super) enum ControlMsg {
+ CollectSingleRemote(String),
+ CollectAllRemotes,
+}
+/// Task which periodically collects metrics from all remotes and stores
+/// them in the local metrics database.
+pub(super) struct MetricCollectionTask {
+ most_recent_timestamps: HashMap<String, i64>,
+ settings: CollectionSettings,
+ metric_data_tx: Sender<RrdStoreRequest>,
+ control_message_rx: Receiver<ControlMsg>,
+}
+impl MetricCollectionTask {
+ /// Create a new metric collection task.
+ pub(super) fn new(
+ metric_data_tx: Sender<RrdStoreRequest>,
+ control_message_rx: Receiver<ControlMsg>,
+ ) -> Result<Self, Error> {
+ let settings = Self::get_settings_or_default();
+ Ok(Self {
+ most_recent_timestamps: HashMap::new(),
+ settings,
+ metric_data_tx,
+ control_message_rx,
+ })
+ }
+ /// Run the metric collection task.
+ ///
+ /// This function never returns.
+ #[tracing::instrument(skip_all, name = "metric_collection_task")]
+ pub(super) async fn run(&mut self) {
+ let mut timer = Self::setup_timer(self.settings.collection_interval_or_default());
+ log::debug!(
+ "metric collection starting up. Collection interval set to {} seconds.",
+ self.settings.collection_interval_or_default()
+ );
+ loop {
+ let old_settings = self.settings.clone();
+ tokio::select! {
+ _ = timer.tick() => {
+ // Reload settings in case they have changed in the meanwhile
+ self.settings = Self::get_settings_or_default();
+ log::debug!("starting metric collection from all remotes - triggered by timer");
+ Self::sleep_for_random_millis(
+ self.settings.min_interval_offset_or_default() * 1000,
+ self.settings.max_interval_offset_or_default() * 1000,
+ "interval-offset",
+ ).await;
+ if let Some(remotes) = Self::load_remote_config() {
+ let to_fetch = remotes.order.as_slice();
+ self.fetch_remotes(&remotes, to_fetch).await;
+ }
+ }
+ val = self.control_message_rx.recv() => {
+ // Reload settings in case they have changed in the meanwhile
+ self.settings = Self::get_settings_or_default();
+ match val {
+ Some(ControlMsg::CollectSingleRemote(remote)) => {
+ if let Some(remotes) = Self::load_remote_config() {
+ log::debug!("starting metric collection for remote '{remote}' - triggered by control message");
+ self.fetch_remotes(&remotes, &[remote]).await;
+ }
+ }
+ Some(ControlMsg::CollectAllRemotes) => {
+ if let Some(remotes) = Self::load_remote_config() {
+ log::debug!("starting metric collection from all remotes - triggered by control message");
+ self.fetch_remotes(&remotes, &remotes.order).await;
+ }
+ }
+ _ => {},
+ }
+ }
+ }
+ let interval = self.settings.collection_interval_or_default();
+ if old_settings.collection_interval_or_default() != interval {
+ log::info!(
+ "metric collection interval changed to {} seconds, reloading timer",
+ interval
+ );
+ timer = Self::setup_timer(interval);
+ }
+ }
+ }
+ /// Sleep between `min` and `max` milliseconds.
+ ///
+ /// If `min` is larger than `max`, `min` will be set to `max` and a log message
+ /// will be printed.
+ async fn sleep_for_random_millis(mut min: u64, max: u64, param_base: &str) {
+ if min > max {
+ log::warn!(
+ "min-{param_base} is larger than max-{param_base} ({min} > {max}) - \
+ capping it to max-{param_base} ({max})"
+ );
+ min = max;
+ }
+ let jitter = {
+ let mut rng = rand::thread_rng();
+ rng.gen_range(min..=max)
+ };
+ tokio::time::sleep(Duration::from_millis(jitter)).await;
+ }
+ fn get_settings_or_default() -> CollectionSettings {
+ // We want to fall back to defaults if
+ // - the config file does not exist (no error should be logged)
+ // - the section type is wrong or if the config failed to parse (log an error in this
+ // case)
+ fn get_settings_impl() -> Result<CollectionSettings, Error> {
+ let (config, _) = pdm_config::metric_collection::config()?;
+ let all_sections: Vec<CollectionSettings> =
+ config.convert_to_typed_array(COLLECTION_SETTINGS_TYPE)?;
+ for section in all_sections {
+ if == "default" {
+ return Ok(section);
+ }
+ }
+ Ok(CollectionSettings::new("default"))
+ }
+ get_settings_impl().unwrap_or_else(|err| {
+ log::error!(
+ "could not read metric collection settings: {err} - falling back to default config"
+ );
+ CollectionSettings::new("default")
+ })
+ }
+ /// Set up a [`tokio::time::Interval`] instance with the provided interval.
+ /// The timer will be aligned, e.g. an interval of `60` will let the timer
+ /// fire at minute boundaries.
+ fn setup_timer(interval: u64) -> Interval {
+ let mut timer = tokio::time::interval(Duration::from_secs(interval));
+ let first_run = task_utils::next_aligned_instant(interval).into();
+ timer.reset_at(first_run);
+ timer
+ }
+ /// Convenience helper to load `remote.cfg`, logging the error
+ /// and returning `None` if the config could not be read.
+ fn load_remote_config() -> Option<SectionConfigData<Remote>> {
+ match pdm_config::remotes::config() {
+ Ok((remotes, _)) => Some(remotes),
+ Err(e) => {
+ log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
+ None
+ }
+ }
+ }
+ /// Fetch metric data from a provided list of remotes concurrently.
+ /// The maximum number of concurrent connections is determined by
+ /// `max_concurrent_connections` in the [`CollectionSettings`]
+ /// instance in `self`.
+ async fn fetch_remotes(
+ &mut self,
+ remote_config: &SectionConfigData<Remote>,
+ remotes_to_fetch: &[String],
+ ) {
+ let semaphore = Arc::new(Semaphore::new(
+ self.settings.max_concurrent_connections_or_default(),
+ ));
+ let mut handles = Vec::new();
+ for remote_name in remotes_to_fetch {
+ let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
+ // unwrap is okay here, acquire_* will only fail if `close` has been
+ // called on the semaphore.
+ let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
+ if let Some(remote) = remote_config.get(remote_name).cloned() {
+ log::debug!("fetching remote '{}'",;
+ let handle = tokio::spawn(Self::fetch_single_remote(
+ self.settings.clone(),
+ remote,
+ start_time,
+ self.metric_data_tx.clone(),
+ permit,
+ ));
+ handles.push((remote_name.clone(), handle));
+ }
+ }
+ for (remote_name, handle) in handles {
+ let res = handle.await;
+ match res {
+ Ok(Ok(ts)) => {
+ self.most_recent_timestamps
+ .insert(remote_name.to_string(), ts);
+ }
+ Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
+ Err(err) => {
+ log::error!(
+ "join error for metric collection task for remote {remote_name}: {err}"
+ )
+ }
+ }
+ }
+ }
+ /// Fetch a single remote.
+ #[tracing::instrument(skip_all, fields(remote =, name = "metric_collection_task")]
+ async fn fetch_single_remote(
+ settings: CollectionSettings,
+ remote: Remote,
+ start_time: i64,
+ sender: Sender<RrdStoreRequest>,
+ _permit: OwnedSemaphorePermit,
+ ) -> Result<i64, Error> {
+ Self::sleep_for_random_millis(
+ settings.min_connection_delay_or_default(),
+ settings.max_connection_delay_or_default(),
+ "connection-delay",
+ )
+ .await;
+ let most_recent_timestamp = match remote.ty {
+ RemoteType::Pve => {
+ let client = connection::make_pve_client(&remote)?;
+ let metrics = client
+ .cluster_metrics_export(Some(true), Some(false), Some(start_time))
+ .await?;
+ let most_recent =, |acc, x| acc.max(x.timestamp));
+ sender
+ .send(RrdStoreRequest::Pve {
+ remote:,
+ metrics,
+ })
+ .await?;
+ most_recent
+ }
+ RemoteType::Pbs => {
+ let client = connection::make_pbs_client(&remote)?;
+ let metrics = client.metrics(Some(true), Some(start_time)).await?;
+ let most_recent =, |acc, x| acc.max(x.timestamp));
+ sender
+ .send(RrdStoreRequest::Pbs {
+ remote:,
+ metrics,
+ })
+ .await?;
+ most_recent
+ }
+ };
+ Ok(most_recent_timestamp)
+ }
+}
diff --git a/server/src/metric_collection/ b/server/src/metric_collection/
index 06ade5f0..9b203615 100644
--- a/server/src/metric_collection/
+++ b/server/src/metric_collection/
@@ -1,20 +1,17 @@
-use std::collections::HashMap;
use std::pin::pin;
+use std::sync::OnceLock;
-use anyhow::Error;
+use anyhow::{bail, Error};
use tokio::sync::mpsc::{self, Sender};
-use pdm_api_types::remotes::RemoteType;
-use crate::{connection, task_utils};
+mod collection_task;
pub mod rrd_cache;
mod rrd_task;
pub mod top_entities;
-use rrd_task::RrdStoreRequest;
+use collection_task::{ControlMsg, MetricCollectionTask};
-const COLLECTION_INTERVAL: u64 = 60;
+static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
/// Initialize the RRD cache
pub fn init() -> Result<(), Error> {
@@ -24,89 +21,55 @@ pub fn init() -> Result<(), Error> {
/// Start the metric collection task.
-pub fn start_task() {
- let (tx, rx) = mpsc::channel(128);
+pub fn start_task() -> Result<(), Error> {
+ let (metric_data_tx, metric_data_rx) = mpsc::channel(128);
+ let (trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128);
+ if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() {
+ bail!("control message sender already set");
+ }
tokio::spawn(async move {
- let task_scheduler = pin!(metric_collection_task(tx));
+ let metric_collection_task_future = pin!(async move {
+ match MetricCollectionTask::new(metric_data_tx, trigger_collection_rx) {
+ Ok(mut task) =>,
+ Err(err) => log::error!("could not start metric collection task: {err}"),
+ }
+ });
let abort_future = pin!(proxmox_daemon::shutdown_future());
- futures::future::select(task_scheduler, abort_future).await;
+ futures::future::select(metric_collection_task_future, abort_future).await;
tokio::spawn(async move {
- let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
+ let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
let abort_future = pin!(proxmox_daemon::shutdown_future());
- futures::future::select(task_scheduler, abort_future).await;
+ futures::future::select(rrd_task_future, abort_future).await;
-async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
- let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
+ Ok(())
- loop {
- let delay_target = task_utils::next_aligned_instant(COLLECTION_INTERVAL);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
+/// Schedule metric collection for a given remote as soon as possible.
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection_for_remote(remote: String) -> Result<(), Error> {
+ if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+ sender.send(ControlMsg::CollectSingleRemote(remote)).await?;
+ }
- let remotes = match pdm_config::remotes::config() {
- Ok((remotes, _)) => remotes,
- Err(e) => {
- log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
- continue;
- }
- };
- for (remote_name, remote) in &remotes.sections {
- let start_time = *most_recent_timestamps.get(remote_name).unwrap_or(&0);
- let remote_name_clone = remote_name.clone();
- let res = async {
- let most_recent_timestamp = match remote.ty {
- RemoteType::Pve => {
- let client = connection::make_pve_client(remote)?;
- let metrics = client
- .cluster_metrics_export(Some(true), Some(false), Some(start_time))
- .await?;
- let most_recent =
-, |acc, x| acc.max(x.timestamp));
- sender
- .send(RrdStoreRequest::Pve {
- remote: remote_name_clone,
- metrics,
- })
- .await?;
- Ok::<i64, Error>(most_recent)
- }
- RemoteType::Pbs => {
- let client = connection::make_pbs_client(remote)?;
- let metrics = client.metrics(Some(true), Some(start_time)).await?;
- let most_recent =
-, |acc, x| acc.max(x.timestamp));
- sender
- .send(RrdStoreRequest::Pbs {
- remote: remote_name_clone,
- metrics,
- })
- .await?;
- Ok::<i64, Error>(most_recent)
- }
- }?;
- Ok::<i64, Error>(most_recent_timestamp)
- }
- .await;
+ Ok(())
- match res {
- Ok(ts) => {
- most_recent_timestamps.insert(remote_name.to_string(), ts);
- }
- Err(err) => log::error!("failed to collect metrics for {remote_name}: {err}"),
- }
- }
+/// Schedule metric collection for all remotes as soon as possible.
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection() -> Result<(), Error> {
+ if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+ sender.send(ControlMsg::CollectAllRemotes).await?;
+ Ok(())
pdm-devel mailing list
Thread overview: 35+ messages
2025-02-14 13:06 [pdm-devel] [PATCH proxmox-datacenter-manager v2 00/28] metric collection improvements (concurrency, config, API, CLI) Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 01/28] test support: add NamedTempFile helper Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 02/28] test support: add NamedTempDir helper Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 03/28] pdm-api-types: add CollectionSettings type Lukas Wagner
2025-02-18 15:26 ` Wolfgang Bumiller
2025-02-18 15:31 ` Stefan Hanreich
2025-02-21 8:27 ` Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 04/28] pdm-config: add functions for reading/writing metric collection settings Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 05/28] metric collection: split top_entities split into separate module Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 06/28] metric collection: save metric data to RRD in separate task Lukas Wagner
2025-02-14 13:06 ` Lukas Wagner [this message]
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 08/28] metric collection: persist state after metric collection Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 09/28] metric collection: skip if last_collection < MIN_COLLECTION_INTERVAL Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 10/28] metric collection: collect overdue metrics on startup/timer change Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 11/28] metric collection: add tests for the fetch_remotes function Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 12/28] metric collection: add test for fetch_overdue Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 13/28] metric collection: pass rrd cache instance as function parameter Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 14/28] metric collection: add test for rrd task Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 15/28] metric collection: wrap rrd_cache::Cache in a struct Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 16/28] metric collection: record remote response time in metric database Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 17/28] metric collection: save time needed for collection run to RRD Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 18/28] metric collection: periodically clean removed remotes from statefile Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 19/28] api: add endpoint for updating metric collection settings Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 20/28] api: add endpoint to trigger metric collection Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 21/28] api: remotes: trigger immediate metric collection for newly added nodes Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 22/28] api: add api for querying metric collection RRD data Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 23/28] api: metric-collection: add status endpoint Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 24/28] pdm-client: add metric collection API methods Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 25/28] cli: add commands for metric-collection settings, trigger, status Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 26/28] metric collection: factor out handle_tick and handle_control_message fns Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 27/28] metric collection: skip missed timer ticks Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 28/28] metric collection: use JoinSet instead of joining from handles in a Vec Lukas Wagner
2025-02-21 13:19 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 00/28] metric collection improvements (concurrency, config, API, CLI) Maximiliano Sandoval
2025-03-14 14:10 ` Lukas Wagner
2025-03-16 21:45 ` Thomas Lamprecht