Date: Wed, 12 Feb 2025 14:59:26 +0100
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: Lukas Wagner <l.wagner@proxmox.com>
Cc: pdm-devel@lists.proxmox.com
Reply-To: Proxmox Datacenter Manager development discussion <pdm-devel@lists.proxmox.com>
Message-ID: <wbsa6av3miycxsalv2eopo5u7l53kfrqz6ug5ceoy3ikjr35op@krhgfeqoccem>
References: <20250211120541.163621-1-l.wagner@proxmox.com>
 <20250211120541.163621-7-l.wagner@proxmox.com>
In-Reply-To: <20250211120541.163621-7-l.wagner@proxmox.com>
Subject: Re: [pdm-devel] [PATCH proxmox-datacenter-manager 06/25] metric
 collection: save metric data to RRD in separate task
List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com>

On Tue, Feb 11, 2025 at 01:05:22PM +0100, Lukas Wagner wrote:
> This is a preparation for concurrent metric collection. While the RRD
> cache appears to be safe to concurrent access (it uses an RwLock to
> protect the data), delegating all RRD writes to a separate tokio task
> makes it IMO easier to reason about and understand. Furthermore, it
> decouples the 'metric fetching' and 'metric storing' parts, making it
> much easier to write tests for both parts independently.
>
> For better code separation, this also splits out the new task into
> a new submodule.
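
In other words the split boils down to the usual mpsc producer/consumer
pattern (simplified sketch only, shutdown handling omitted; see the actual
diff below):

    // The fetcher owns the Sender and only fetches + forwards metrics,
    // the new task owns the Receiver and does all the RRD writes.
    let (tx, rx) = tokio::sync::mpsc::channel::<RrdStoreRequest>(128);
    tokio::spawn(async move { let _ = metric_collection_task(tx).await; });
    tokio::spawn(async move { let _ = rrd_task::store_in_rrd_task(rx).await; });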
>
> Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
> ---
>  server/src/metric_collection/mod.rs      | 102 +++++++----------------
>  server/src/metric_collection/rrd_task.rs |  92 ++++++++++++++++++++
>  2 files changed, 124 insertions(+), 70 deletions(-)
>  create mode 100644 server/src/metric_collection/rrd_task.rs
>
> diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
> index 5540f93..06ade5f 100644
> --- a/server/src/metric_collection/mod.rs
> +++ b/server/src/metric_collection/mod.rs
> @@ -2,18 +2,18 @@ use std::collections::HashMap;
>  use std::pin::pin;
>
>  use anyhow::Error;
> -
> -use pbs_api_types::{MetricDataPoint, MetricDataType};
> -use proxmox_rrd::rrd::DataSourceType;
> +use tokio::sync::mpsc::{self, Sender};
>
>  use pdm_api_types::remotes::RemoteType;
> -use pve_api_types::{ClusterMetricsData, ClusterMetricsDataType};
>
>  use crate::{connection, task_utils};
>
>  pub mod rrd_cache;
> +mod rrd_task;
>  pub mod top_entities;
>
> +use rrd_task::RrdStoreRequest;
> +
>  const COLLECTION_INTERVAL: u64 = 60;
>
>  /// Initialize the RRD cache
> @@ -25,14 +25,22 @@ pub fn init() -> Result<(), Error> {
>
>  /// Start the metric collection task.
>  pub fn start_task() {
> +    let (tx, rx) = mpsc::channel(128);
> +
> +    tokio::spawn(async move {
> +        let task_scheduler = pin!(metric_collection_task(tx));
> +        let abort_future = pin!(proxmox_daemon::shutdown_future());
> +        futures::future::select(task_scheduler, abort_future).await;
> +    });
> +
>      tokio::spawn(async move {
> -        let task_scheduler = pin!(metric_collection_task());
> +        let task_scheduler = pin!(rrd_task::store_in_rrd_task(rx));
>          let abort_future = pin!(proxmox_daemon::shutdown_future());
>          futures::future::select(task_scheduler, abort_future).await;
>      });
>  }
>
> -async fn metric_collection_task() -> Result<(), Error> {
> +async fn metric_collection_task(sender: Sender<RrdStoreRequest>) -> Result<(), Error> {
>      let mut most_recent_timestamps: HashMap<String, i64> = HashMap::new();
>
>      loop {
> @@ -59,37 +67,33 @@ async fn metric_collection_task() -> Result<(), Error> {
>                          .cluster_metrics_export(Some(true), Some(false), Some(start_time))
>                          .await?;
>
> -                    //// Involves some blocking file IO
> -                    tokio::task::spawn_blocking(move || {
> -                        let mut most_recent_timestamp = 0;
> +                    let most_recent =
> +                        metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));

Might be more readable using `Iterator::max()`:

    metrics.data.iter().map(|e| e.timestamp).max().unwrap_or(0);

(while this changes throughout the series, there's at least 1 instance of this
pattern remaining at the end AFAICT)

>
> -                        for data_point in metrics.data {
> -                            most_recent_timestamp =
> -                                most_recent_timestamp.max(data_point.timestamp);
> -                            store_metric_pve(&remote_name_clone, &data_point);
> -                        }
> +                    sender
> +                        .send(RrdStoreRequest::Pve {
> +                            remote: remote_name_clone,
> +                            metrics,
> +                        })
> +                        .await?;
>
> -                        most_recent_timestamp
> -                    })
> -                    .await
> +                    Ok::<i64, Error>(most_recent)

^ This...
>                  }
>                  RemoteType::Pbs => {
>                      let client = connection::make_pbs_client(remote)?;
>                      let metrics = client.metrics(Some(true), Some(start_time)).await?;
>
> -                    // Involves some blocking file IO
> -                    tokio::task::spawn_blocking(move || {
> -                        let mut most_recent_timestamp = 0;
> +                    let most_recent =
> +                        metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
>
> -                        for data_point in metrics.data {
> -                            most_recent_timestamp =
> -                                most_recent_timestamp.max(data_point.timestamp);
> -                            store_metric_pbs(&remote_name_clone, &data_point);
> -                        }
> +                    sender
> +                        .send(RrdStoreRequest::Pbs {
> +                            remote: remote_name_clone,
> +                            metrics,
> +                        })
> +                        .await?;
>
> -                        most_recent_timestamp
> -                    })
> -                    .await
> +                    Ok::<i64, Error>(most_recent)

^ ... and this are now the only values for a single `let` binding which now
looks like:

    let x = big_always_Ok_code?; // `?` op on always-Ok()
    Ok(x)                        // and re-OK-wrap

Could just drop the `Ok` and `?`.

>                  }
>              }?;
>
> @@ -106,45 +110,3 @@ async fn metric_collection_task() {
>          }
>      }
>  }
> -
> -fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
> -    let name = format!(
> -        "pve/{remote_name}/{id}/{metric}",
> -        id = data_point.id,
> -        metric = data_point.metric,
> -    );
> -
> -    let data_source_type = match data_point.ty {
> -        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
> -        ClusterMetricsDataType::Counter => DataSourceType::Counter,
> -        ClusterMetricsDataType::Derive => DataSourceType::Derive,
> -    };
> -
> -    rrd_cache::update_value(
> -        &name,
> -        data_point.value,
> -        data_point.timestamp,
> -        data_source_type,
> -    );
> -}
> -
> -fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
> -    let name = format!(
> -        "pbs/{remote_name}/{id}/{metric}",
> -        id = data_point.id,
> -        metric = data_point.metric,
> -    );
> -
> -    let data_source_type = match data_point.ty {
> -        MetricDataType::Gauge => DataSourceType::Gauge,
> -        MetricDataType::Counter => DataSourceType::Counter,
> -        MetricDataType::Derive => DataSourceType::Derive,
> -    };
> -
> -    rrd_cache::update_value(
> -        &name,
> -        data_point.value,
> -        data_point.timestamp,
> -        data_source_type,
> -    );
> -}
> diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
> new file mode 100644
> index 0000000..fe19580
> --- /dev/null
> +++ b/server/src/metric_collection/rrd_task.rs
> @@ -0,0 +1,92 @@
> +use anyhow::Error;
> +use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
> +use proxmox_rrd::rrd::DataSourceType;
> +use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
> +use tokio::sync::mpsc::Receiver;
> +
> +use super::rrd_cache;
> +
> +/// Store requrest for the RRD task.
> +pub(super) enum RrdStoreRequest {
> +    /// Store PVE metrics.
> +    Pve {
> +        /// Remote name
> +        remote: String,
> +        /// Metric data
> +        metrics: ClusterMetrics,
> +    },
> +    /// Store PBS metrics.
> +    Pbs {
> +        /// Remote name
> +        remote: String,
> +        /// Metric data
> +        metrics: Metrics,
> +    },
> +}
> +
> +/// Task which stores received metrics in the RRD. Metric data is fed into
> +/// this task via a MPSC channel.
> +pub(super) async fn store_in_rrd_task(
> +    mut receiver: Receiver<RrdStoreRequest>,
> +) -> Result<(), Error> {
> +    while let Some(msg) = receiver.recv().await {
> +        //// Involves some blocking file IO
> +        tokio::task::spawn_blocking(move || match msg {
> +            RrdStoreRequest::Pve { remote, metrics } => {
> +                for data_point in metrics.data {
> +                    store_metric_pve(&remote, &data_point);
> +                }
> +            }
> +            RrdStoreRequest::Pbs { remote, metrics } => {
> +                for data_point in metrics.data {
> +                    store_metric_pbs(&remote, &data_point);
> +                }
> +            }
> +        })
> +        .await?;

^ This error won't be shown anywhere, since `store_in_rrd_task()` is the entry
point of a `spawn()`ed task, and the error will stop the task for the rest of
the daemon's lifetime.

In general I prefer "detached" tasks to always return `()`.

Perhaps we should have a `spawn_task_aborted_on_shutdown()` helper which
enforces the `()` return type in its generic bounds, as well as a variant
which simply logs returned errors and warns in its docs that the task will
then be over for the rest of the daemon's lifetime.

> +    }
> +
> +    Ok(())
> +}
> +
> +fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
> +    let name = format!(
> +        "pve/{remote_name}/{id}/{metric}",
> +        id = data_point.id,
> +        metric = data_point.metric,
> +    );
> +
> +    let data_source_type = match data_point.ty {
> +        ClusterMetricsDataType::Gauge => DataSourceType::Gauge,
> +        ClusterMetricsDataType::Counter => DataSourceType::Counter,
> +        ClusterMetricsDataType::Derive => DataSourceType::Derive,
> +    };
> +
> +    rrd_cache::update_value(
> +        &name,
> +        data_point.value,
> +        data_point.timestamp,
> +        data_source_type,
> +    );
> +}
> +
> +fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
> +    let name = format!(
> +        "pbs/{remote_name}/{id}/{metric}",
> +        id = data_point.id,
> +        metric = data_point.metric,
> +    );
> +
> +    let data_source_type = match data_point.ty {
> +        MetricDataType::Gauge => DataSourceType::Gauge,
> +        MetricDataType::Counter => DataSourceType::Counter,
> +        MetricDataType::Derive => DataSourceType::Derive,
> +    };
> +
> +    rrd_cache::update_value(
> +        &name,
> +        data_point.value,
> +        data_point.timestamp,
> +        data_source_type,
> +    );
> +}
> --
> 2.39.5
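
To sketch the helper idea from above (untested; the second function's name,
the logging macro and the exact bounds are just placeholders):

    use std::future::Future;
    use std::pin::pin;

    /// Spawn a detached task which gets aborted on daemon shutdown.
    /// The `Output = ()` bound makes it impossible to silently drop an error.
    pub fn spawn_task_aborted_on_shutdown<F>(task: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        tokio::spawn(async move {
            let task = pin!(task);
            let abort_future = pin!(proxmox_daemon::shutdown_future());
            futures::future::select(task, abort_future).await;
        });
    }

    /// Variant for fallible tasks: at least log the error and (in the docs)
    /// warn that the task is then over for the rest of the daemon's lifetime.
    pub fn spawn_fallible_task_aborted_on_shutdown<F>(name: &'static str, task: F)
    where
        F: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
    {
        spawn_task_aborted_on_shutdown(async move {
            if let Err(err) = task.await {
                log::error!("task '{name}' exited with an error and will not be restarted: {err:#}");
            }
        });
    }

`start_task()` above could then use these instead of the two raw
`tokio::spawn()` calls, without the silently dropped `Result`s.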