From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pdm-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 3F0DA1FF172 for <inbox@lore.proxmox.com>; Wed, 16 Apr 2025 14:56:53 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 8EED736C46; Wed, 16 Apr 2025 14:56:51 +0200 (CEST) From: Lukas Wagner <l.wagner@proxmox.com> To: pdm-devel@lists.proxmox.com Date: Wed, 16 Apr 2025 14:56:20 +0200 Message-Id: <20250416125642.291552-5-l.wagner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20250416125642.291552-1-l.wagner@proxmox.com> References: <20250416125642.291552-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL -0.986 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_MAILER 2 Automated Mailer Tag Left in Email SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v3 04/26] metric collection: save metric data to RRD in separate task X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/> List-Post: <mailto:pdm-devel@lists.proxmox.com> List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe> 
Reply-To: Proxmox Datacenter Manager development discussion <pdm-devel@lists.proxmox.com> Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com> This is a preparation for concurrent metric collection. While the RRD cache appears to be safe for concurrent access (it uses an RwLock to protect the data), delegating all RRD writes to a separate tokio task makes it IMO easier to reason about and understand. Furthermore, it decouples the 'metric fetching' and 'metric storing' parts, making it much easier to write tests for both parts independently. For better code separation, this also splits out the new task into a new submodule. Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> Reviewed-by: Maximiliano Sandoval <m.sandoval@proxmox.com> --- server/src/metric_collection/mod.rs | 102 +++++++---------------- server/src/metric_collection/rrd_task.rs | 96 +++++++++++++++++++++ 2 files changed, 128 insertions(+), 70 deletions(-) create mode 100644 server/src/metric_collection/rrd_task.rs diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs index 5540f937..06ade5f0 100644 --- a/server/src/metric_collection/mod.rs +++ b/server/src/metric_collection/mod.rs @@ -2,18 +2,18 @@ use std::collections::HashMap; use std::pin::pin; use anyhow::Error; - -use pbs_api_types::{MetricDataPoint, MetricDataType}; -use proxmox_rrd::rrd::DataSourceType; +use tokio::sync::mpsc::{self, Sender}; use pdm_api_types::remotes::RemoteType; -use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType}; use crate::{connection, task_utils}; pub mod rrd_cache; +mod rrd_task; pub mod top_entities; +use rrd_task::RrdStoreRequest; + const COLLECTION_INTERVAL: u64 = 60; /// Initialize the RRD cache @@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> { /// Start the metric collection task. 
pub fn start_task() { + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { - let task_scheduler = pin!(metric_collection_task()); + let task_scheduler = pin!(metric_collection_task(tx)); + let abort_future = pin!(proxmox_daemon::shutdown_future()); + futures::future::select(task_scheduler, abort_future).await; + }); + + tokio::spawn(async move { + let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx)); let abort_future = pin!(proxmox_daemon::shutdown_future()); futures::future::select(task_scheduler, abort_future).await; }); } -async fn metric_collection_task() -> Result<(), Error> { +async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> { let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new(); loop { @@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> { .cluster_metrics_export(Some(true), Some(false), Some(start_time)) .await?; - //// Involves some blocking file IO - tokio::task::spawn_blocking(move || { - let mut most_recent_timestamp = 0; + let most_recent = + metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp)); - for data_point in metrics.data { - most_recent_timestamp = - most_recent_timestamp.max(data_point.timestamp); - store_metric_pve(&remote_name_clone, &data_point); - } + sender + .send(RrdStoreRequest::Pve { + remote: remote_name_clone, + metrics, + }) + .await?; - most_recent_timestamp - }) - .await + Ok::<i64, Error>(most_recent) } RemoteType::Pbs => { let client = connection::make_pbs_client(remote)?; let metrics = client.metrics(Some(true), Some(start_time)).await?; - // Involves some blocking file IO - tokio::task::spawn_blocking(move || { - let mut most_recent_timestamp = 0; + let most_recent = + metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp)); - for data_point in metrics.data { - most_recent_timestamp = - most_recent_timestamp.max(data_point.timestamp); - store_metric_pbs(&remote_name_clone, &data_point); - } + sender + 
.send(RrdStoreRequest::Pbs { + remote: remote_name_clone, + metrics, + }) + .await?; - most_recent_timestamp - }) - .await + Ok::<i64, Error>(most_recent) } }?; @@ -106,45 +110,3 @@ async fn metric_collection_task() -> Result<(), Error> { } } } - -fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) { - let name = format!( - "pve/{remote_name}/{id}/{metric}", - id = data_point.id, - metric = data_point.metric, - ); - - let data_source_type = match data_point.ty { - ClusterMetricsDataType::Gauge => DataSourceType::Gauge, - ClusterMetricsDataType::Counter => DataSourceType::Counter, - ClusterMetricsDataType::Derive => DataSourceType::Derive, - }; - - rrd_cache::update_value( - &name, - data_point.value, - data_point.timestamp, - data_source_type, - ); -} - -fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) { - let name = format!( - "pbs/{remote_name}/{id}/{metric}", - id = data_point.id, - metric = data_point.metric, - ); - - let data_source_type = match data_point.ty { - MetricDataType::Gauge => DataSourceType::Gauge, - MetricDataType::Counter => DataSourceType::Counter, - MetricDataType::Derive => DataSourceType::Derive, - }; - - rrd_cache::update_value( - &name, - data_point.value, - data_point.timestamp, - data_source_type, - ); -} diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs new file mode 100644 index 00000000..a72945df --- /dev/null +++ b/server/src/metric_collection/rrd_task.rs @@ -0,0 +1,96 @@ +use anyhow::Error; +use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; +use proxmox_rrd::rrd::DataSourceType; +use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType}; +use tokio::sync::mpsc::Receiver; + +use super::rrd_cache; + +/// Store request for the RRD task. +pub(super) enum RrdStoreRequest { + /// Store PVE metrics. + Pve { + /// Remote name. + remote: String, + /// Metric data. + metrics: ClusterMetrics, + }, + /// Store PBS metrics. 
+ Pbs { + /// Remote name. + remote: String, + /// Metric data. + metrics: Metrics, + }, +} + +/// Task which stores received metrics in the RRD. Metric data is fed into +/// this task via a MPSC channel. +pub(super) async fn store_in_rrd_task( + mut receiver: Receiver<RrdStoreRequest>, +) -> Result<(), Error> { + while let Some(msg) = receiver.recv().await { + // Involves some blocking file IO + let res = tokio::task::spawn_blocking(move || match msg { + RrdStoreRequest::Pve { remote, metrics } => { + for data_point in metrics.data { + store_metric_pve(&remote, &data_point); + } + } + RrdStoreRequest::Pbs { remote, metrics } => { + for data_point in metrics.data { + store_metric_pbs(&remote, &data_point); + } + } + }) + .await; + + if let Err(err) = res { + log::error!("error in rrd task when attempting to save metrics: {err}"); + } + } + + Ok(()) +} + +fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) { + let name = format!( + "pve/{remote_name}/{id}/{metric}", + id = data_point.id, + metric = data_point.metric, + ); + + let data_source_type = match data_point.ty { + ClusterMetricsDataType::Gauge => DataSourceType::Gauge, + ClusterMetricsDataType::Counter => DataSourceType::Counter, + ClusterMetricsDataType::Derive => DataSourceType::Derive, + }; + + rrd_cache::update_value( + &name, + data_point.value, + data_point.timestamp, + data_source_type, + ); +} + +fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) { + let name = format!( + "pbs/{remote_name}/{id}/{metric}", + id = data_point.id, + metric = data_point.metric, + ); + + let data_source_type = match data_point.ty { + MetricDataType::Gauge => DataSourceType::Gauge, + MetricDataType::Counter => DataSourceType::Counter, + MetricDataType::Derive => DataSourceType::Derive, + }; + + rrd_cache::update_value( + &name, + data_point.value, + data_point.timestamp, + data_source_type, + ); +} -- 2.39.5 _______________________________________________ pdm-devel mailing list 
pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel