From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pdm-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 5C5831FF164
	for <inbox@lore.proxmox.com>; Fri, 14 Feb 2025 14:07:43 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 1D30F19D2B;
	Fri, 14 Feb 2025 14:07:44 +0100 (CET)
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Date: Fri, 14 Feb 2025 14:06:38 +0100
Message-Id: <20250214130653.283012-14-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250214130653.283012-1-l.wagner@proxmox.com>
References: <20250214130653.283012-1-l.wagner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.010 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 13/28] metric
 collection: pass rrd cache instance as function parameter
X-BeenThere: pdm-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Datacenter Manager development discussion
 <pdm-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/>
List-Post: <mailto:pdm-devel@lists.proxmox.com>
List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, 
 <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Datacenter Manager development discussion
 <pdm-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pdm-devel-bounces@lists.proxmox.com
Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com>

This enables dependency injection for tests: the RRD cache to use is
passed in as a function parameter instead of replacing the global static
instance. Replacing the global instance is awkward because tests might
run multi-threaded, so the global instance could (or should) be a
thread-local. That, in turn, is weird in the tokio world, where we
actually want task-local variables; those are weird as well once you
start new tasks, because spawned tasks do not see the task-local
variables of their parent task. Long story short, passing in the
dependency as a parameter makes things easier.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 server/src/api/rrd_common.rs                 |  4 +-
 server/src/metric_collection/mod.rs          | 15 +++-
 server/src/metric_collection/rrd_cache.rs    | 77 ++++++++++++--------
 server/src/metric_collection/rrd_task.rs     | 16 ++--
 server/src/metric_collection/top_entities.rs |  3 +
 5 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/server/src/api/rrd_common.rs b/server/src/api/rrd_common.rs
