From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v5 03/23] metric collection: rework metric poll task
Date: Thu, 14 Aug 2025 15:31:22 +0200
Message-ID: <20250814133142.386650-4-l.wagner@proxmox.com>
In-Reply-To: <20250814133142.386650-1-l.wagner@proxmox.com>
Metric collection is mostly limited by network latency, not disk IO or
CPU. This means we should be able to get significant speedups by polling
remotes concurrently, spawning a separate tokio task for each remote.
To avoid load/IO spikes, we limit the number of concurrent connections
using a semaphore.
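As a rough illustration of this bounded fan-out, here is a dependency-free sketch. It is not the code from this patch: OS threads stand in for tokio tasks, and a small Mutex/Condvar counting semaphore stands in for `tokio::sync::Semaphore`. The remote names, the simulated 20 ms latency and the returned "timestamps" are made up. As in `fetch_remotes`, a permit is acquired before each spawn and released when the worker drops it.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore, standing in for `tokio::sync::Semaphore`.
struct Semaphore {
    permits: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), cond: Condvar::new() }
    }

    /// Block until a permit is free, then take it (cf. `acquire_owned`).
    fn acquire_owned(sem: &Arc<Self>) -> Permit {
        let mut available = sem.permits.lock().unwrap();
        while *available == 0 {
            available = sem.cond.wait(available).unwrap();
        }
        *available -= 1;
        Permit { sem: Arc::clone(sem) }
    }
}

/// Gives its permit back on drop, like `OwnedSemaphorePermit`.
struct Permit {
    sem: Arc<Semaphore>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        *self.sem.permits.lock().unwrap() += 1;
        self.sem.cond.notify_one();
    }
}

/// Poll each remote on its own thread with at most `max_concurrent` in flight.
/// Returns one (remote, fake timestamp) pair per remote, in input order,
/// plus the highest number of workers observed running at the same time.
fn fetch_remotes(remotes: &[&str], max_concurrent: usize) -> (Vec<(String, i64)>, usize) {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let in_flight = Arc::new(Mutex::new((0usize, 0usize))); // (current, peak)
    let mut handles = Vec::new();

    for (i, name) in remotes.iter().enumerate() {
        // Blocks here while `max_concurrent` workers still hold a permit.
        let permit = Semaphore::acquire_owned(&semaphore);
        let in_flight = Arc::clone(&in_flight);
        let name = name.to_string();

        handles.push(thread::spawn(move || {
            let _permit = permit; // released when this closure returns
            {
                let mut counts = in_flight.lock().unwrap();
                counts.0 += 1;
                counts.1 = counts.1.max(counts.0);
            }
            thread::sleep(Duration::from_millis(20)); // simulated network latency
            in_flight.lock().unwrap().0 -= 1;
            (name, i as i64 * 10)
        }));
    }

    // Join in spawn order, like the `handles` loop in the patch.
    let results: Vec<(String, i64)> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    let peak = in_flight.lock().unwrap().1;
    (results, peak)
}
```

Acquiring the permit in the spawning loop, rather than inside the worker, limits how many workers exist at once, not just how many are doing network IO.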
Furthermore, we add a mechanism to trigger metric collection manually.
This is achieved by using `tokio::select!` on both the timer's `tick`
method and the `recv` method of an `mpsc` channel, which is used to send
control messages to the metric collection task.
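The `tokio::select!` loop can be approximated with the standard library alone by waiting on the control channel with a timeout that ends at the next scheduled tick. This hypothetical sketch records what it would do instead of collecting metrics, and returns once all senders are dropped so that it terminates; in the patch, the loop runs until daemon shutdown. `Event` and `run` are illustrative names, not part of the patch.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Control messages, mirroring `ControlMsg` from the patch.
#[derive(Debug, PartialEq)]
enum ControlMsg {
    CollectSingleRemote(String),
    CollectAllRemotes,
}

/// What the loop decided to do on each wake-up (recorded for inspection).
#[derive(Debug, PartialEq)]
enum Event {
    Tick,
    Single(String),
    All,
}

/// Wait for whichever comes first: the next timer tick or a control message.
/// Returns once every sender has been dropped.
fn run(rx: Receiver<ControlMsg>, interval: Duration) -> Vec<Event> {
    let mut events = Vec::new();
    let mut next_tick = Instant::now() + interval;

    loop {
        // Time left until the next tick (zero if it is already overdue).
        let timeout = next_tick.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            Ok(ControlMsg::CollectSingleRemote(remote)) => events.push(Event::Single(remote)),
            Ok(ControlMsg::CollectAllRemotes) => events.push(Event::All),
            Err(RecvTimeoutError::Timeout) => {
                events.push(Event::Tick);
                next_tick += interval; // keep a fixed schedule
            }
            Err(RecvTimeoutError::Disconnected) => return events,
        }
    }
}
```

Because `next_tick` only advances by one interval per timeout, ticks missed while a collection run is in progress are delivered back to back, which matches the default `MissedTickBehavior::Burst` of tokio's `Interval`.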
For better code structure, the collection task is split into a separate
module.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
Notes:
Changes since v1:
- get_settings_or_default function: use unwrap_or_default instead
of match
- Add `rand` crate dependency to d/control
- Refactor sleep helpers, consolidate into a single
parameterised function
- Get rid of .unwrap when spawning the metric collection task
- Let trigger_metric_collection_for_remote take a String instead of
&str
Changes since v2:
- Use hardcoded default for max-concurrency for now. This
might be replaced by some global knob, affecting all background tasks
- Drop interval delays/jitter for now; these might also be affected
by a global request scheduler
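The fallible `start_task()` and the `trigger_metric_collection*` helpers in the diff rely on a set-once global sender (`CONTROL_MESSAGE_TX`). Below is a std-only sketch of that pattern, under simplifying assumptions: `std::sync::mpsc` wrapped in a `Mutex` replaces tokio's async channel, `start_task` just returns the receiver instead of spawning a task, and `CONTROL_TX` and `trigger` are illustrative names.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Mutex, OnceLock};

#[derive(Debug, PartialEq)]
enum ControlMsg {
    CollectSingleRemote(String),
    CollectAllRemotes,
}

/// Global sender half, set exactly once at startup (cf. `CONTROL_MESSAGE_TX`).
static CONTROL_TX: OnceLock<Mutex<Sender<ControlMsg>>> = OnceLock::new();

/// Register the sender; a second call returns an error instead of panicking.
fn start_task() -> Result<Receiver<ControlMsg>, String> {
    let (tx, rx) = mpsc::channel();
    if CONTROL_TX.set(Mutex::new(tx)).is_err() {
        return Err("control message sender already set".to_string());
    }
    Ok(rx)
}

/// Queue a collection request. This is a no-op if `start_task` has not run
/// yet, and an error if the receiving side has gone away.
fn trigger(msg: ControlMsg) -> Result<(), String> {
    if let Some(tx) = CONTROL_TX.get() {
        tx.lock()
            .map_err(|_| "sender lock poisoned".to_string())?
            .send(msg)
            .map_err(|e| format!("channel closed: {e}"))?;
    }
    Ok(())
}
```

Returning an error from a second `start_task()` call, rather than unwrapping the result of `set`, is what allows the daemon's `run` function to propagate the failure with `?`.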
server/src/bin/proxmox-datacenter-api/main.rs | 2 +-
.../src/metric_collection/collection_task.rs | 216 ++++++++++++++++++
server/src/metric_collection/mod.rs | 145 +++++-------
3 files changed, 271 insertions(+), 92 deletions(-)
create mode 100644 server/src/metric_collection/collection_task.rs
diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index db6b2585..c013646e 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -373,7 +373,7 @@ async fn run(debug: bool) -> Result<(), Error> {
});
start_task_scheduler();
- metric_collection::start_task();
+ metric_collection::start_task()?;
tasks::remote_node_mapping::start_task();
resource_cache::start_task();
diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
new file mode 100644
index 00000000..a7420edb
--- /dev/null
+++ b/server/src/metric_collection/collection_task.rs
@@ -0,0 +1,216 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::Error;
+use tokio::{
+ sync::{
+ mpsc::{Receiver, Sender},
+ OwnedSemaphorePermit, Semaphore,
+ },
+ time::Interval,
+};
+
+use proxmox_section_config::typed::SectionConfigData;
+
+use pdm_api_types::remotes::{Remote, RemoteType};
+
+use crate::{connection, task_utils};
+
+use super::rrd_task::RrdStoreRequest;
+
+pub const MAX_CONCURRENT_CONNECTIONS: usize = 20;
+
+/// Default metric collection interval.
+pub const DEFAULT_COLLECTION_INTERVAL: u64 = 600;
+
+/// Control messages for the metric collection task.
+pub(super) enum ControlMsg {
+ CollectSingleRemote(String),
+ CollectAllRemotes,
+}
+
+/// Task which periodically collects metrics from all remotes and stores
+/// them in the local metrics database.
+pub(super) struct MetricCollectionTask {
+ most_recent_timestamps: HashMap<String, i64>,
+ metric_data_tx: Sender<RrdStoreRequest>,
+ control_message_rx: Receiver<ControlMsg>,
+}
+
+impl MetricCollectionTask {
+ /// Create a new metric collection task.
+ pub(super) fn new(
+ metric_data_tx: Sender<RrdStoreRequest>,
+ control_message_rx: Receiver<ControlMsg>,
+ ) -> Result<Self, Error> {
+ Ok(Self {
+ most_recent_timestamps: HashMap::new(),
+ metric_data_tx,
+ control_message_rx,
+ })
+ }
+
+ /// Run the metric collection task.
+ ///
+ /// This function never returns.
+ #[tracing::instrument(skip_all, name = "metric_collection_task")]
+ pub(super) async fn run(&mut self) {
+ let mut timer = Self::setup_timer(DEFAULT_COLLECTION_INTERVAL);
+
+ log::debug!(
+ "metric collection starting up. Collection interval set to {} seconds.",
+ DEFAULT_COLLECTION_INTERVAL,
+ );
+
+ loop {
+ tokio::select! {
+ _ = timer.tick() => {
+ log::debug!("starting metric collection from all remotes - triggered by timer");
+
+ if let Some(remotes) = Self::load_remote_config() {
+ let to_fetch = remotes.iter().map(|(name, _)| name.into()).collect::<Vec<String>>();
+ self.fetch_remotes(&remotes, &to_fetch).await;
+ }
+ }
+
+ val = self.control_message_rx.recv() => {
+ // Reload the remote config in case it has changed in the meantime
+ match val {
+ Some(ControlMsg::CollectSingleRemote(remote)) => {
+ if let Some(remotes) = Self::load_remote_config() {
+ log::debug!("starting metric collection for remote '{remote}' - triggered by control message");
+ self.fetch_remotes(&remotes, &[remote]).await;
+ }
+ }
+ Some(ControlMsg::CollectAllRemotes) => {
+ if let Some(remotes) = Self::load_remote_config() {
+ log::debug!("starting metric collection from all remotes - triggered by control message");
+ let to_fetch = remotes.iter().map(|(name, _)| name.into()).collect::<Vec<String>>();
+ self.fetch_remotes(&remotes, &to_fetch).await;
+ }
+ }
+ _ => {},
+ }
+ }
+ }
+ }
+ }
+
+ /// Set up a [`tokio::time::Interval`] instance with the provided interval.
+ /// The timer will be aligned, e.g. an interval of `60` will let the timer
+ /// fire at minute boundaries.
+ fn setup_timer(interval: u64) -> Interval {
+ let mut timer = tokio::time::interval(Duration::from_secs(interval));
+ let first_run = task_utils::next_aligned_instant(interval).into();
+ timer.reset_at(first_run);
+
+ timer
+ }
+
+ /// Convenience helper to load `remote.cfg`, logging the error
+ /// and returning `None` if the config could not be read.
+ fn load_remote_config() -> Option<SectionConfigData<Remote>> {
+ match pdm_config::remotes::config() {
+ Ok((remotes, _)) => Some(remotes),
+ Err(e) => {
+ log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
+ None
+ }
+ }
+ }
+
+ /// Fetch metric data from a provided list of remotes concurrently.
+ /// The maximum number of concurrent connections is limited by
+ /// [`MAX_CONCURRENT_CONNECTIONS`]; each spawned task holds one
+ /// semaphore permit until it has finished.
+ async fn fetch_remotes(
+ &mut self,
+ remote_config: &SectionConfigData<Remote>,
+ remotes_to_fetch: &[String],
+ ) {
+ let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS));
+ let mut handles = Vec::new();
+
+ for remote_name in remotes_to_fetch {
+ let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
+
+ // unwrap is okay here, acquire_* will only fail if `close` has been
+ // called on the semaphore.
+ let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
+
+ if let Some(remote) = remote_config.get(remote_name).cloned() {
+ log::debug!("fetching remote '{}'", remote.id);
+ let handle = tokio::spawn(Self::fetch_single_remote(
+ remote,
+ start_time,
+ self.metric_data_tx.clone(),
+ permit,
+ ));
+
+ handles.push((remote_name.clone(), handle));
+ }
+ }
+
+ for (remote_name, handle) in handles {
+ let res = handle.await;
+
+ match res {
+ Ok(Ok(ts)) => {
+ self.most_recent_timestamps
+ .insert(remote_name.to_string(), ts);
+ }
+ Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
+ Err(err) => {
+ log::error!(
+ "join error for metric collection task for remote {remote_name}: {err}"
+ )
+ }
+ }
+ }
+ }
+
+ /// Fetch a single remote.
+ #[tracing::instrument(skip_all, fields(remote = remote.id), name = "metric_collection_task")]
+ async fn fetch_single_remote(
+ remote: Remote,
+ start_time: i64,
+ sender: Sender<RrdStoreRequest>,
+ _permit: OwnedSemaphorePermit,
+ ) -> Result<i64, Error> {
+ let most_recent_timestamp = match remote.ty {
+ RemoteType::Pve => {
+ let client = connection::make_pve_client(&remote)?;
+ let metrics = client
+ .cluster_metrics_export(Some(true), Some(false), Some(start_time))
+ .await?;
+
+ let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+
+ sender
+ .send(RrdStoreRequest::Pve {
+ remote: remote.id.clone(),
+ metrics,
+ })
+ .await?;
+
+ most_recent
+ }
+ RemoteType::Pbs => {
+ let client = connection::make_pbs_client(&remote)?;
+ let metrics = client.metrics(Some(true), Some(start_time)).await?;
+
+ let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+
+ sender
+ .send(RrdStoreRequest::Pbs {
+ remote: remote.id.clone(),
+ metrics,
+ })
+ .await?;
+
+ most_recent
+ }
+ };
+
+ Ok(most_recent_timestamp)
+ }
+}
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 9c75f39c..9b203615 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -1,20 +1,17 @@
-use std::collections::HashMap;
use std::pin::pin;
+use std::sync::OnceLock;
-use anyhow::Error;
+use anyhow::{bail, Error};
use tokio::sync::mpsc::{self, Sender};
-use pdm_api_types::remotes::RemoteType;
-
-use crate::{connection, task_utils};
-
+mod collection_task;
pub mod rrd_cache;
mod rrd_task;
pub mod top_entities;
-use rrd_task::RrdStoreRequest;
+use collection_task::{ControlMsg, MetricCollectionTask};
-const COLLECTION_INTERVAL: u64 = 60;
+static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
/// Initialize the RRD cache
pub fn init() -> Result<(), Error> {
@@ -24,89 +21,55 @@ pub fn init() -> Result<(), Error> {
}
/// Start the metric collection task.
-pub fn start_task() {
- let (tx, rx) = mpsc::channel(128);
+pub fn start_task() -> Result<(), Error> {
+ let (metric_data_tx, metric_data_rx) = mpsc::channel(128);
- tokio::spawn(async move {
- let task_scheduler = pin!(metric_collection_task(tx));
- let abort_future = pin!(proxmox_daemon::shutdown_future());
- futures::future::select(task_scheduler, abort_future).await;
- });
-
- tokio::spawn(async move {
- let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
- let abort_future = pin!(proxmox_daemon::shutdown_future());
- futures::future::select(task_scheduler, abort_future).await;
- });
-}
-
-async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
- let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
-
- loop {
- let delay_target = task_utils::next_aligned_instant(COLLECTION_INTERVAL);
- tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
- let remotes = match pdm_config::remotes::config() {
- Ok((remotes, _)) => remotes,
- Err(e) => {
- log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
- continue;
- }
- };
-
- for (remote_name, remote) in remotes {
- let start_time = *most_recent_timestamps.get(&remote_name).unwrap_or(&0);
- let remote_name_clone = remote_name.clone();
-
- let res = async {
- let most_recent_timestamp = match remote.ty {
- RemoteType::Pve => {
- let client = connection::make_pve_client(&remote)?;
- let metrics = client
- .cluster_metrics_export(Some(true), Some(false), Some(start_time))
- .await?;
-
- let most_recent =
- metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
- sender
- .send(RrdStoreRequest::Pve {
- remote: remote_name_clone,
- metrics,
- })
- .await?;
-
- Ok::<i64, Error>(most_recent)
- }
- RemoteType::Pbs => {
- let client = connection::make_pbs_client(&remote)?;
- let metrics = client.metrics(Some(true), Some(start_time)).await?;
-
- let most_recent =
- metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
- sender
- .send(RrdStoreRequest::Pbs {
- remote: remote_name_clone,
- metrics,
- })
- .await?;
-
- Ok::<i64, Error>(most_recent)
- }
- }?;
-
- Ok::<i64, Error>(most_recent_timestamp)
- }
- .await;
-
- match res {
- Ok(ts) => {
- most_recent_timestamps.insert(remote_name.to_string(), ts);
- }
- Err(err) => log::error!("failed to collect metrics for {remote_name}: {err}"),
- }
- }
+ let (trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128);
+ if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() {
+ bail!("control message sender already set");
}
+
+ tokio::spawn(async move {
+ let metric_collection_task_future = pin!(async move {
+ match MetricCollectionTask::new(metric_data_tx, trigger_collection_rx) {
+ Ok(mut task) => task.run().await,
+ Err(err) => log::error!("could not start metric collection task: {err}"),
+ }
+ });
+
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(metric_collection_task_future, abort_future).await;
+ });
+
+ tokio::spawn(async move {
+ let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(rrd_task_future, abort_future).await;
+ });
+
+ Ok(())
+}
+
+/// Schedule metric collection for a given remote as soon as possible.
+///
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection_for_remote(remote: String) -> Result<(), Error> {
+ if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+ sender.send(ControlMsg::CollectSingleRemote(remote)).await?;
+ }
+
+ Ok(())
+}
+
+/// Schedule metric collection for all remotes as soon as possible.
+///
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection() -> Result<(), Error> {
+ if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+ sender.send(ControlMsg::CollectAllRemotes).await?;
+ }
+
+ Ok(())
}
--
2.47.2
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
Thread overview: 26+ messages
2025-08-14 13:31 [pdm-devel] [PATCH proxmox-datacenter-manager v5 00/23] metric collection improvements (concurrency, API, CLI) Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 01/23] metric collection: split top_entities split into separate module Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 02/23] metric collection: save metric data to RRD in separate task Lukas Wagner
2025-08-14 13:31 ` Lukas Wagner [this message]
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 04/23] metric collection: persist state after metric collection Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 05/23] metric collection: skip if last_collection < MIN_COLLECTION_INTERVAL Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 06/23] metric collection: collect overdue metrics on startup/timer change Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 07/23] metric collection: add tests for the fetch_remotes function Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 08/23] metric collection: add test for fetch_overdue Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 09/23] metric collection: pass rrd cache instance as function parameter Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 10/23] metric collection: add test for rrd task Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 11/23] metric collection: wrap rrd_cache::Cache in a struct Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 12/23] metric collection: record remote response time in metric database Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 13/23] metric collection: save time needed for collection run to RRD Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 14/23] metric collection: periodically clean removed remotes from statefile Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 15/23] api: add endpoint to trigger metric collection Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 16/23] api: remotes: trigger immediate metric collection for newly added nodes Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 17/23] api: add api for querying metric collection RRD data Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 18/23] api: metric-collection: add status endpoint Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 19/23] pdm-client: add metric collection API methods Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 20/23] cli: add commands for metric-collection trigger and status Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 21/23] metric collection: factor out handle_tick and handle_control_message fns Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 22/23] metric collection: skip missed timer ticks Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 23/23] metric collection: use JoinSet instead of joining from handles in a Vec Lukas Wagner
2025-08-21 9:39 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 00/23] metric collection improvements (concurrency, API, CLI) Dominik Csapak
2025-08-21 9:55 ` [pdm-devel] superseded: " Lukas Wagner