public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager 06/25] metric collection: save metric data to RRD in separate task
Date: Tue, 11 Feb 2025 13:05:22 +0100
Message-ID: <20250211120541.163621-7-l.wagner@proxmox.com>
In-Reply-To: <20250211120541.163621-1-l.wagner@proxmox.com>

This is a preparation for concurrent metric collection. While the RRD
cache appears to be safe for concurrent access (it uses an RwLock to
protect the data), delegating all RRD writes to a separate tokio task
makes the code, IMO, easier to reason about and understand. Furthermore,
it decouples the 'metric fetching' and 'metric storing' parts, making it
much easier to write tests for both parts independently.

For better code separation, this also splits out the new task into
a new submodule.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
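The fetch/store split described above can be sketched in isolation as
follows. This is only a rough illustration, not the code from this patch:
it uses std::sync::mpsc and a plain thread instead of tokio's channel and
tasks so that it needs no dependencies, and `DataPoint`, `StoreRequest`,
`submit`, `store_task` and `run_demo` are hypothetical stand-ins for the
real types and functions.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical stand-in for a single metric data point.
#[derive(Debug, Clone, PartialEq)]
pub struct DataPoint {
    pub metric: String,
    pub timestamp: i64,
    pub value: f64,
}

/// Simplified analogue of `RrdStoreRequest`: one variant per remote type.
pub enum StoreRequest {
    Pve { remote: String, data: Vec<DataPoint> },
    Pbs { remote: String, data: Vec<DataPoint> },
}

/// "Fetching" side: note the most recent timestamp, then hand the whole
/// batch over to the writer through the channel.
pub fn submit(tx: &SyncSender<StoreRequest>, req: StoreRequest) -> Result<i64, String> {
    let data = match &req {
        StoreRequest::Pve { data, .. } | StoreRequest::Pbs { data, .. } => data,
    };
    let most_recent = data.iter().fold(0_i64, |acc, p| acc.max(p.timestamp));
    tx.send(req).map_err(|e| e.to_string())?;
    Ok(most_recent)
}

/// "Storing" side: the only place that would write to the database.
/// Here it merely records which remote sent how many points.
pub fn store_task(rx: Receiver<StoreRequest>) -> Vec<(String, usize)> {
    let mut written = Vec::new();
    // The loop ends once every sender has been dropped.
    for req in rx {
        let (kind, remote, data) = match req {
            StoreRequest::Pve { remote, data } => ("pve", remote, data),
            StoreRequest::Pbs { remote, data } => ("pbs", remote, data),
        };
        written.push((format!("{kind}/{remote}"), data.len()));
    }
    written
}

/// Wire both sides together with a bounded channel.
pub fn run_demo() -> (Vec<i64>, Vec<(String, usize)>) {
    let (tx, rx) = sync_channel(128);
    let writer = thread::spawn(move || store_task(rx));

    let point = |ts: i64| DataPoint { metric: "cpu".into(), timestamp: ts, value: 0.5 };
    let pve = StoreRequest::Pve { remote: "r1".into(), data: vec![point(10), point(30), point(20)] };
    let pbs = StoreRequest::Pbs { remote: "r2".into(), data: vec![point(5)] };

    let timestamps = vec![submit(&tx, pve).unwrap(), submit(&tx, pbs).unwrap()];
    drop(tx); // closing the channel lets the writer finish
    (timestamps, writer.join().unwrap())
}
```

Because the writer only sees `StoreRequest` values, a test can feed it
hand-built requests without contacting any remote, and the fetching side
can be tested by inspecting what arrives on the receiving end.
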
 server/src/metric_collection/mod.rs      | 102 +++++++----------------
 server/src/metric_collection/rrd_task.rs |  92 ++++++++++++++++++++
 2 files changed, 124 insertions(+), 70 deletions(-)
 create mode 100644 server/src/metric_collection/rrd_task.rs

diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 5540f93..06ade5f 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -2,18 +2,18 @@ use std::collections::HashMap;
 use std::pin::pin;
 
 use anyhow::Error;
-
-use pbs_api_types::{MetricDataPoint, MetricDataType};
-use proxmox_rrd::rrd::DataSourceType;
+use tokio::sync::mpsc::{self, Sender};
 
 use pdm_api_types::remotes::RemoteType;
-use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
 
 use crate::{connection, task_utils};
 
 pub mod rrd_cache;
+mod rrd_task;
 pub mod top_entities;
 
+use rrd_task::RrdStoreRequest;
+
 const COLLECTION_INTERVAL: u64 = 60;
 
 /// Initialize the RRD cache
@@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
 
 /// Start the metric collection task.
 pub fn start_task() {
+    let (tx, rx) = mpsc::channel(128);
+
+    tokio::spawn(async move {
+        let task_scheduler = pin!(metric_collection_task(tx));
+        let abort_future = pin!(proxmox_daemon::shutdown_future());
+        futures::future::select(task_scheduler, abort_future).await;
+    });
+
     tokio::spawn(async move {
-        let task_scheduler = pin!(metric_collection_task());
+        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(task_scheduler, abort_future).await;
     });
 }
 
-async fn metric_collection_task() -> Result<(), Error> {
+async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
     let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
 
     loop {
@@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
                             .cluster_metrics_export(Some(true), Some(false), Some(start_time))
                             .await?;
 
-                        // Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pve(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pve {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                     RemoteType::Pbs => {
                         let client = connection::make_pbs_client(remote)?;
                         let metrics = client.metrics(Some(true), Some(start_time)).await?;
 
-                        // Involves some blocking file IO
-                        tokio::task::spawn_blocking(move || {
-                            let mut most_recent_timestamp = 0;
+                        let most_recent =
+                            metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
 
-                            for data_point in metrics.data {
-                                most_recent_timestamp =
-                                    most_recent_timestamp.max(data_point.timestamp);
-                                store_metric_pbs(&remote_name_clone, &data_point);
-                            }
+                        sender
+                            .send(RrdStoreRequest::Pbs {
+                                remote: remote_name_clone,
+                                metrics,
+                            })
+                            .await?;
 
-                            most_recent_timestamp
-                        })
-                        .await
+                        Ok::<i64, Error>(most_recent)
                     }
                 }?;
 
@@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> {
         }
     }
 }
-
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
-    let name = format!(
-        "pve/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
-        ClusterMetricsDataType::Counter => DataSourceType::Counter,
-        ClusterMetricsDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
-
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
-    let name = format!(
-        "pbs/{remote_name}/{id}/{metric}",
-        id = data_point.id,
-        metric = data_point.metric,
-    );
-
-    let data_source_type = match data_point.ty {
-        MetricDataType::Gauge => DataSourceType::Gauge,
-        MetricDataType::Counter => DataSourceType::Counter,
-        MetricDataType::Derive => DataSourceType::Derive,
-    };
-
-    rrd_cache::update_value(
-        &name,
-        data_point.value,
-        data_point.timestamp,
-        data_source_type,
-    );
-}
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
new file mode 100644
index 0000000..fe19580
--- /dev/null
+++ b/server/src/metric_collection/rrd_task.rs
@@ -0,0 +1,92 @@
+use anyhow::Error;
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use proxmox_rrd::rrd::DataSourceType;
+use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
+use tokio::sync::mpsc::Receiver;
+
+use super::rrd_cache;
+
+/// Store request for the RRD task.
+pub(super) enum RrdStoreRequest {
+    /// Store PVE metrics.
+    Pve {
+        /// Remote name
+        remote: String,
+        /// Metric data
+        metrics: ClusterMetrics,
+    },
+    /// Store PBS metrics.
+    Pbs {
+        /// Remote name
+        remote: String,
+        /// Metric data
+        metrics: Metrics,
+    },
+}
+
+/// Task which stores received metrics in the RRD. Metric data is fed into
+/// this task via a MPSC channel.
+pub(super) async fn store_in_rrd_task(
+    mut receiver: Receiver<RrdStoreRequest>,
+) -> Result<(), Error> {
+    while let Some(msg) = receiver.recv().await {
+        // Involves some blocking file IO
+        tokio::task::spawn_blocking(move || match msg {
+            RrdStoreRequest::Pve { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pve(&remote, &data_point);
+                }
+            }
+            RrdStoreRequest::Pbs { remote, metrics } => {
+                for data_point in metrics.data {
+                    store_metric_pbs(&remote, &data_point);
+                }
+            }
+        })
+        .await?;
+    }
+
+    Ok(())
+}
+
+fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+    let name = format!(
+        "pve/{remote_name}/{id}/{metric}",
+        id = data_point.id,
+        metric = data_point.metric,
+    );
+
+    let data_source_type = match data_point.ty {
+        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
+        ClusterMetricsDataType::Counter => DataSourceType::Counter,
+        ClusterMetricsDataType::Derive => DataSourceType::Derive,
+    };
+
+    rrd_cache::update_value(
+        &name,
+        data_point.value,
+        data_point.timestamp,
+        data_source_type,
+    );
+}
+
+fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+    let name = format!(
+        "pbs/{remote_name}/{id}/{metric}",
+        id = data_point.id,
+        metric = data_point.metric,
+    );
+
+    let data_source_type = match data_point.ty {
+        MetricDataType::Gauge => DataSourceType::Gauge,
+        MetricDataType::Counter => DataSourceType::Counter,
+        MetricDataType::Derive => DataSourceType::Derive,
+    };
+
+    rrd_cache::update_value(
+        &name,
+        data_point.value,
+        data_point.timestamp,
+        data_source_type,
+    );
+}
-- 
2.39.5
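
store_metric_pve and store_metric_pbs above differ only in the key prefix
and in the source enum they map from. As a rough sketch of how that
naming and mapping logic could be checked without touching the RRD cache,
the following uses hypothetical stand-ins (`MetricKind`, `SourceType`,
`rrd_key`); none of these names exist in the patch or in proxmox_rrd.

```rust
/// Hypothetical stand-in for the metric types reported by a remote
/// (gauge, counter, derive), mirroring the variants matched in the patch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricKind {
    Gauge,
    Counter,
    Derive,
}

/// Hypothetical stand-in for the RRD data source type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceType {
    Gauge,
    Counter,
    Derive,
}

impl From<MetricKind> for SourceType {
    /// One-to-one mapping, as in the `match` blocks of the patch.
    fn from(kind: MetricKind) -> Self {
        match kind {
            MetricKind::Gauge => SourceType::Gauge,
            MetricKind::Counter => SourceType::Counter,
            MetricKind::Derive => SourceType::Derive,
        }
    }
}

/// Build a key of the form `<product>/<remote>/<id>/<metric>`, where
/// `product` would be "pve" or "pbs".
pub fn rrd_key(product: &str, remote: &str, id: &str, metric: &str) -> String {
    format!("{product}/{remote}/{id}/{metric}")
}
```

With pure helpers like these, a unit test only needs to compare strings
and enum values, for example checking that rrd_key("pbs", "r2", "host",
"mem") yields "pbs/r2/host/mem".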