index 0d82d0c3..d9ed017a 100644
--- a/server/src/api/rrd_common.rs
+++ b/server/src/api/rrd_common.rs
@@ -23,9 +23,11 @@ pub fn create_datapoints_from_rrd<T: DataPoint>(
     let mut timemap = BTreeMap::new();
     let mut last_resolution = None;
 
+    let cache = rrd_cache::get_cache();
+
     for name in T::fields() {
         let (start, resolution, data) =
-            match rrd_cache::extract_data(basedir, name, timeframe, mode)? {
+            match rrd_cache::extract_data(&cache, basedir, name, timeframe, mode)? {
                 Some(data) => data.into(),
                 None => continue,
             };
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 9cd60455..509d4f88 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -4,6 +4,8 @@ use std::sync::OnceLock;
 use anyhow::{bail, Error};
 use tokio::sync::mpsc::{self, Sender};
 
+use proxmox_sys::fs::CreateOptions;
+
 mod collection_task;
 pub mod rrd_cache;
 mod rrd_task;
@@ -16,7 +18,14 @@ static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
 
 /// Initialize the RRD cache
 pub fn init() -> Result<(), Error> {
-    rrd_cache::init()?;
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+    let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+    let cache = rrd_cache::init(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?;
+    rrd_cache::set_cache(cache)?;
 
     Ok(())
 }
@@ -42,8 +51,10 @@ pub fn start_task() -> Result<(), Error> {
         futures::future::select(metric_collection_task_future, abort_future).await;
     });
 
+    let cache = rrd_cache::get_cache();
+
     tokio::spawn(async move {
-        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
+        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(cache, metric_data_rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(rrd_task_future, abort_future).await;
     });
diff --git a/server/src/metric_collection/rrd_cache.rs b/server/src/metric_collection/rrd_cache.rs
index a8d72b80..70c91ca6 100644
--- a/server/src/metric_collection/rrd_cache.rs
+++ b/server/src/metric_collection/rrd_cache.rs
@@ -5,6 +5,7 @@
 //! and update RRD data inside `proxmox-datacenter-api`.
 
 use std::path::Path;
+use std::sync::Arc;
 
 use anyhow::{format_err, Error};
 use once_cell::sync::OnceCell;
@@ -16,32 +17,45 @@ use proxmox_sys::fs::CreateOptions;
 
 use pdm_buildcfg::PDM_STATE_DIR_M;
 
-const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
+pub(super) const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
 
-static RRD_CACHE: OnceCell<Cache> = OnceCell::new();
+// The cache is wrapped in an `Arc` to make dependency injection in
+// test contexts easier.
+//
+// For DI in tests, we want to pass the cache to functions as a
+// parameter. Some of these functions spawn tokio tasks that need
+// access to the cache, which means a plain reference would have to
+// be 'static. Such a 'static reference is hard to come by in a test
+// context, so for now we wrap the cache in an `Arc` and solve the
+// lifetime problem via reference counting instead of borrowing.
+// (Cloning the `Arc` is cheap: it only bumps the reference count.)
+static RRD_CACHE: OnceCell<Arc<Cache>> = OnceCell::new();
 
 /// Get the RRD cache instance
-fn get_cache() -> Result<&'static Cache, Error> {
+pub fn get_cache() -> Arc<Cache> {
+    RRD_CACHE.get().cloned().expect("rrd cache not initialized")
+}
+
+pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> {
     RRD_CACHE
-        .get()
-        .ok_or_else(|| format_err!("RRD cache not initialized!"))
+        .set(cache)
+        .map_err(|_| format_err!("RRD cache already initialized!"))?;
+
+    Ok(())
 }
 
 /// Initialize the RRD cache instance
 ///
 /// Note: Only a single process must do this (proxmox-datacenter-api)
-pub fn init() -> Result<&'static Cache, Error> {
-    let api_uid = pdm_config::api_user()?.uid;
-    let api_gid = pdm_config::api_group()?.gid;
-
-    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
-    let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
+pub fn init<P: AsRef<Path>>(
+    base_path: P,
+    dir_options: CreateOptions,
+    file_options: CreateOptions,
+) -> Result<Arc<Cache>, Error> {
     let apply_interval = 30.0 * 60.0; // 30 minutes
 
     let cache = Cache::new(
-        RRD_CACHE_BASEDIR,
+        base_path,
         Some(file_options),
         Some(dir_options),
         apply_interval,
@@ -51,11 +65,7 @@ pub fn init() -> Result<&'static Cache, Error> {
 
     cache.apply_journal()?;
 
-    RRD_CACHE
-        .set(cache)
-        .map_err(|_| format_err!("RRD cache already initialized!"))?;
-
-    Ok(RRD_CACHE.get().unwrap())
+    Ok(Arc::new(cache))
 }
 
 fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> {
@@ -91,6 +101,7 @@ fn create_callback(dst: DataSourceType) -> Database {
 
 /// Extracts data for the specified time frame from RRD cache
 pub fn extract_data(
+    rrd_cache: &Cache,
     basedir: &str,
     name: &str,
     timeframe: RrdTimeframe,
@@ -112,26 +123,28 @@ pub fn extract_data(
         RrdMode::Average => AggregationFn::Average,
     };
 
-    let rrd_cache = get_cache()?;
-
     rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end))
 }
 
 /// Sync/Flush the RRD journal
 pub fn sync_journal() {
-    if let Ok(rrd_cache) = get_cache() {
-        if let Err(err) = rrd_cache.sync_journal() {
-            log::error!("rrd_sync_journal failed - {err}");
-        }
+    let rrd_cache = get_cache();
+    if let Err(err) = rrd_cache.sync_journal() {
+        log::error!("rrd_sync_journal failed - {err}");
     }
 }
+
 /// Update RRD Gauge values
-pub fn update_value(name: &str, value: f64, timestamp: i64, datasource_type: DataSourceType) {
-    if let Ok(rrd_cache) = get_cache() {
-        if let Err(err) =
-            rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
-        {
-            log::error!("rrd::update_value '{name}' failed - {err}");
-        }
+pub fn update_value(
+    rrd_cache: &Cache,
+    name: &str,
+    value: f64,
+    timestamp: i64,
+    datasource_type: DataSourceType,
+) {
+    if let Err(err) =
+        rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
+    {
+        log::error!("rrd::update_value '{name}' failed - {err}");
     }
 }
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
index 1c618f54..3704b0e7 100644
--- a/server/src/metric_collection/rrd_task.rs
+++ b/server/src/metric_collection/rrd_task.rs
@@ -1,7 +1,9 @@
+use std::sync::Arc;
+
 use anyhow::Error;
 use tokio::sync::{mpsc::Receiver, oneshot};
 
-use proxmox_rrd::rrd::DataSourceType;
+use proxmox_rrd::{rrd::DataSourceType, Cache};
 
 use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
 use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
@@ -39,9 +41,11 @@ pub(super) struct RrdStoreResult {
 /// Task which stores received metrics in the RRD. Metric data is fed into
 /// this task via a MPSC channel.
 pub(super) async fn store_in_rrd_task(
+    cache: Arc<Cache>,
     mut receiver: Receiver<RrdStoreRequest>,
 ) -> Result<(), Error> {
     while let Some(msg) = receiver.recv().await {
+        let cache_clone = Arc::clone(&cache);
         // Involves some blocking file IO
         let res = tokio::task::spawn_blocking(move || {
             let mut most_recent_timestamp = 0;
@@ -53,7 +57,7 @@ pub(super) async fn store_in_rrd_task(
                 } => {
                     for data_point in metrics.data {
                         most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
-                        store_metric_pve(&remote, &data_point);
+                        store_metric_pve(&cache_clone, &remote, &data_point);
                     }
 
                     channel
@@ -65,7 +69,7 @@ pub(super) async fn store_in_rrd_task(
                 } => {
                     for data_point in metrics.data {
                         most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
-                        store_metric_pbs(&remote, &data_point);
+                        store_metric_pbs(&cache_clone, &remote, &data_point);
                     }
 
                     channel
@@ -91,7 +95,7 @@ pub(super) async fn store_in_rrd_task(
     Ok(())
 }
 
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetricsData) {
     let name = format!(
         "pve/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -105,6 +109,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
     };
 
     rrd_cache::update_value(
+        cache,
         &name,
         data_point.value,
         data_point.timestamp,
@@ -112,7 +117,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
     );
 }
 
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoint) {
     let name = format!(
         "pbs/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -126,6 +131,7 @@ fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
     };
 
     rrd_cache::update_value(
+        cache,
         &name,
         data_point.value,
         data_point.timestamp,
diff --git a/server/src/metric_collection/top_entities.rs b/server/src/metric_collection/top_entities.rs
index f8e053fb..f54ee72f 100644
--- a/server/src/metric_collection/top_entities.rs
+++ b/server/src/metric_collection/top_entities.rs
@@ -121,7 +121,10 @@ fn get_entity(
     name: String,
     metric: &str,
 ) -> Option<(f64, TopEntity)> {
+    let cache = rrd_cache::get_cache();
+
     if let Ok(Some(values)) = rrd_cache::extract_data(
+        &cache,
         &name,
         metric,
         timeframe,
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel