From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pdm-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 5C5831FF164 for <inbox@lore.proxmox.com>; Fri, 14 Feb 2025 14:07:43 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1D30F19D2B; Fri, 14 Feb 2025 14:07:44 +0100 (CET) From: Lukas Wagner <l.wagner@proxmox.com> To: pdm-devel@lists.proxmox.com Date: Fri, 14 Feb 2025 14:06:38 +0100 Message-Id: <20250214130653.283012-14-l.wagner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20250214130653.283012-1-l.wagner@proxmox.com> References: <20250214130653.283012-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.010 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 13/28] metric collection: pass rrd cache instance as function parameter X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/> List-Post: <mailto:pdm-devel@lists.proxmox.com> List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe> Reply-To: Proxmox Datacenter Manager development discussion 
<pdm-devel@lists.proxmox.com> Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com> This enables us to do dependency injection for tests by passing in the rrd cache to use, instead of having to replace the global static instance. The latter is always awkward because tests might run multi-threaded, so the global instance could/should be a thread-local, but this again is weird in the tokio world where we actually want task-local variables, which in turn are weird once you actually start new tasks, which then don't access the same task-local variables as their parent task. Long story short, passing in the dependency as a parameter makes things easier. Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> --- server/src/api/rrd_common.rs | 4 +- server/src/metric_collection/mod.rs | 15 +++- server/src/metric_collection/rrd_cache.rs | 77 ++++++++++++-------- server/src/metric_collection/rrd_task.rs | 16 ++-- server/src/metric_collection/top_entities.rs | 3 + 5 files changed, 75 insertions(+), 40 deletions(-) diff --git a/server/src/api/rrd_common.rs b/server/src/api/rrd_common.rs index 0d82d0c3..d9ed017a 100644 --- a/server/src/api/rrd_common.rs +++ b/server/src/api/rrd_common.rs @@ -23,9 +23,11 @@ pub fn create_datapoints_from_rrd<T: DataPoint>( let mut timemap = BTreeMap::new(); let mut last_resolution = None; + let cache = rrd_cache::get_cache(); + for name in T::fields() { let (start, resolution, data) = - match rrd_cache::extract_data(basedir, name, timeframe, mode)? { + match rrd_cache::extract_data(&cache, basedir, name, timeframe, mode)? 
{ Some(data) => data.into(), None => continue, }; diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs index 9cd60455..509d4f88 100644 --- a/server/src/metric_collection/mod.rs +++ b/server/src/metric_collection/mod.rs @@ -4,6 +4,8 @@ use std::sync::OnceLock; use anyhow::{bail, Error}; use tokio::sync::mpsc::{self, Sender}; +use proxmox_sys::fs::CreateOptions; + mod collection_task; pub mod rrd_cache; mod rrd_task; @@ -16,7 +18,14 @@ static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new(); /// Initialize the RRD cache pub fn init() -> Result<(), Error> { - rrd_cache::init()?; + let api_uid = pdm_config::api_user()?.uid; + let api_gid = pdm_config::api_group()?.gid; + + let file_options = CreateOptions::new().owner(api_uid).group(api_gid); + let dir_options = CreateOptions::new().owner(api_uid).group(api_gid); + + let cache = rrd_cache::init(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?; + rrd_cache::set_cache(cache)?; Ok(()) } @@ -42,8 +51,10 @@ pub fn start_task() -> Result<(), Error> { futures::future::select(metric_collection_task_future, abort_future).await; }); + let cache = rrd_cache::get_cache(); + tokio::spawn(async move { - let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx)); + let rrd_task_future = pin!(rrd_task::store_in_rrd_task(cache, metric_data_rx)); let abort_future = pin!(proxmox_daemon::shutdown_future()); futures::future::select(rrd_task_future, abort_future).await; }); diff --git a/server/src/metric_collection/rrd_cache.rs b/server/src/metric_collection/rrd_cache.rs index a8d72b80..70c91ca6 100644 --- a/server/src/metric_collection/rrd_cache.rs +++ b/server/src/metric_collection/rrd_cache.rs @@ -5,6 +5,7 @@ //! and update RRD data inside `proxmox-datacenter-api`. 
use std::path::Path; +use std::sync::Arc; use anyhow::{format_err, Error}; use once_cell::sync::OnceCell; @@ -16,32 +17,45 @@ use proxmox_sys::fs::CreateOptions; use pdm_buildcfg::PDM_STATE_DIR_M; -const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb"); +pub(super) const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb"); -static RRD_CACHE: OnceCell<Cache> = OnceCell::new(); +// This is an `Arc` because this makes it easier to do dependency injection +// in test contexts. +// +// For DI in testing, we want to pass in a reference to the Cache +// as a function parameter. In a couple of these functions we +// spawn tokio tasks which need access to the reference, hence the +// reference needs to be 'static. In a test context, we kind of have a +// hard time to come up with a 'static reference, so we just +// wrap the cache in an `Arc` for now, solving the +// lifetime problem via refcounting. +static RRD_CACHE: OnceCell<Arc<Cache>> = OnceCell::new(); /// Get the RRD cache instance -fn get_cache() -> Result<&'static Cache, Error> { +pub fn get_cache() -> Arc<Cache> { + RRD_CACHE.get().cloned().expect("rrd cache not initialized") +} + +pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> { RRD_CACHE - .get() - .ok_or_else(|| format_err!("RRD cache not initialized!")) + .set(cache) + .map_err(|_| format_err!("RRD cache already initialized!"))?; + + Ok(()) } /// Initialize the RRD cache instance /// /// Note: Only a single process must do this (proxmox-datacenter-api) -pub fn init() -> Result<&'static Cache, Error> { - let api_uid = pdm_config::api_user()?.uid; - let api_gid = pdm_config::api_group()?.gid; - - let file_options = CreateOptions::new().owner(api_uid).group(api_gid); - - let dir_options = CreateOptions::new().owner(api_uid).group(api_gid); - +pub fn init<P: AsRef<Path>>( + base_path: P, + dir_options: CreateOptions, + file_options: CreateOptions, +) -> Result<Arc<Cache>, Error> { let apply_interval = 30.0 * 60.0; // 30 minutes 
let cache = Cache::new( - RRD_CACHE_BASEDIR, + base_path, Some(file_options), Some(dir_options), apply_interval, @@ -51,11 +65,7 @@ pub fn init() -> Result<&'static Cache, Error> { cache.apply_journal()?; - RRD_CACHE - .set(cache) - .map_err(|_| format_err!("RRD cache already initialized!"))?; - - Ok(RRD_CACHE.get().unwrap()) + Ok(Arc::new(cache)) } fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> { @@ -91,6 +101,7 @@ fn create_callback(dst: DataSourceType) -> Database { /// Extracts data for the specified time frame from RRD cache pub fn extract_data( + rrd_cache: &Cache, basedir: &str, name: &str, timeframe: RrdTimeframe, @@ -112,26 +123,28 @@ pub fn extract_data( RrdMode::Average => AggregationFn::Average, }; - let rrd_cache = get_cache()?; - rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end)) } /// Sync/Flush the RRD journal pub fn sync_journal() { - if let Ok(rrd_cache) = get_cache() { - if let Err(err) = rrd_cache.sync_journal() { - log::error!("rrd_sync_journal failed - {err}"); - } + let rrd_cache = get_cache(); + if let Err(err) = rrd_cache.sync_journal() { + log::error!("rrd_sync_journal failed - {err}"); } } + /// Update RRD Gauge values -pub fn update_value(name: &str, value: f64, timestamp: i64, datasource_type: DataSourceType) { - if let Ok(rrd_cache) = get_cache() { - if let Err(err) = - rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type) - { - log::error!("rrd::update_value '{name}' failed - {err}"); - } +pub fn update_value( + rrd_cache: &Cache, + name: &str, + value: f64, + timestamp: i64, + datasource_type: DataSourceType, +) { + if let Err(err) = + rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type) + { + log::error!("rrd::update_value '{name}' failed - {err}"); } } diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs index 1c618f54..3704b0e7 100644 --- 
a/server/src/metric_collection/rrd_task.rs +++ b/server/src/metric_collection/rrd_task.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; + use anyhow::Error; use tokio::sync::{mpsc::Receiver, oneshot}; -use proxmox_rrd::rrd::DataSourceType; +use proxmox_rrd::{rrd::DataSourceType, Cache}; use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType}; @@ -39,9 +41,11 @@ pub(super) struct RrdStoreResult { /// Task which stores received metrics in the RRD. Metric data is fed into /// this task via a MPSC channel. pub(super) async fn store_in_rrd_task( + cache: Arc<Cache>, mut receiver: Receiver<RrdStoreRequest>, ) -> Result<(), Error> { while let Some(msg) = receiver.recv().await { + let cache_clone = Arc::clone(&cache); // Involves some blocking file IO let res = tokio::task::spawn_blocking(move || { let mut most_recent_timestamp = 0; @@ -53,7 +57,7 @@ pub(super) async fn store_in_rrd_task( } => { for data_point in metrics.data { most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp); - store_metric_pve(&remote, &data_point); + store_metric_pve(&cache_clone, &remote, &data_point); } channel @@ -65,7 +69,7 @@ pub(super) async fn store_in_rrd_task( } => { for data_point in metrics.data { most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp); - store_metric_pbs(&remote, &data_point); + store_metric_pbs(&cache_clone, &remote, &data_point); } channel @@ -91,7 +95,7 @@ pub(super) async fn store_in_rrd_task( Ok(()) } -fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) { +fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetricsData) { let name = format!( "pve/{remote_name}/{id}/{metric}", id = data_point.id, @@ -105,6 +109,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) { }; rrd_cache::update_value( + cache, &name, data_point.value, data_point.timestamp, @@ -112,7 +117,7 @@ fn 
store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) { ); } -fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) { +fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoint) { let name = format!( "pbs/{remote_name}/{id}/{metric}", id = data_point.id, @@ -126,6 +131,7 @@ fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) { }; rrd_cache::update_value( + cache, &name, data_point.value, data_point.timestamp, diff --git a/server/src/metric_collection/top_entities.rs b/server/src/metric_collection/top_entities.rs index f8e053fb..f54ee72f 100644 --- a/server/src/metric_collection/top_entities.rs +++ b/server/src/metric_collection/top_entities.rs @@ -121,7 +121,10 @@ fn get_entity( name: String, metric: &str, ) -> Option<(f64, TopEntity)> { + let cache = rrd_cache::get_cache(); + if let Ok(Some(values)) = rrd_cache::extract_data( + &cache, &name, metric, timeframe, -- 2.39.5 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel