From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v5 02/23] metric collection: save metric data to RRD in separate task
Date: Thu, 14 Aug 2025 15:31:21 +0200 [thread overview]
Message-ID: <20250814133142.386650-3-l.wagner@proxmox.com> (raw)
In-Reply-To: <20250814133142.386650-1-l.wagner@proxmox.com>
This is a preparation for concurrent metric collection. While the RRD
cache appears to be safe for concurrent access (it uses an RwLock to
protect the data), delegating all RRD writes to a separate tokio task
makes it IMO easier to reason about and understand. Furthermore, it
decouples the 'metric fetching' and 'metric
storing' parts, making it much easier to write tests for both parts
independently.
For better code separation, this also splits out the new task into
a new submodule.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
server/src/metric_collection/mod.rs | 102 +++++++----------------
server/src/metric_collection/rrd_task.rs | 96 +++++++++++++++++++++
2 files changed, 128 insertions(+), 70 deletions(-)
create mode 100644 server/src/metric_collection/rrd_task.rs
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 588f7ad5..9c75f39c 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -2,18 +2,18 @@ use std::collections::HashMap;
use std::pin::pin;
use anyhow::Error;
-
-use pbs_api_types::{MetricDataPoint, MetricDataType};
-use proxmox_rrd::rrd::DataSourceType;
+use tokio::sync::mpsc::{self, Sender};
use pdm_api_types::remotes::RemoteType;
-use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
use crate::{connection, task_utils};
pub mod rrd_cache;
+mod rrd_task;
pub mod top_entities;
+use rrd_task::RrdStoreRequest;
+
const COLLECTION_INTERVAL: u64 = 60;
/// Initialize the RRD cache
@@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
/// Start the metric collection task.
pub fn start_task() {
+ let (tx, rx) = mpsc::channel(128);
+
tokio::spawn(async move {
- let task_scheduler = pin!(metric_collection_task());
+ let task_scheduler = pin!(metric_collection_task(tx));
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(task_scheduler, abort_future).await;
+ });
+
+ tokio::spawn(async move {
+ let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
let abort_future = pin!(proxmox_daemon::shutdown_future());
futures::future::select(task_scheduler, abort_future).await;
});
}
-async fn metric_collection_task() -> Result<(), Error> {
+async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
loop {
@@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
.cluster_metrics_export(Some(true), Some(false), Some(start_time))
.await?;
- //// Involves some blocking file IO
- tokio::task::spawn_blocking(move || {
- let mut most_recent_timestamp = 0;
+ let most_recent =
+ metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
- for data_point in metrics.data {
- most_recent_timestamp =
- most_recent_timestamp.max(data_point.timestamp);
- store_metric_pve(&remote_name_clone, &data_point);
- }
+ sender
+ .send(RrdStoreRequest::Pve {
+ remote: remote_name_clone,
+ metrics,
+ })
+ .await?;
- most_recent_timestamp
- })
- .await
+ Ok::<i64, Error>(most_recent)
}
RemoteType::Pbs => {
let client = connection::make_pbs_client(&remote)?;
let metrics = client.metrics(Some(true), Some(start_time)).await?;
- // Involves some blocking file IO
- tokio::task::spawn_blocking(move || {
- let mut most_recent_timestamp = 0;
+ let most_recent =
+ metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
- for data_point in metrics.data {
- most_recent_timestamp =
- most_recent_timestamp.max(data_point.timestamp);
- store_metric_pbs(&remote_name_clone, &data_point);
- }
+ sender
+ .send(RrdStoreRequest::Pbs {
+ remote: remote_name_clone,
+ metrics,
+ })
+ .await?;
- most_recent_timestamp
- })
- .await
+ Ok::<i64, Error>(most_recent)
}
}?;
@@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
}
}
}
-
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
- let name = format!(
- "pve/{remote_name}/{id}/{metric}",
- id = data_point.id,
- metric = data_point.metric,
- );
-
- let data_source_type = match data_point.ty {
- ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
- ClusterMetricsDataType::Counter => DataSourceType::Counter,
- ClusterMetricsDataType::Derive => DataSourceType::Derive,
- };
-
- rrd_cache::update_value(
- &name,
- data_point.value,
- data_point.timestamp,
- data_source_type,
- );
-}
-
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
- let name = format!(
- "pbs/{remote_name}/{id}/{metric}",
- id = data_point.id,
- metric = data_point.metric,
- );
-
- let data_source_type = match data_point.ty {
- MetricDataType::Gauge => DataSourceType::Gauge,
- MetricDataType::Counter => DataSourceType::Counter,
- MetricDataType::Derive => DataSourceType::Derive,
- };
-
- rrd_cache::update_value(
- &name,
- data_point.value,
- data_point.timestamp,
- data_source_type,
- );
-}
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
new file mode 100644
index 00000000..a72945df
--- /dev/null
+++ b/server/src/metric_collection/rrd_task.rs
@@ -0,0 +1,96 @@
+use anyhow::Error;
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use proxmox_rrd::rrd::DataSourceType;
+use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
+use tokio::sync::mpsc::Receiver;
+
+use super::rrd_cache;
+
+/// Store request for the RRD task.
+pub(super) enum RrdStoreRequest {
+ /// Store PVE metrics.
+ Pve {
+ /// Remote name.
+ remote: String,
+ /// Metric data.
+ metrics: ClusterMetrics,
+ },
+ /// Store PBS metrics.
+ Pbs {
+ /// Remote name.
+ remote: String,
+ /// Metric data.
+ metrics: Metrics,
+ },
+}
+
+/// Task which stores received metrics in the RRD. Metric data is fed into
+/// this task via an MPSC channel.
+pub(super) async fn store_in_rrd_task(
+ mut receiver: Receiver<RrdStoreRequest>,
+) -> Result<(), Error> {
+ while let Some(msg) = receiver.recv().await {
+ // Involves some blocking file IO
+ let res = tokio::task::spawn_blocking(move || match msg {
+ RrdStoreRequest::Pve { remote, metrics } => {
+ for data_point in metrics.data {
+ store_metric_pve(&remote, &data_point);
+ }
+ }
+ RrdStoreRequest::Pbs { remote, metrics } => {
+ for data_point in metrics.data {
+ store_metric_pbs(&remote, &data_point);
+ }
+ }
+ })
+ .await;
+
+ if let Err(err) = res {
+ log::error!("error in rrd task when attempting to save metrics: {err}");
+ }
+ }
+
+ Ok(())
+}
+
+fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+ let name = format!(
+ "pve/{remote_name}/{id}/{metric}",
+ id = data_point.id,
+ metric = data_point.metric,
+ );
+
+ let data_source_type = match data_point.ty {
+ ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
+ ClusterMetricsDataType::Counter => DataSourceType::Counter,
+ ClusterMetricsDataType::Derive => DataSourceType::Derive,
+ };
+
+ rrd_cache::update_value(
+ &name,
+ data_point.value,
+ data_point.timestamp,
+ data_source_type,
+ );
+}
+
+fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+ let name = format!(
+ "pbs/{remote_name}/{id}/{metric}",
+ id = data_point.id,
+ metric = data_point.metric,
+ );
+
+ let data_source_type = match data_point.ty {
+ MetricDataType::Gauge => DataSourceType::Gauge,
+ MetricDataType::Counter => DataSourceType::Counter,
+ MetricDataType::Derive => DataSourceType::Derive,
+ };
+
+ rrd_cache::update_value(
+ &name,
+ data_point.value,
+ data_point.timestamp,
+ data_source_type,
+ );
+}
--
2.47.2
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
next prev parent reply other threads:[~2025-08-14 13:30 UTC|newest]
Thread overview: 26+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-08-14 13:31 [pdm-devel] [PATCH proxmox-datacenter-manager v5 00/23] metric collection improvements (concurrency, API, CLI) Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 01/23] metric collection: split top_entities split into separate module Lukas Wagner
2025-08-14 13:31 ` Lukas Wagner [this message]
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 03/23] metric collection: rework metric poll task Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 04/23] metric collection: persist state after metric collection Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 05/23] metric collection: skip if last_collection < MIN_COLLECTION_INTERVAL Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 06/23] metric collection: collect overdue metrics on startup/timer change Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 07/23] metric collection: add tests for the fetch_remotes function Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 08/23] metric collection: add test for fetch_overdue Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 09/23] metric collection: pass rrd cache instance as function parameter Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 10/23] metric collection: add test for rrd task Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 11/23] metric collection: wrap rrd_cache::Cache in a struct Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 12/23] metric collection: record remote response time in metric database Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 13/23] metric collection: save time needed for collection run to RRD Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 14/23] metric collection: periodically clean removed remotes from statefile Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 15/23] api: add endpoint to trigger metric collection Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 16/23] api: remotes: trigger immediate metric collection for newly added nodes Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 17/23] api: add api for querying metric collection RRD data Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 18/23] api: metric-collection: add status endpoint Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 19/23] pdm-client: add metric collection API methods Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 20/23] cli: add commands for metric-collection trigger and status Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 21/23] metric collection: factor out handle_tick and handle_control_message fns Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 22/23] metric collection: skip missed timer ticks Lukas Wagner
2025-08-14 13:31 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 23/23] metric collection: use JoinSet instead of joining from handles in a Vec Lukas Wagner
2025-08-21 9:39 ` [pdm-devel] [PATCH proxmox-datacenter-manager v5 00/23] metric collection improvements (concurrency, API, CLI) Dominik Csapak
2025-08-21 9:55 ` [pdm-devel] superseded: " Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250814133142.386650-3-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox