From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pdm-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9])
	by lore.proxmox.com (Postfix) with ESMTPS id E0F0A1FF15E
	for <inbox@lore.proxmox.com>; Tue, 11 Feb 2025 13:06:04 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 05C8D2A89C;
	Tue, 11 Feb 2025 13:05:58 +0100 (CET)
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Tue, 11 Feb 2025 13:05:22 +0100
Message-Id: <20250211120541.163621-7-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250211120541.163621-1-l.wagner@proxmox.com>
References: <20250211120541.163621-1-l.wagner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.010 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 06/25] metric
 collection: save metric data to RRD in separate task
X-BeenThere: pdm-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Datacenter Manager development discussion
 <pdm-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/>
List-Post: <mailto:pdm-devel@lists.proxmox.com>
List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Datacenter Manager development discussion
 <pdm-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pdm-devel-bounces@lists.proxmox.com
Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com>

This is in preparation for concurrent metric collection. While the RRD
cache appears to be safe for concurrent access (it uses an RwLock to
protect the data), delegating all RRD writes to a separate tokio task
makes it IMO easier to reason about and understand. Furthermore, it
decouples the 'metric fetching' and 'metric storing' parts, making it
much easier to write tests for both parts independently.

For better code separation, this also splits out the new task into
a new submodule.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/metric_collection/mod.rs      | 102 +++++++----------------
 server/src/metric_collection/rrd_task.rs |  92 ++++++++++++++++++++
 2 files changed, 124 insertions(+), 70 deletions(-)
 create mode 100644 server/src/metric_collection/rrd_task.rs

diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 5540f93..06ade5f 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -2,18 +2,18 @@ use std::collections::HashMap;
 use std::pin::pin;
 
 use anyhow::Error;
-
-use pbs_api_types::{MetricDataPoint, MetricDataType};
-use proxmox_rrd::rrd::DataSourceType;
+use tokio::sync::mpsc::{self, Sender};
 
 use pdm_api_types::remotes::RemoteType;
-use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
 
 use crate::{connection, task_utils};
 
 pub mod rrd_cache;
+mod rrd_task;
 pub mod top_entities;
 
+use rrd_task::RrdStoreRequest;
+
 const COLLECTION_INTERVAL: u64 = 60;
 
 /// Initialize the RRD cache
@@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
 
 /// Start the metric collection task.
 pub fn start_task() {
+    let (tx, rx) = mpsc::channel(128);
+    // Bounded queue (128 requests): `send().await` in the collection task waits when full.
+    tokio::spawn(async move {
+        let task_scheduler = pin!(metric_collection_task(tx));
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+    // RRD task: ends on shutdown, or once `tx` is dropped and all queued requests are stored.
     tokio::spawn(async move {
-        let task_scheduler = pin!(metric_collection_task());
+        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(task_scheduler, abort_future).await;
     });
 }
 
-async fn metric_collection_task() -> Result<(), Error> {
+async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
     let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
 
     loop {
@@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
                             .cluster_metrics_export(Some(true), Some(false), Some(start_time))
                             .await?;
 
-                        //// Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pve(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pve {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                     RemoteType::Pbs => {
                         let client = connection::make_pbs_client(remote)?;
                         let metrics = client.metrics(Some(true), Some(start_time)).await?;
 
-                        // Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pbs(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pbs {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                 }?;
 
@@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
         }
     }
 }
-
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
-    let name = format!(
-        "pve/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
-        ClusterMetricsDataType::Counter => DataSourceType::Counter,
-        ClusterMetricsDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
-
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
-    let name = format!(
-        "pbs/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        MetricDataType::Gauge => DataSourceType::Gauge,
-        MetricDataType::Counter => DataSourceType::Counter,
-        MetricDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
new file mode 100644
index 0000000..fe19580
--- /dev/null
+++ b/server/src/metric_collection/rrd_task.rs
@@ -0,0 +1,92 @@
+use anyhow::Error;
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use proxmox_rrd::rrd::DataSourceType;
+use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
+use tokio::sync::mpsc::Receiver;
+
+use super::rrd_cache;
+
+/// Store request for the RRD task.
+pub(super) enum RrdStoreRequest {
+    /// Store metrics fetched from a PVE remote.
+    Pve {
+        /// Name of the remote the metrics were fetched from.
+        remote: String,
+        /// Metric data as returned by the PVE cluster metrics export.
+        metrics: ClusterMetrics,
+    },
+    /// Store metrics fetched from a PBS remote.
+    Pbs {
+        /// Name of the remote the metrics were fetched from.
+        remote: String,
+        /// Metric data as returned by the PBS metrics endpoint.
+        metrics: Metrics,
+    },
+}
+
+/// Task which stores received metrics in the RRD, fed via an MPSC channel.
+/// Returns once all senders are dropped, or with an error if a store job fails (e.g. panics).
+pub(super) async fn store_in_rrd_task(
+    mut receiver: Receiver<RrdStoreRequest>,
+) -> Result<(), Error> {
+    while let Some(msg) = receiver.recv().await {
+        // Involves some blocking file IO, so run it on the blocking thread pool.
+        tokio::task::spawn_blocking(move || match msg {
+            RrdStoreRequest::Pve { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pve(&remote, &data_point);
+                }
+            }
+            RrdStoreRequest::Pbs { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pbs(&remote, &data_point);
+                }
+            }
+        })
+        .await?;
+    }
+
+    Ok(())
+}
+
+/// Write one PVE data point to the RRD cache under `pve/<remote>/<id>/<metric>`.
+fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+    let ClusterMetricsData {
+        id,
+        metric,
+        value,
+        timestamp,
+        ty,
+        ..
+    } = data_point;
+
+    let source_type = match ty {
+        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
+        ClusterMetricsDataType::Counter => DataSourceType::Counter,
+        ClusterMetricsDataType::Derive => DataSourceType::Derive,
+    };
+
+    let rrd_name = format!("pve/{remote_name}/{id}/{metric}");
+    rrd_cache::update_value(&rrd_name, *value, *timestamp, source_type);
+}
+
+/// Write one PBS data point to the RRD cache under `pbs/<remote>/<id>/<metric>`.
+fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+    let MetricDataPoint {
+        id,
+        metric,
+        value,
+        timestamp,
+        ty,
+        ..
+    } = data_point;
+
+    let source_type = match ty {
+        MetricDataType::Gauge => DataSourceType::Gauge,
+        MetricDataType::Counter => DataSourceType::Counter,
+        MetricDataType::Derive => DataSourceType::Derive,
+    };
+
+    let rrd_name = format!("pbs/{remote_name}/{id}/{metric}");
+    rrd_cache::update_value(&rrd_name, *value, *timestamp, source_type);
+}
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel