From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pdm-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9])
	by lore.proxmox.com (Postfix) with ESMTPS id 3F0DA1FF172
	for <inbox@lore.proxmox.com>; Wed, 16 Apr 2025 14:56:53 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 8EED736C46;
	Wed, 16 Apr 2025 14:56:51 +0200 (CEST)
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Wed, 16 Apr 2025 14:56:20 +0200
Message-Id: <20250416125642.291552-5-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250416125642.291552-1-l.wagner@proxmox.com>
References: <20250416125642.291552-1-l.wagner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL -0.986 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 KAM_MAILER                  2 Automated Mailer Tag Left in Email
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v3 04/26] metric
 collection: save metric data to RRD in separate task
X-BeenThere: pdm-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Datacenter Manager development discussion
 <pdm-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/>
List-Post: <mailto:pdm-devel@lists.proxmox.com>
List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Datacenter Manager development discussion
 <pdm-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pdm-devel-bounces@lists.proxmox.com
Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com>

This is a preparation for concurrent metric collection. While the RRD
cache appears to be safe for concurrent access (it uses an RwLock to
protect the data), delegating all RRD writes to a separate tokio task
makes it IMO easier to reason about and understand. Furthermore, it
decouples the 'metric fetching' and 'metric storing' parts, making it
much easier to write tests for both parts independently.

For better code separation, this also splits out the new task into
a new submodule.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
 server/src/metric_collection/mod.rs      | 102 +++++++----------------
 server/src/metric_collection/rrd_task.rs |  96 +++++++++++++++++++++
 2 files changed, 128 insertions(+), 70 deletions(-)
 create mode 100644 server/src/metric_collection/rrd_task.rs

diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 5540f937..06ade5f0 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -2,18 +2,18 @@ use std::collections::HashMap;
 use std::pin::pin;
 
 use anyhow::Error;
-
-use pbs_api_types::{MetricDataPoint, MetricDataType};
-use proxmox_rrd::rrd::DataSourceType;
+use tokio::sync::mpsc::{self, Sender};
 
 use pdm_api_types::remotes::RemoteType;
-use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
 
 use crate::{connection, task_utils};
 
 pub mod rrd_cache;
+mod rrd_task;
 pub mod top_entities;
 
+use rrd_task::RrdStoreRequest;
+
 const COLLECTION_INTERVAL: u64 = 60;
 
 /// Initialize the RRD cache
@@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
 
 /// Start the metric collection task.
 pub fn start_task() {
+    let (tx, rx) = mpsc::channel(128);
+
     tokio::spawn(async move {
-        let task_scheduler = pin!(metric_collection_task());
+        let task_scheduler = pin!(metric_collection_task(tx));
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+
+    tokio::spawn(async move {
+        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(task_scheduler, abort_future).await;
     });
 }
 
-async fn metric_collection_task() -> Result<(), Error> {
+async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
     let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
 
     loop {
@@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
                             .cluster_metrics_export(Some(true), Some(false), Some(start_time))
                             .await?;
 
-                        //// Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pve(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pve {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                     RemoteType::Pbs => {
                         let client = connection::make_pbs_client(remote)?;
                         let metrics = client.metrics(Some(true), Some(start_time)).await?;
 
-                        // Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pbs(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pbs {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                 }?;
 
@@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
         }
     }
 }
-
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
-    let name = format!(
-        "pve/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
-        ClusterMetricsDataType::Counter => DataSourceType::Counter,
-        ClusterMetricsDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
-
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
-    let name = format!(
-        "pbs/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        MetricDataType::Gauge => DataSourceType::Gauge,
-        MetricDataType::Counter => DataSourceType::Counter,
-        MetricDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
new file mode 100644
index 00000000..a72945df
--- /dev/null
+++ b/server/src/metric_collection/rrd_task.rs
@@ -0,0 +1,96 @@
+use anyhow::Error;
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use proxmox_rrd::rrd::DataSourceType;
+use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
+use tokio::sync::mpsc::Receiver;
+
+use super::rrd_cache;
+
+/// Store request for the RRD task.
+pub(super) enum RrdStoreRequest {
+    /// Store PVE metrics.
+    Pve {
+        /// Remote name.
+        remote: String,
+        /// Metric data.
+        metrics: ClusterMetrics,
+    },
+    /// Store PBS metrics.
+    Pbs {
+        /// Remote name.
+        remote: String,
+        /// Metric data.
+        metrics: Metrics,
+    },
+}
+
+/// Task which stores received metrics in the RRD. Metric data is fed into
+/// this task via an MPSC channel.
+pub(super) async fn store_in_rrd_task(
+    mut receiver: Receiver<RrdStoreRequest>,
+) -> Result<(), Error> {
+    while let Some(msg) = receiver.recv().await {
+        // Involves some blocking file IO
+        let res = tokio::task::spawn_blocking(move || match msg {
+            RrdStoreRequest::Pve { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pve(&remote, &data_point);
+                }
+            }
+            RrdStoreRequest::Pbs { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pbs(&remote, &data_point);
+                }
+            }
+        })
+        .await;
+
+        if let Err(err) = res {
+            log::error!("error in rrd task when attempting to save metrics: {err}");
+        }
+    }
+
+    Ok(())
+}
+
+fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+    let name = format!(
+        "pve/{remote_name}/{id}/{metric}",
+        id = data_point.id,
+        metric = data_point.metric,
+    );
+
+    let data_source_type = match data_point.ty {
+        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
+        ClusterMetricsDataType::Counter => DataSourceType::Counter,
+        ClusterMetricsDataType::Derive => DataSourceType::Derive,
+    };
+
+    rrd_cache::update_value(
+        &name,
+        data_point.value,
+        data_point.timestamp,
+        data_source_type,
+    );
+}
+
+fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+    let name = format!(
+        "pbs/{remote_name}/{id}/{metric}",
+        id = data_point.id,
+        metric = data_point.metric,
+    );
+
+    let data_source_type = match data_point.ty {
+        MetricDataType::Gauge => DataSourceType::Gauge,
+        MetricDataType::Counter => DataSourceType::Counter,
+        MetricDataType::Derive => DataSourceType::Derive,
+    };
+
+    rrd_cache::update_value(
+        &name,
+        data_point.value,
+        data_point.timestamp,
+        data_source_type,
+    );
+}
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel