public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v3 05/26] metric collection: rework metric poll task
Date: Wed, 16 Apr 2025 14:56:21 +0200
Message-ID: <20250416125642.291552-6-l.wagner@proxmox.com>
In-Reply-To: <20250416125642.291552-1-l.wagner@proxmox.com>

Metric collection is mostly limited by network latency, not by disk IO
or CPU. This means we should be able to get significant speedups by
polling remotes concurrently, spawning a separate tokio task for each
remote.

To avoid load/IO spikes, we limit the number of concurrent connections
using a semaphore (currently hardcoded to 20 permits).
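A rough, std-only sketch of this pattern follows. The patch uses tokio
tasks and `tokio::sync::Semaphore`; here plain threads and a small
Mutex/Condvar-based semaphore stand in for them so the example runs
without tokio. `CountingSemaphore`, `poll_remotes` and the simulated
20ms round trip are illustrative assumptions, not code from this series.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal counting semaphore (hypothetical stand-in for
/// `tokio::sync::Semaphore`, so the sketch runs without tokio).
struct CountingSemaphore {
    available: Mutex<usize>,
    cond: Condvar,
}

impl CountingSemaphore {
    fn new(permits: usize) -> Self {
        Self {
            available: Mutex::new(permits),
            cond: Condvar::new(),
        }
    }

    /// Block until a permit is free, then take it.
    fn acquire(&self) {
        let mut available = self.available.lock().unwrap();
        while *available == 0 {
            available = self.cond.wait(available).unwrap();
        }
        *available -= 1;
    }

    /// Hand a permit back and wake up one waiter.
    fn release(&self) {
        *self.available.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

/// Poll `remotes` concurrently with at most `max_concurrent` polls in flight.
/// Returns `(remote, placeholder result)` pairs and the peak concurrency seen.
fn poll_remotes(remotes: &[&str], max_concurrent: usize) -> (Vec<(String, usize)>, usize) {
    let semaphore = Arc::new(CountingSemaphore::new(max_concurrent));
    // (currently running, highest value observed so far)
    let in_flight = Arc::new(Mutex::new((0usize, 0usize)));
    let mut handles = Vec::new();

    for &name in remotes {
        // Take the permit *before* spawning, like `acquire_owned().await` in
        // `fetch_remotes`: once the limit is reached, this loop waits.
        semaphore.acquire();
        let semaphore = Arc::clone(&semaphore);
        let in_flight = Arc::clone(&in_flight);
        let name = name.to_string();

        handles.push(thread::spawn(move || {
            {
                let mut counts = in_flight.lock().unwrap();
                counts.0 += 1;
                counts.1 = counts.1.max(counts.0);
            }
            // Simulated network round trip; a real worker would fetch metrics here.
            thread::sleep(Duration::from_millis(20));
            let placeholder_result = name.len();

            {
                let mut counts = in_flight.lock().unwrap();
                counts.0 -= 1;
            }
            semaphore.release();
            (name, placeholder_result)
        }));
    }

    // Join in submission order and collect the per-remote results.
    let results: Vec<(String, usize)> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    let peak = in_flight.lock().unwrap().1;
    (results, peak)
}
```

As in `fetch_remotes`, the permit is acquired before the task is
spawned, so the submitting loop itself provides the back-pressure. In
the tokio version the permit is released by dropping the
`OwnedSemaphorePermit` when the task finishes, rather than by an
explicit call.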

Each concurrent task which fetches a single remote waits a random amount
of time before actually connecting to the remote. The upper and lower
bounds for this random delay can be configured in the metric collection
settings. The main aim of this mechanism is to reduce load spikes.
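The random per-remote delay can be sketched as a function that picks a
value between the configured bounds. This is an assumption-laden
illustration: it uses std's randomly seeded `RandomState` instead of the
`rand` crate, and `random_delay` with its millisecond parameters is a
hypothetical helper.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::Duration;

/// A cheap, std-only source of randomness: every `RandomState` is
/// seeded with fresh keys. (Stand-in for the `rand` crate.)
fn random_u64() -> u64 {
    RandomState::new().build_hasher().finish()
}

/// Pick a delay between `min_ms` and `max_ms` milliseconds (both inclusive).
/// The bounds stand in for the lower/upper limits from the collection
/// settings; if they are equal or inverted, `min_ms` is used as-is.
fn random_delay(min_ms: u64, max_ms: u64) -> Duration {
    if max_ms <= min_ms {
        return Duration::from_millis(min_ms);
    }
    // Number of possible values; modulo bias is negligible for small spans.
    let span = (max_ms - min_ms).saturating_add(1);
    Duration::from_millis(min_ms + random_u64() % span)
}
```

Inside a tokio task, the result would be awaited with
`tokio::time::sleep(random_delay(min_ms, max_ms)).await` before
connecting to the remote.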

Furthermore, each time the main collection interval timer fires, a
random delay is introduced before actually starting the collection
process. This is useful because of how we set up the timer: we
configure it to fire at aligned points in time. For instance, if the
collection interval is set to 60s, the timer fires at minute
boundaries. Using this random offset, we avoid triggering at the same
time as other timers (cron, systemd).
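To make the alignment concrete: with a 60s interval and a current time
of 125s past the epoch, the next aligned point is 180s, i.e. 55s away.
The sketch below computes that wait and adds a random offset on top. It
is a hypothetical helper working on durations since the Unix epoch, not
the actual `task_utils::next_aligned_instant` implementation.

```rust
use std::time::Duration;

/// Time left until the next multiple of `interval`, counted from the Unix
/// epoch and strictly after `now` (both given as durations since the epoch).
/// Hypothetical stand-in for `task_utils::next_aligned_instant`; the real
/// helper's behaviour exactly on a boundary may differ.
fn until_next_aligned(now: Duration, interval: Duration) -> Duration {
    let interval_ms = interval.as_millis();
    assert!(interval_ms > 0, "interval must be at least 1ms");
    let now_ms = now.as_millis();
    let next_ms = (now_ms / interval_ms + 1) * interval_ms;
    Duration::from_millis((next_ms - now_ms) as u64)
}

/// Delay before starting a collection run: wait for the aligned boundary,
/// then add a random `offset` so the run does not start at exactly the same
/// moment as cron/systemd timers aligned to the same boundary.
fn collection_start_delay(now: Duration, interval: Duration, offset: Duration) -> Duration {
    until_next_aligned(now, interval) + offset
}
```

`now` would typically come from
`SystemTime::now().duration_since(UNIX_EPOCH)`, and the offset would be
drawn randomly from a configured range.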

Furthermore, we add a mechanism to trigger metric collection manually.
This is achieved by using `tokio::select!` on both the timer's `tick`
method and the `recv` method of an `mpsc` channel, which is used to send
control messages to the metric collection task.
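The select between the timer and the control channel boils down to
"wait for whichever comes first". A std-only model of that loop uses
`Receiver::recv_timeout` with the time left until the next tick. The
`Trigger` enum, `run_collection_loop` and its `max_ticks` limit are
hypothetical and exist only to make the sketch observable and
terminating; the settings reload done by the real task is omitted.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Control messages, mirroring the patch's `ControlMsg`.
#[derive(Debug, PartialEq)]
enum ControlMsg {
    CollectSingleRemote(String),
    CollectAllRemotes,
}

/// What triggered a collection run (recorded so the sketch is testable).
#[derive(Debug, PartialEq)]
enum Trigger {
    Timer,
    Control(ControlMsg),
}

/// Wait for whichever comes first: the next timer tick or a control message.
/// Stops when the channel is closed or after `max_ticks` timer ticks.
fn run_collection_loop(rx: Receiver<ControlMsg>, interval: Duration, max_ticks: usize) -> Vec<Trigger> {
    let mut triggers = Vec::new();
    let mut ticks = 0;
    let mut next_tick = Instant::now() + interval;

    while ticks < max_ticks {
        // Zero if the deadline already passed, so a late tick fires at once.
        let timeout = next_tick.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            // A control message arrived before the tick: collect immediately,
            // without moving the timer.
            Ok(msg) => triggers.push(Trigger::Control(msg)),
            // The deadline passed: this is the regular, timer-driven run.
            Err(RecvTimeoutError::Timeout) => {
                triggers.push(Trigger::Timer);
                ticks += 1;
                next_tick += interval;
            }
            // All senders are gone and the queue is empty.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    triggers
}
```

A control message triggers an immediate run without moving the next
regular tick, matching the behaviour of the `select!` loop, which only
recreates the timer when the collection interval changes.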

For better code structure, the collection task is split into a separate
module.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---

Notes:
    Changes since v1:
      - get_settings_or_default function: use unwrap_or_default instead
        of match
      - Add `rand` crate dependency to d/control
      - Refactor sleep helpers, consolidate into a single
        parameterised function
      - Get rid of .unwrap when spawning the metric collection task
      - Let trigger_metric_collection_for_remote take a String instead of
        &str
    
    Changes since v2:
      - Use hardcoded default for max-concurrency for now. This
        might be replaced by some global knob, affecting all background tasks
      - Drop interval delays/jitter for now; this might also be affected
        by a global request scheduler

 server/src/bin/proxmox-datacenter-api/main.rs |   2 +-
 .../src/metric_collection/collection_task.rs  | 264 ++++++++++++++++++
 server/src/metric_collection/mod.rs           | 145 ++++------
 3 files changed, 319 insertions(+), 92 deletions(-)
 create mode 100644 server/src/metric_collection/collection_task.rs

diff --git a/server/src/bin/proxmox-datacenter-api/main.rs b/server/src/bin/proxmox-datacenter-api/main.rs
index 49499980..f330a329 100644
--- a/server/src/bin/proxmox-datacenter-api/main.rs
+++ b/server/src/bin/proxmox-datacenter-api/main.rs
@@ -289,7 +289,7 @@ async fn run(debug: bool) -> Result<(), Error> {
     });
 
     start_task_scheduler();
-    metric_collection::start_task();
+    metric_collection::start_task()?;
     tasks::remote_node_mapping::start_task();
     resource_cache::start_task();
 
diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
new file mode 100644
index 00000000..093f2692
--- /dev/null
+++ b/server/src/metric_collection/collection_task.rs
@@ -0,0 +1,264 @@
+use std::{collections::HashMap, sync::Arc, time::Duration};
+
+use anyhow::Error;
+use tokio::{
+    sync::{
+        mpsc::{Receiver, Sender},
+        OwnedSemaphorePermit, Semaphore,
+    },
+    time::Interval,
+};
+
+use proxmox_section_config::typed::SectionConfigData;
+
+use pdm_api_types::{
+    remotes::{Remote, RemoteType},
+    CollectionSettings,
+};
+use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
+
+use crate::{connection, task_utils};
+
+use super::rrd_task::RrdStoreRequest;
+
+pub const MAX_CONCURRENT_CONNECTIONS: usize = 20;
+
+/// Control messages for the metric collection task.
+pub(super) enum ControlMsg {
+    CollectSingleRemote(String),
+    CollectAllRemotes,
+}
+
+/// Task which periodically collects metrics from all remotes and stores
+/// them in the local metrics database.
+pub(super) struct MetricCollectionTask {
+    most_recent_timestamps: HashMap<String, i64>,
+    settings: CollectionSettings,
+    metric_data_tx: Sender<RrdStoreRequest>,
+    control_message_rx: Receiver<ControlMsg>,
+}
+
+impl MetricCollectionTask {
+    /// Create a new metric collection task.
+    pub(super) fn new(
+        metric_data_tx: Sender<RrdStoreRequest>,
+        control_message_rx: Receiver<ControlMsg>,
+    ) -> Result<Self, Error> {
+        let settings = Self::get_settings_or_default();
+
+        Ok(Self {
+            most_recent_timestamps: HashMap::new(),
+            settings,
+            metric_data_tx,
+            control_message_rx,
+        })
+    }
+
+    /// Run the metric collection task.
+    ///
+    /// This function never returns.
+    #[tracing::instrument(skip_all, name = "metric_collection_task")]
+    pub(super) async fn run(&mut self) {
+        let mut timer = Self::setup_timer(self.settings.collection_interval_or_default());
+
+        log::debug!(
+            "metric collection starting up. Collection interval set to {} seconds.",
+            self.settings.collection_interval_or_default()
+        );
+
+        loop {
+            let old_settings = self.settings.clone();
+            tokio::select! {
+                _ = timer.tick() => {
+                    // Reload settings in case they have changed in the meanwhile
+                    self.settings = Self::get_settings_or_default();
+
+                    log::debug!("starting metric collection from all remotes - triggered by timer");
+
+                    if let Some(remotes) = Self::load_remote_config() {
+                        let to_fetch = remotes.order.as_slice();
+                        self.fetch_remotes(&remotes, to_fetch).await;
+                    }
+                }
+
+                val = self.control_message_rx.recv() => {
+                    // Reload settings in case they have changed in the meanwhile
+                    self.settings = Self::get_settings_or_default();
+                    match val {
+                        Some(ControlMsg::CollectSingleRemote(remote)) => {
+                            if let Some(remotes) = Self::load_remote_config() {
+                                log::debug!("starting metric collection for remote '{remote}' - triggered by control message");
+                                self.fetch_remotes(&remotes, &[remote]).await;
+                            }
+                        }
+                        Some(ControlMsg::CollectAllRemotes) => {
+                            if let Some(remotes) = Self::load_remote_config() {
+                                log::debug!("starting metric collection from all remotes - triggered by control message");
+                                self.fetch_remotes(&remotes, &remotes.order).await;
+                            }
+                        }
+                        _ => {},
+                    }
+                }
+            }
+
+            let interval = self.settings.collection_interval_or_default();
+
+            if old_settings.collection_interval_or_default() != interval {
+                log::info!(
+                    "metric collection interval changed to {} seconds, reloading timer",
+                    interval
+                );
+                timer = Self::setup_timer(interval);
+            }
+        }
+    }
+
+    fn get_settings_or_default() -> CollectionSettings {
+        // We want to fall back to defaults if
+        //   - the config file does not exist (no error should be logged)
+        //   - the section type is wrong or if the config failed to parse (log an error in this
+        //   case)
+
+        fn get_settings_impl() -> Result<CollectionSettings, Error> {
+            let (config, _) = pdm_config::metric_collection::config()?;
+
+            let all_sections: Vec<CollectionSettings> =
+                config.convert_to_typed_array(COLLECTION_SETTINGS_TYPE)?;
+
+            for section in all_sections {
+                if section.id == "default" {
+                    return Ok(section);
+                }
+            }
+
+            Ok(CollectionSettings::new("default"))
+        }
+
+        get_settings_impl().unwrap_or_else(|err| {
+            log::error!(
+                "could not read metric collection settings: {err} - falling back to default config"
+            );
+            CollectionSettings::new("default")
+        })
+    }
+
+    /// Set up a [`tokio::time::Interval`] instance with the provided interval.
+    /// The timer will be aligned, e.g. an interval of `60` will let the timer
+    /// fire at minute boundaries.
+    fn setup_timer(interval: u64) -> Interval {
+        let mut timer = tokio::time::interval(Duration::from_secs(interval));
+        let first_run = task_utils::next_aligned_instant(interval).into();
+        timer.reset_at(first_run);
+
+        timer
+    }
+
+    /// Convenience helper to load `remote.cfg`, logging the error
+    /// and returning `None` if the config could not be read.
+    fn load_remote_config() -> Option<SectionConfigData<Remote>> {
+        match pdm_config::remotes::config() {
+            Ok((remotes, _)) => Some(remotes),
+            Err(e) => {
+                log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
+                None
+            }
+        }
+    }
+
+    /// Fetch metric data from a provided list of remotes concurrently.
+    /// The maximum number of concurrent connections is limited by
+    /// [`MAX_CONCURRENT_CONNECTIONS`]; once that many fetches are in
+    /// flight, spawning further tasks waits until a permit is released.
+    async fn fetch_remotes(
+        &mut self,
+        remote_config: &SectionConfigData<Remote>,
+        remotes_to_fetch: &[String],
+    ) {
+        let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS));
+        let mut handles = Vec::new();
+
+        for remote_name in remotes_to_fetch {
+            let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
+
+            // unwrap is okay here, acquire_* will only fail if `close` has been
+            // called on the semaphore.
+            let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
+
+            if let Some(remote) = remote_config.get(remote_name).cloned() {
+                log::debug!("fetching remote '{}'", remote.id);
+                let handle = tokio::spawn(Self::fetch_single_remote(
+                    remote,
+                    start_time,
+                    self.metric_data_tx.clone(),
+                    permit,
+                ));
+
+                handles.push((remote_name.clone(), handle));
+            }
+        }
+
+        for (remote_name, handle) in handles {
+            let res = handle.await;
+
+            match res {
+                Ok(Ok(ts)) => {
+                    self.most_recent_timestamps
+                        .insert(remote_name.to_string(), ts);
+                }
+                Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
+                Err(err) => {
+                    log::error!(
+                        "join error for metric collection task for remote {remote_name}: {err}"
+                    )
+                }
+            }
+        }
+    }
+
+    /// Fetch a single remote.
+    #[tracing::instrument(skip_all, fields(remote = remote.id), name = "metric_collection_task")]
+    async fn fetch_single_remote(
+        remote: Remote,
+        start_time: i64,
+        sender: Sender<RrdStoreRequest>,
+        _permit: OwnedSemaphorePermit,
+    ) -> Result<i64, Error> {
+        let most_recent_timestamp = match remote.ty {
+            RemoteType::Pve => {
+                let client = connection::make_pve_client(&remote)?;
+                let metrics = client
+                    .cluster_metrics_export(Some(true), Some(false), Some(start_time))
+                    .await?;
+
+                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+
+                sender
+                    .send(RrdStoreRequest::Pve {
+                        remote: remote.id.clone(),
+                        metrics,
+                    })
+                    .await?;
+
+                most_recent
+            }
+            RemoteType::Pbs => {
+                let client = connection::make_pbs_client(&remote)?;
+                let metrics = client.metrics(Some(true), Some(start_time)).await?;
+
+                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+
+                sender
+                    .send(RrdStoreRequest::Pbs {
+                        remote: remote.id.clone(),
+                        metrics,
+                    })
+                    .await?;
+
+                most_recent
+            }
+        };
+
+        Ok(most_recent_timestamp)
+    }
+}
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 06ade5f0..9b203615 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -1,20 +1,17 @@
-use std::collections::HashMap;
 use std::pin::pin;
+use std::sync::OnceLock;
 
-use anyhow::Error;
+use anyhow::{bail, Error};
 use tokio::sync::mpsc::{self, Sender};
 
-use pdm_api_types::remotes::RemoteType;
-
-use crate::{connection, task_utils};
-
+mod collection_task;
 pub mod rrd_cache;
 mod rrd_task;
 pub mod top_entities;
 
-use rrd_task::RrdStoreRequest;
+use collection_task::{ControlMsg, MetricCollectionTask};
 
-const COLLECTION_INTERVAL: u64 = 60;
+static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
 
 /// Initialize the RRD cache
 pub fn init() -> Result<(), Error> {
@@ -24,89 +21,55 @@ pub fn init() -> Result<(), Error> {
 }
 
 /// Start the metric collection task.
-pub fn start_task() {
-    let (tx, rx) = mpsc::channel(128);
+pub fn start_task() -> Result<(), Error> {
+    let (metric_data_tx, metric_data_rx) = mpsc::channel(128);
 
-    tokio::spawn(async move {
-        let task_scheduler = pin!(metric_collection_task(tx));
-        let abort_future = pin!(proxmox_daemon::shutdown_future());
-        futures::future::select(task_scheduler, abort_future).await;
-    });
-
-    tokio::spawn(async move {
-        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
-        let abort_future = pin!(proxmox_daemon::shutdown_future());
-        futures::future::select(task_scheduler, abort_future).await;
-    });
-}
-
-async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
-    let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
-
-    loop {
-        let delay_target = task_utils::next_aligned_instant(COLLECTION_INTERVAL);
-        tokio::time::sleep_until(tokio::time::Instant::from_std(delay_target)).await;
-
-        let remotes = match pdm_config::remotes::config() {
-            Ok((remotes, _)) => remotes,
-            Err(e) => {
-                log::error!("failed to collect metrics, could not read remotes.cfg: {e}");
-                continue;
-            }
-        };
-
-        for (remote_name, remote) in &remotes.sections {
-            let start_time = *most_recent_timestamps.get(remote_name).unwrap_or(&0);
-            let remote_name_clone = remote_name.clone();
-
-            let res = async {
-                let most_recent_timestamp = match remote.ty {
-                    RemoteType::Pve => {
-                        let client = connection::make_pve_client(remote)?;
-                        let metrics = client
-                            .cluster_metrics_export(Some(true), Some(false), Some(start_time))
-                            .await?;
-
-                        let most_recent =
-                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
-                        sender
-                            .send(RrdStoreRequest::Pve {
-                                remote: remote_name_clone,
-                                metrics,
-                            })
-                            .await?;
-
-                        Ok::<i64, Error>(most_recent)
-                    }
-                    RemoteType::Pbs => {
-                        let client = connection::make_pbs_client(remote)?;
-                        let metrics = client.metrics(Some(true), Some(start_time)).await?;
-
-                        let most_recent =
-                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
-                        sender
-                            .send(RrdStoreRequest::Pbs {
-                                remote: remote_name_clone,
-                                metrics,
-                            })
-                            .await?;
-
-                        Ok::<i64, Error>(most_recent)
-                    }
-                }?;
-
-                Ok::<i64, Error>(most_recent_timestamp)
-            }
-            .await;
-
-            match res {
-                Ok(ts) => {
-                    most_recent_timestamps.insert(remote_name.to_string(), ts);
-                }
-                Err(err) => log::error!("failed to collect metrics for {remote_name}: {err}"),
-            }
-        }
+    let (trigger_collection_tx, trigger_collection_rx) = mpsc::channel(128);
+    if CONTROL_MESSAGE_TX.set(trigger_collection_tx).is_err() {
+        bail!("control message sender already set");
     }
+
+    tokio::spawn(async move {
+        let metric_collection_task_future = pin!(async move {
+            match MetricCollectionTask::new(metric_data_tx, trigger_collection_rx) {
+                Ok(mut task) => task.run().await,
+                Err(err) => log::error!("could not start metric collection task: {err}"),
+            }
+        });
+
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(metric_collection_task_future, abort_future).await;
+    });
+
+    tokio::spawn(async move {
+        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(rrd_task_future, abort_future).await;
+    });
+
+    Ok(())
+}
+
+/// Schedule metric collection for a given remote as soon as possible.
+///
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection_for_remote(remote: String) -> Result<(), Error> {
+    if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+        sender.send(ControlMsg::CollectSingleRemote(remote)).await?;
+    }
+
+    Ok(())
+}
+
+/// Schedule metric collection for all remotes as soon as possible.
+///
+/// Has no effect if the tx end of the channel has not been initialized yet.
+/// Returns an error if the mpsc channel has been closed already.
+pub async fn trigger_metric_collection() -> Result<(), Error> {
+    if let Some(sender) = CONTROL_MESSAGE_TX.get() {
+        sender.send(ControlMsg::CollectAllRemotes).await?;
+    }
+
+    Ok(())
 }
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel


Thread overview: 28+ messages
2025-04-16 12:56 [pdm-devel] [PATCH proxmox-datacenter-manager v3 00/26] metric collection improvements (concurrency, config, API, CLI) Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 01/26] pdm-api-types: add CollectionSettings type Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 02/26] pdm-config: add functions for reading/writing metric collection settings Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 03/26] metric collection: split top_entities split into separate module Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 04/26] metric collection: save metric data to RRD in separate task Lukas Wagner
2025-04-16 12:56 ` Lukas Wagner [this message]
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 06/26] metric collection: persist state after metric collection Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 07/26] metric collection: skip if last_collection < MIN_COLLECTION_INTERVAL Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 08/26] metric collection: collect overdue metrics on startup/timer change Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 09/26] metric collection: add tests for the fetch_remotes function Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 10/26] metric collection: add test for fetch_overdue Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 11/26] metric collection: pass rrd cache instance as function parameter Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 12/26] metric collection: add test for rrd task Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 13/26] metric collection: wrap rrd_cache::Cache in a struct Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 14/26] metric collection: record remote response time in metric database Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 15/26] metric collection: save time needed for collection run to RRD Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 16/26] metric collection: periodically clean removed remotes from statefile Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 17/26] api: add endpoint for updating metric collection settings Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 18/26] api: add endpoint to trigger metric collection Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 19/26] api: remotes: trigger immediate metric collection for newly added nodes Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 20/26] api: add api for querying metric collection RRD data Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 21/26] api: metric-collection: add status endpoint Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 22/26] pdm-client: add metric collection API methods Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 23/26] cli: add commands for metric-collection settings, trigger, status Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 24/26] metric collection: factor out handle_tick and handle_control_message fns Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 25/26] metric collection: skip missed timer ticks Lukas Wagner
2025-04-16 12:56 ` [pdm-devel] [PATCH proxmox-datacenter-manager v3 26/26] metric collection: use JoinSet instead of joining from handles in a Vec Lukas Wagner
2025-05-12 13:38 ` [pdm-devel] superseded: [PATCH proxmox-datacenter-manager v3 00/26] metric collection improvements (concurrency, config, API, CLI) Lukas Wagner

Service provided by Proxmox Server Solutions GmbH