public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 13/28] metric collection: pass rrd cache instance as function parameter
Date: Fri, 14 Feb 2025 14:06:38 +0100
Message-ID: <20250214130653.283012-14-l.wagner@proxmox.com>
In-Reply-To: <20250214130653.283012-1-l.wagner@proxmox.com>

Pass the RRD cache to the functions that use it instead of having them
access the global static instance. This allows dependency injection in
tests: a test can simply hand in the cache it wants to use.

Replacing the global instance for tests is awkward. Tests may run
multi-threaded, so the global would have to be a thread-local. In tokio
code we would actually want task-local variables instead, and those
stop helping as soon as a task spawns new tasks, since the spawned
tasks do not see the task-local variables of their parent. Passing the
dependency as a parameter avoids all of this.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
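Note (not part of the commit): a minimal sketch of the pattern this
patch applies, using only the standard library. `FakeCache`,
`store_metric` and `store_in_background` are hypothetical names made up
for illustration, and `std::thread::spawn` stands in for
`tokio::task::spawn_blocking`. Helpers borrow the cache as `&FakeCache`,
while the function that spawns work takes an `Arc` so the closure can
own a 'static handle.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for `proxmox_rrd::Cache`; it only remembers the
/// last value stored under each name.
#[derive(Default)]
struct FakeCache {
    values: Mutex<HashMap<String, f64>>,
}

impl FakeCache {
    fn update(&self, name: &str, value: f64) {
        self.values.lock().unwrap().insert(name.to_string(), value);
    }

    fn get(&self, name: &str) -> Option<f64> {
        self.values.lock().unwrap().get(name).copied()
    }
}

/// Helper that only borrows the cache, similar in shape to `update_value`.
fn store_metric(cache: &FakeCache, remote: &str, metric: &str, value: f64) {
    cache.update(&format!("pve/{remote}/{metric}"), value);
}

/// The spawned closure must be 'static, so this function takes an `Arc`
/// and moves it into the worker thread.
fn store_in_background(cache: Arc<FakeCache>, remote: String, data: Vec<(String, f64)>) {
    let handle = thread::spawn(move || {
        for (metric, value) in &data {
            // `&cache` derefs from `&Arc<FakeCache>` to `&FakeCache`.
            store_metric(&cache, &remote, metric, *value);
        }
    });
    handle.join().expect("worker thread panicked");
}

fn main() {
    // A test would create its own instance like this instead of
    // touching a global.
    let cache = Arc::new(FakeCache::default());
    let data = vec![("cpu".to_string(), 0.5)];
    store_in_background(Arc::clone(&cache), "remote1".to_string(), data);
    println!("{:?}", cache.get("pve/remote1/cpu"));
}
```

Because every caller passes its own instance, two tests running in
parallel cannot observe each other's data.
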
 server/src/api/rrd_common.rs                 |  4 +-
 server/src/metric_collection/mod.rs          | 15 +++-
 server/src/metric_collection/rrd_cache.rs    | 77 ++++++++++++--------
 server/src/metric_collection/rrd_task.rs     | 16 ++--
 server/src/metric_collection/top_entities.rs |  3 +
 5 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/server/src/api/rrd_common.rs b/server/src/api/rrd_common.rs
index 0d82d0c3..d9ed017a 100644
--- a/server/src/api/rrd_common.rs
+++ b/server/src/api/rrd_common.rs
@@ -23,9 +23,11 @@ pub fn create_datapoints_from_rrd<T: DataPoint>(
     let mut timemap = BTreeMap::new();
     let mut last_resolution = None;
 
+    let cache = rrd_cache::get_cache();
+
     for name in T::fields() {
         let (start, resolution, data) =
-            match rrd_cache::extract_data(basedir, name, timeframe, mode)? {
+            match rrd_cache::extract_data(&cache, basedir, name, timeframe, mode)? {
                 Some(data) => data.into(),
                 None => continue,
             };
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 9cd60455..509d4f88 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -4,6 +4,8 @@ use std::sync::OnceLock;
 use anyhow::{bail, Error};
 use tokio::sync::mpsc::{self, Sender};
 
+use proxmox_sys::fs::CreateOptions;
+
 mod collection_task;
 pub mod rrd_cache;
 mod rrd_task;
@@ -16,7 +18,14 @@ static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new();
 
 /// Initialize the RRD cache
 pub fn init() -> Result<(), Error> {
-    rrd_cache::init()?;
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
+
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
+    let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
+
+    let cache = rrd_cache::init(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?;
+    rrd_cache::set_cache(cache)?;
 
     Ok(())
 }
@@ -42,8 +51,10 @@ pub fn start_task() -> Result<(), Error> {
         futures::future::select(metric_collection_task_future, abort_future).await;
     });
 
+    let cache = rrd_cache::get_cache();
+
     tokio::spawn(async move {
-        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(metric_data_rx));
+        let rrd_task_future = pin!(rrd_task::store_in_rrd_task(cache, metric_data_rx));
         let abort_future = pin!(proxmox_daemon::shutdown_future());
         futures::future::select(rrd_task_future, abort_future).await;
     });
diff --git a/server/src/metric_collection/rrd_cache.rs b/server/src/metric_collection/rrd_cache.rs
index a8d72b80..70c91ca6 100644
--- a/server/src/metric_collection/rrd_cache.rs
+++ b/server/src/metric_collection/rrd_cache.rs
@@ -5,6 +5,7 @@
 //! and update RRD data inside `proxmox-datacenter-api`.
 
 use std::path::Path;
+use std::sync::Arc;
 
 use anyhow::{format_err, Error};
 use once_cell::sync::OnceCell;
@@ -16,32 +17,45 @@ use proxmox_sys::fs::CreateOptions;
 
 use pdm_buildcfg::PDM_STATE_DIR_M;
 
-const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
+pub(super) const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb");
 
-static RRD_CACHE: OnceCell<Cache> = OnceCell::new();
+// This is an `Arc` because this makes it easier to do dependency injection
+// in test contexts.
+//
+// For DI in testing, we want to pass in a reference to the Cache
+// as a function parameter. In a couple of these functions we
+// spawn tokio tasks which need access to the reference, hence the
+// reference needs to be 'static. In a test context, we kind of have a
+// hard time to come up with a 'static reference, so we just
+// wrap the cache in an `Arc` for now, solving the
+// lifetime problem via refcounting.
+static RRD_CACHE: OnceCell<Arc<Cache>> = OnceCell::new();
 
 /// Get the RRD cache instance
-fn get_cache() -> Result<&'static Cache, Error> {
+pub fn get_cache() -> Arc<Cache> {
+    RRD_CACHE.get().cloned().expect("rrd cache not initialized")
+}
+
+pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> {
     RRD_CACHE
-        .get()
-        .ok_or_else(|| format_err!("RRD cache not initialized!"))
+        .set(cache)
+        .map_err(|_| format_err!("RRD cache already initialized!"))?;
+
+    Ok(())
 }
 
 /// Initialize the RRD cache instance
 ///
 /// Note: Only a single process must do this (proxmox-datacenter-api)
-pub fn init() -> Result<&'static Cache, Error> {
-    let api_uid = pdm_config::api_user()?.uid;
-    let api_gid = pdm_config::api_group()?.gid;
-
-    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
-    let dir_options = CreateOptions::new().owner(api_uid).group(api_gid);
-
+pub fn init<P: AsRef<Path>>(
+    base_path: P,
+    dir_options: CreateOptions,
+    file_options: CreateOptions,
+) -> Result<Arc<Cache>, Error> {
     let apply_interval = 30.0 * 60.0; // 30 minutes
 
     let cache = Cache::new(
-        RRD_CACHE_BASEDIR,
+        base_path,
         Some(file_options),
         Some(dir_options),
         apply_interval,
@@ -51,11 +65,7 @@ pub fn init() -> Result<&'static Cache, Error> {
 
     cache.apply_journal()?;
 
-    RRD_CACHE
-        .set(cache)
-        .map_err(|_| format_err!("RRD cache already initialized!"))?;
-
-    Ok(RRD_CACHE.get().unwrap())
+    Ok(Arc::new(cache))
 }
 
 fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> {
@@ -91,6 +101,7 @@ fn create_callback(dst: DataSourceType) -> Database {
 
 /// Extracts data for the specified time frame from RRD cache
 pub fn extract_data(
+    rrd_cache: &Cache,
     basedir: &str,
     name: &str,
     timeframe: RrdTimeframe,
@@ -112,26 +123,28 @@ pub fn extract_data(
         RrdMode::Average => AggregationFn::Average,
     };
 
-    let rrd_cache = get_cache()?;
-
     rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end))
 }
 
 /// Sync/Flush the RRD journal
 pub fn sync_journal() {
-    if let Ok(rrd_cache) = get_cache() {
-        if let Err(err) = rrd_cache.sync_journal() {
-            log::error!("rrd_sync_journal failed - {err}");
-        }
+    let rrd_cache = get_cache();
+    if let Err(err) = rrd_cache.sync_journal() {
+        log::error!("rrd_sync_journal failed - {err}");
     }
 }
+
 /// Update RRD Gauge values
-pub fn update_value(name: &str, value: f64, timestamp: i64, datasource_type: DataSourceType) {
-    if let Ok(rrd_cache) = get_cache() {
-        if let Err(err) =
-            rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
-        {
-            log::error!("rrd::update_value '{name}' failed - {err}");
-        }
+pub fn update_value(
+    rrd_cache: &Cache,
+    name: &str,
+    value: f64,
+    timestamp: i64,
+    datasource_type: DataSourceType,
+) {
+    if let Err(err) =
+        rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type)
+    {
+        log::error!("rrd::update_value '{name}' failed - {err}");
     }
 }
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
index 1c618f54..3704b0e7 100644
--- a/server/src/metric_collection/rrd_task.rs
+++ b/server/src/metric_collection/rrd_task.rs
@@ -1,7 +1,9 @@
+use std::sync::Arc;
+
 use anyhow::Error;
 use tokio::sync::{mpsc::Receiver, oneshot};
 
-use proxmox_rrd::rrd::DataSourceType;
+use proxmox_rrd::{rrd::DataSourceType, Cache};
 
 use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
 use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
@@ -39,9 +41,11 @@ pub(super) struct RrdStoreResult {
 /// Task which stores received metrics in the RRD. Metric data is fed into
 /// this task via a MPSC channel.
 pub(super) async fn store_in_rrd_task(
+    cache: Arc<Cache>,
     mut receiver: Receiver<RrdStoreRequest>,
 ) -> Result<(), Error> {
     while let Some(msg) = receiver.recv().await {
+        let cache_clone = Arc::clone(&cache);
         // Involves some blocking file IO
         let res = tokio::task::spawn_blocking(move || {
             let mut most_recent_timestamp = 0;
@@ -53,7 +57,7 @@ pub(super) async fn store_in_rrd_task(
                 } => {
                     for data_point in metrics.data {
                         most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
-                        store_metric_pve(&remote, &data_point);
+                        store_metric_pve(&cache_clone, &remote, &data_point);
                     }
 
                     channel
@@ -65,7 +69,7 @@ pub(super) async fn store_in_rrd_task(
                 } => {
                     for data_point in metrics.data {
                         most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
-                        store_metric_pbs(&remote, &data_point);
+                        store_metric_pbs(&cache_clone, &remote, &data_point);
                     }
 
                     channel
@@ -91,7 +95,7 @@ pub(super) async fn store_in_rrd_task(
     Ok(())
 }
 
-fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
+fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetricsData) {
     let name = format!(
         "pve/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -105,6 +109,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
     };
 
     rrd_cache::update_value(
+        cache,
         &name,
         data_point.value,
         data_point.timestamp,
@@ -112,7 +117,7 @@ fn store_metric_pve(remote_name: &str, data_point: &ClusterMetricsData) {
     );
 }
 
-fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
+fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoint) {
     let name = format!(
         "pbs/{remote_name}/{id}/{metric}",
         id = data_point.id,
@@ -126,6 +131,7 @@ fn store_metric_pbs(remote_name: &str, data_point: &MetricDataPoint) {
     };
 
     rrd_cache::update_value(
+        cache,
         &name,
         data_point.value,
         data_point.timestamp,
diff --git a/server/src/metric_collection/top_entities.rs b/server/src/metric_collection/top_entities.rs
index f8e053fb..f54ee72f 100644
--- a/server/src/metric_collection/top_entities.rs
+++ b/server/src/metric_collection/top_entities.rs
@@ -121,7 +121,10 @@ fn get_entity(
     name: String,
     metric: &str,
 ) -> Option<(f64, TopEntity)> {
+    let cache = rrd_cache::get_cache();
+
     if let Ok(Some(values)) = rrd_cache::extract_data(
+        &cache,
         &name,
         metric,
         timeframe,
-- 
2.39.5
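
Note on the commit message: the problem it describes for scoped globals
can be reproduced with plain standard-library thread-locals. This is
only an analogy, under the assumption that it carries over to tokio
task-locals as the commit message states; `CURRENT_CACHE` and the
helper functions are hypothetical names, and a `String` stands in for
the cache.

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Hypothetical per-thread "current cache" slot; a name stands in
    // for the real cache object.
    static CURRENT_CACHE: RefCell<Option<String>> = RefCell::new(None);
}

/// Returns the value stored for the calling thread, if any.
fn current_cache() -> Option<String> {
    CURRENT_CACHE.with(|slot| slot.borrow().clone())
}

/// Sets the value for the calling thread only.
fn set_current_cache(name: &str) {
    CURRENT_CACHE.with(|slot| *slot.borrow_mut() = Some(name.to_string()));
}

/// Reads the slot from a freshly spawned thread. The child starts with
/// its own, uninitialized copy of the thread-local.
fn current_cache_in_child() -> Option<String> {
    thread::spawn(current_cache)
        .join()
        .expect("child thread panicked")
}

fn main() {
    set_current_cache("test-cache");
    println!("parent: {:?}", current_cache());
    println!("child:  {:?}", current_cache_in_child());
}
```

The parent sees the value it set, while the spawned thread sees `None`.
A test that injects its cache through such a slot would lose it in any
worker it spawns, which is why the patch passes the cache explicitly.
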





Thread overview: 34+ messages
2025-02-14 13:06 [pdm-devel] [PATCH proxmox-datacenter-manager v2 00/28] metric collection improvements (concurrency, config, API, CLI) Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 01/28] test support: add NamedTempFile helper Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 02/28] test support: add NamedTempDir helper Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 03/28] pdm-api-types: add CollectionSettings type Lukas Wagner
2025-02-18 15:26   ` Wolfgang Bumiller
2025-02-18 15:31     ` Stefan Hanreich
2025-02-21  8:27     ` Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 04/28] pdm-config: add functions for reading/writing metric collection settings Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 05/28] metric collection: split top_entities split into separate module Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 06/28] metric collection: save metric data to RRD in separate task Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 07/28] metric collection: rework metric poll task Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 08/28] metric collection: persist state after metric collection Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 09/28] metric collection: skip if last_collection < MIN_COLLECTION_INTERVAL Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 10/28] metric collection: collect overdue metrics on startup/timer change Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 11/28] metric collection: add tests for the fetch_remotes function Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 12/28] metric collection: add test for fetch_overdue Lukas Wagner
2025-02-14 13:06 ` Lukas Wagner [this message]
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 14/28] metric collection: add test for rrd task Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 15/28] metric collection: wrap rrd_cache::Cache in a struct Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 16/28] metric collection: record remote response time in metric database Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 17/28] metric collection: save time needed for collection run to RRD Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 18/28] metric collection: periodically clean removed remotes from statefile Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 19/28] api: add endpoint for updating metric collection settings Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 20/28] api: add endpoint to trigger metric collection Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 21/28] api: remotes: trigger immediate metric collection for newly added nodes Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 22/28] api: add api for querying metric collection RRD data Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 23/28] api: metric-collection: add status endpoint Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 24/28] pdm-client: add metric collection API methods Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 25/28] cli: add commands for metric-collection settings, trigger, status Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 26/28] metric collection: factor out handle_tick and handle_control_message fns Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 27/28] metric collection: skip missed timer ticks Lukas Wagner
2025-02-14 13:06 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 28/28] metric collection: use JoinSet instead of joining from handles in a Vec Lukas Wagner
2025-02-21 13:19 ` [pdm-devel] [PATCH proxmox-datacenter-manager v2 00/28] metric collection improvements (concurrency, config, API, CLI) Maximiliano Sandoval
2025-03-14 14:10 ` Lukas Wagner
