From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 06/28] metric collection: save metric data to RRD in separate task
Date: Fri, 14 Feb 2025 14:06:31 +0100 [thread overview]
Message-ID: <20250214130653.283012-7-l.wagner@proxmox.com> (raw)
In-Reply-To: <20250214130653.283012-1-l.wagner@proxmox.com>
This is a preparation for concurrent metric collection. While the RRD
cache appears to be safe to concurrent access (it uses an RwLock to
protect the data), delegating all RRD writes to a separate tokio task
makes it IMO easier to reason about and understand. Furthermore, it
decouples the 'metric fetching' and 'metric
storing' parts, making it much easier to write tests for both parts
independently.
For better code separation, this also splits out the new task into
a new submodule.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
server/src/metric_collection/mod.rs | 102 +++++++----------------
server/src/metric_collection/rrd_task.rs | 96 +++++++++++++++++++++
2 files changed, 128 insertions(+), 70 deletions(-)
create mode 100644 server/src/metric_collection/rrd_task.rs
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 5540f937..06ade5f0 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -2,18 +2,18 @@ use std::collections::HashMap;
use std::pin::pin;
use anyhow::Error;
-
-use pbs_api_types::{MetricDataPoint, MetricDataType};
-use proxmox_rrd::rrd::DataSourceType;
+use tokio::sync::mpsc::{self, Sender};
use pdm_api_types::remotes::RemoteType;
-use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
use crate::{connection, task_utils};
pub mod rrd_cache;
+mod rrd_task;
pub mod top_entities;
+use rrd_task::RrdStoreRequest;
+
const COLLECTION_INTERVAL: u64 = 60;
/// Initialize the RRD cache
@@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
/// Start the metric collection task.
pub fn start_task() {
+ let (tx, rx) = mpsc::channel(128);
+
+ tokio::spawn(async move {
+ let task_scheduler = pin!(metric_collection_task(tx));
+ let abort_future = pin!(proxmox_daemon::shutdown_future());
+ futures::future::select(task_scheduler, abort_future).await;
+ });
+
tokio::spawn(async move {
- let task_scheduler = pin!(metric_collection_task());
+ let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
let abort_future = pin!(proxmox_daemon::shutdown_future());
futures::future::select(task_scheduler, abort_future).await;
});
}
-async fn metric_collection_task() -> Result<(), Error> {
+async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
loop {
@@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
.cluster_metrics_export(Some(true), Some(false), Some(start_time))
.await?;
- //// Involves some blocking file IO
- tokio::task::spawn_blocking(move || {
- let mut most_recent_timestamp = 0;
+ let most_recent =
+ metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
- for data_point in metrics.data {
- most_recent_timestamp =
- most_recent_timestamp.max(data_point.timestamp);
- store_metric_pve(&remote_name_clone, &data_point);
- }
+ sender
+ .send(RrdStoreRequest::Pve {
+ remote: remote_name_clone,
+ metrics,
+ })
+ .await?;
- most_recent_timestamp
- })
- .await
+ Ok::<i64, Error>(most_recent)
}
RemoteType::Pbs => {
let client = connection::make_pbs_client(remote)?;
let metrics = client.metrics(Some(true), Some(start_time)).await?;
- // Involves some blocking file IO
- tokio::task::spawn_blocking(move || {
- let mut most_recent_timestamp = 0;
+ let most_recent =
+ metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
- for data_point in metrics.data {
- most_recent_timestamp =
- most_recent_timestamp.max(data_point.timestamp);
- store_metric_pbs(&remote_name_clone, &data_point);
- }
+ sender
+ .send(RrdStoreRequest::Pbs {
+ remote: remote_name_clone,
+ metrics,
+ })
+ .await?;
- most_recent_timestamp
- })
- .await
+ Ok::<i64, Error>(most_recent)
}
}?;
@@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
}
}
}
-
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
- let name = format!(
- "pve/{remote_name}/{id}/{metric}",
- id = data_point.id,
- metric = data_point.metric,
- );
-
- let data_source_type = match data_point.ty {
- ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
- ClusterMetricsDataType::Counter => DataSourceType::Counter,
- ClusterMetricsDataType::Derive => DataSourceType::Derive,
- };
-
- rrd_cache::update_value(
- &name,
- data_point.value,
- data_point.timestamp,
- data_source_type,
- );
-}
-
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
- let name = format!(
- "pbs/{remote_name}/{id}/{metric}",
- id = data_point.id,
- metric = data_point.metric,
- );
-
- let data_source_type = match data_point.ty {
- MetricDataType::Gauge => DataSourceType::Gauge,
- MetricDataType::Counter => DataSourceType::Counter,
- MetricDataType::Derive => DataSourceType::Derive,
- };
-
- rrd_cache::update_value(
- &name,
- data_point.value,
- data_point.timestamp,
- data_source_type,
- );
-}
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
new file mode 100644
index 00000000..a72945df
--- /dev/null
+++ b/server/src/metric_collection/rrd_task.rs
@@ -0,0 +1,96 @@
+use anyhow::Error;
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use proxmox_rrd::rrd::DataSourceType;
+use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
+use tokio::sync::mpsc::Receiver;
+
+use super::rrd_cache;
+
+/// Store request for the RRD task.
+pub(super) enum RrdStoreRequest {
+ /// Store PVE metrics.
+ Pve {
+ /// Remote name.
+ remote: String,
+ /// Metric data.
+ metrics: ClusterMetrics,
+ },
+ /// Store PBS metrics.
+ Pbs {
+ /// Remote name.
+ remote: String,
+ /// Metric data.
+ metrics: Metrics,
+ },
+}
+
+/// Task which stores received metrics in the RRD. Metric data is fed into
+/// this task via a MPSC channel.
+pub(super) async fn store_in_rrd_task(
+ mut receiver: Receiver<RrdStoreRequest>,
+) -> Result<(), Error> {
+ while let Some(msg) = receiver.recv().await {
+ // Involves some blocking file IO
+ let res = tokio::task::spawn_blocking(move || match msg {
+ RrdStoreRequest::Pve { remote, metrics } => {
+ for data_point in metrics.data {
+ store_metric_pve(&remote, &data_point);
+ }
+ }
+ RrdStoreRequest::Pbs { remote, metrics } => {
+ for data_point in metrics.data {
+ store_metric_pbs(&remote, &data_point);
+ }
+ }
+ })
+ .await;
+
+ if let Err(err) = res {
+ log::error!("error in rrd task when attempting to save metrics: {err}");
+ }
+ }
+
+ Ok(())
+}
+
+fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+ let name = format!(
+ "pve/{remote_name}/{id}/{metric}",
+ id = data_point.id,
+ metric = data_point.metric,
+ );
+
+ let data_source_type = match data_point.ty {
+ ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
+ ClusterMetricsDataType::Counter => DataSourceType::Counter,
+ ClusterMetricsDataType::Derive => DataSourceType::Derive,
+ };
+
+ rrd_cache::update_value(
+ &name,
+ data_point.value,
+ data_point.timestamp,
+ data_source_type,
+ );
+}
+
+fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+ let name = format!(
+ "pbs/{remote_name}/{id}/{metric}",
+ id = data_point.id,
+ metric = data_point.metric,
+ );
+
+ let data_source_type = match data_point.ty {
+ MetricDataType::Gauge => DataSourceType::Gauge,
+ MetricDataType::Counter => DataSourceType::Counter,
+ MetricDataType::Derive => DataSourceType::Derive,
+ };
+
+ rrd_cache::update_value(
+ &name,
+ data_point.value,
+ data_point.timestamp,
+ data_source_type,
+ );
+}
--
2.39.5
_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel
next prev parent reply other threads:[~2025-02-14 13:07 UTC|newest]
Thread overview: 34+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-02-14 13:06 [pdm-devel] [PATCH proxmox-datacenter-manager v2 00/28] metric collection improvements (concurrency, config, API, CLI) Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 01/28] test support: add NamedTempFile helper Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 02/28] test support: add NamedTempDir helper Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 03/28] pdm-api-types: add CollectionSettings type Lukas Wagner
2025-02-18 15:26 ` Wolfgang Bumiller
2025-02-18 15:31 ` Stefan Hanreich
2025-02-21 8:27 ` Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 04/28] pdm-config: add functions for reading/writing metric collection settings Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 05/28] metric collection: split top_entities split into separate module Lukas Wagner
2025-02-14 13:06 ` Lukas Wagner [this message]
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 07/28] metric collection: rework metric poll task Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 08/28] metric collection: persist state after metric collection Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 09/28] metric collection: skip if last_collection < MIN_COLLECTION_INTERVAL Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 10/28] metric collection: collect overdue metrics on startup/timer change Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 11/28] metric collection: add tests for the fetch_remotes function Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 12/28] metric collection: add test for fetch_overdue Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 13/28] metric collection: pass rrd cache instance as function parameter Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 14/28] metric collection: add test for rrd task Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 15/28] metric collection: wrap rrd_cache::Cache in a struct Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 16/28] metric collection: record remote response time in metric database Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 17/28] metric collection: save time needed for collection run to RRD Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 18/28] metric collection: periodically clean removed remotes from statefile Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 19/28] api: add endpoint for updating metric collection settings Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 20/28] api: add endpoint to trigger metric collection Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 21/28] api: remotes: trigger immediate metric collection for newly added nodes Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 22/28] api: add api for querying metric collection RRD data Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 23/28] api: metric-collection: add status endpoint Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 24/28] pdm-client: add metric collection API methods Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 25/28] cli: add commands for metric-collection settings, trigger, status Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 26/28] metric collection: factor out handle_tick and handle_control_message fns Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 27/28] metric collection: skip missed timer ticks Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 28/28] metric collection: use JoinSet instead of joining from handles in a Vec Lukas Wagner
2025-02-21 13:19 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 00/28] metric collection improvements (concurrency, config, API, CLI) Maximiliano Sandoval
2025-03-14 14:10 ` Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250214130653.283012-7-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal