From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pdm-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 813A81FF164 for <inbox@lore.proxmox.com>; Fri, 14 Feb 2025 14:17:17 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 350501A402; Fri, 14 Feb 2025 14:17:18 +0100 (CET) From: Lukas Wagner <l.wagner@proxmox.com> To: pdm-devel@lists.proxmox.com Date: Fri, 14 Feb 2025 14:06:40 +0100 Message-Id: <20250214130653.283012-16-l.wagner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20250214130653.283012-1-l.wagner@proxmox.com> References: <20250214130653.283012-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.009 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[mod.rs] Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 15/28] metric collection: wrap rrd_cache::Cache in a struct X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/> List-Post: <mailto:pdm-devel@lists.proxmox.com> List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe> Reply-To: Proxmox Datacenter Manager development discussion <pdm-devel@lists.proxmox.com> Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com> All helper functions for storing/retrieving data take the cache instance as their first parameter. This smells like they should be methods on a struct. This commit wraps the foreign `proxmox_rrd::Cache` type in a new type, transforms the `init` function into a `new` method and all other helpers into further methods. 
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> --- server/src/api/rrd_common.rs | 9 +- server/src/metric_collection/mod.rs | 6 +- server/src/metric_collection/rrd_cache.rs | 191 +++++++++---------- server/src/metric_collection/rrd_task.rs | 21 +- server/src/metric_collection/top_entities.rs | 3 +- 5 files changed, 113 insertions(+), 117 deletions(-) diff --git a/server/src/api/rrd_common.rs b/server/src/api/rrd_common.rs index d9ed017a..28868bc1 100644 --- a/server/src/api/rrd_common.rs +++ b/server/src/api/rrd_common.rs @@ -26,11 +26,10 @@ pub fn create_datapoints_from_rrd<T: DataPoint>( let cache = rrd_cache::get_cache(); for name in T::fields() { - let (start, resolution, data) = - match rrd_cache::extract_data(&cache, basedir, name, timeframe, mode)? { - Some(data) => data.into(), - None => continue, - }; + let (start, resolution, data) = match cache.extract_data(basedir, name, timeframe, mode)? { + Some(data) => data.into(), + None => continue, + }; if let Some(expected_resolution) = last_resolution { if resolution != expected_resolution { diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs index 509d4f88..5b6c14d2 100644 --- a/server/src/metric_collection/mod.rs +++ b/server/src/metric_collection/mod.rs @@ -1,4 +1,5 @@ use std::pin::pin; +use std::sync::Arc; use std::sync::OnceLock; use anyhow::{bail, Error}; @@ -13,6 +14,7 @@ mod state; pub mod top_entities; use collection_task::{ControlMsg, MetricCollectionTask}; +use rrd_cache::RrdCache; static CONTROL_MESSAGE_TX: OnceLock<Sender<ControlMsg>> = OnceLock::new(); @@ -24,8 +26,8 @@ pub fn init() -> Result<(), Error> { let file_options = CreateOptions::new().owner(api_uid).group(api_gid); let dir_options = CreateOptions::new().owner(api_uid).group(api_gid); - let cache = rrd_cache::init(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?; - rrd_cache::set_cache(cache)?; + let cache = RrdCache::new(rrd_cache::RRD_CACHE_BASEDIR, dir_options, file_options)?; + 
rrd_cache::set_cache(Arc::new(cache))?; Ok(()) } diff --git a/server/src/metric_collection/rrd_cache.rs b/server/src/metric_collection/rrd_cache.rs index 70c91ca6..e8994143 100644 --- a/server/src/metric_collection/rrd_cache.rs +++ b/server/src/metric_collection/rrd_cache.rs @@ -29,14 +29,14 @@ pub(super) const RRD_CACHE_BASEDIR: &str = concat!(PDM_STATE_DIR_M!(), "/rrdb"); // hard time to come up with a 'static reference, so we just // wrap the cache in an `Arc` for now, solving the // lifetime problem via refcounting. -static RRD_CACHE: OnceCell<Arc<Cache>> = OnceCell::new(); +static RRD_CACHE: OnceCell<Arc<RrdCache>> = OnceCell::new(); /// Get the RRD cache instance -pub fn get_cache() -> Arc<Cache> { +pub fn get_cache() -> Arc<RrdCache> { RRD_CACHE.get().cloned().expect("rrd cache not initialized") } -pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> { +pub fn set_cache(cache: Arc<RrdCache>) -> Result<(), Error> { RRD_CACHE .set(cache) .map_err(|_| format_err!("RRD cache already initialized!"))?; @@ -44,107 +44,106 @@ pub fn set_cache(cache: Arc<Cache>) -> Result<(), Error> { Ok(()) } -/// Initialize the RRD cache instance -/// -/// Note: Only a single process must do this (proxmox-datacenter-api) -pub fn init<P: AsRef<Path>>( - base_path: P, - dir_options: CreateOptions, - file_options: CreateOptions, -) -> Result<Arc<Cache>, Error> { - let apply_interval = 30.0 * 60.0; // 30 minutes - - let cache = Cache::new( - base_path, - Some(file_options), - Some(dir_options), - apply_interval, - load_callback, - create_callback, - )?; - - cache.apply_journal()?; - - Ok(Arc::new(cache)) +/// Wrapper for proxmox_rrd::Cache to accommodate helper methods. 
+pub struct RrdCache { + cache: Cache, } -fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> { - match Database::load(path, true) { - Ok(rrd) => Some(rrd), - Err(err) => { - if err.kind() != std::io::ErrorKind::NotFound { - log::warn!("overwriting RRD file {path:?}, because of load error: {err}",); +impl RrdCache { + /// Create a new RrdCache instance + pub fn new<P: AsRef<Path>>( + base_path: P, + dir_options: CreateOptions, + file_options: CreateOptions, + ) -> Result<Self, Error> { + let apply_interval = 30.0 * 60.0; // 30 minutes + + let cache = Cache::new( + base_path, + Some(file_options), + Some(dir_options), + apply_interval, + Self::load_callback, + Self::create_callback, + )?; + + cache.apply_journal()?; + + Ok(Self { cache }) + } + + fn load_callback(path: &Path, _rel_path: &str) -> Option<Database> { + match Database::load(path, true) { + Ok(rrd) => Some(rrd), + Err(err) => { + if err.kind() != std::io::ErrorKind::NotFound { + log::warn!("overwriting RRD file {path:?}, because of load error: {err}",); + } + None } - None } } -} - -fn create_callback(dst: DataSourceType) -> Database { - let rra_list = vec![ - // 1 min * 1440 => 1 day - Archive::new(AggregationFn::Average, 60, 1440), - Archive::new(AggregationFn::Maximum, 60, 1440), - // 30 min * 1440 => 30 days ~ 1 month - Archive::new(AggregationFn::Average, 30 * 60, 1440), - Archive::new(AggregationFn::Maximum, 30 * 60, 1440), - // 6 h * 1440 => 360 days ~ 1 year - Archive::new(AggregationFn::Average, 6 * 3600, 1440), - Archive::new(AggregationFn::Maximum, 6 * 3600, 1440), - // 1 week * 570 => 10 years - Archive::new(AggregationFn::Average, 7 * 86400, 570), - Archive::new(AggregationFn::Maximum, 7 * 86400, 570), - ]; - - Database::new(dst, rra_list) -} -/// Extracts data for the specified time frame from RRD cache -pub fn extract_data( - rrd_cache: &Cache, - basedir: &str, - name: &str, - timeframe: RrdTimeframe, - mode: RrdMode, -) -> Result<Option<proxmox_rrd::Entry>, Error> { - let 
end = proxmox_time::epoch_f64() as u64; - - let (start, resolution) = match timeframe { - RrdTimeframe::Hour => (end - 3600, 60), - RrdTimeframe::Day => (end - 3600 * 24, 60), - RrdTimeframe::Week => (end - 3600 * 24 * 7, 30 * 60), - RrdTimeframe::Month => (end - 3600 * 24 * 30, 30 * 60), - RrdTimeframe::Year => (end - 3600 * 24 * 365, 6 * 60 * 60), - RrdTimeframe::Decade => (end - 10 * 3600 * 24 * 366, 7 * 86400), - }; - - let cf = match mode { - RrdMode::Max => AggregationFn::Maximum, - RrdMode::Average => AggregationFn::Average, - }; - - rrd_cache.extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end)) -} + fn create_callback(dst: DataSourceType) -> Database { + let rra_list = vec![ + // 1 min * 1440 => 1 day + Archive::new(AggregationFn::Average, 60, 1440), + Archive::new(AggregationFn::Maximum, 60, 1440), + // 30 min * 1440 => 30 days ~ 1 month + Archive::new(AggregationFn::Average, 30 * 60, 1440), + Archive::new(AggregationFn::Maximum, 30 * 60, 1440), + // 6 h * 1440 => 360 days ~ 1 year + Archive::new(AggregationFn::Average, 6 * 3600, 1440), + Archive::new(AggregationFn::Maximum, 6 * 3600, 1440), + // 1 week * 570 => 10 years + Archive::new(AggregationFn::Average, 7 * 86400, 570), + Archive::new(AggregationFn::Maximum, 7 * 86400, 570), + ]; + + Database::new(dst, rra_list) + } -/// Sync/Flush the RRD journal -pub fn sync_journal() { - let rrd_cache = get_cache(); - if let Err(err) = rrd_cache.sync_journal() { - log::error!("rrd_sync_journal failed - {err}"); + /// Extracts data for the specified time frame from RRD cache + pub fn extract_data( + &self, + basedir: &str, + name: &str, + timeframe: RrdTimeframe, + mode: RrdMode, + ) -> Result<Option<proxmox_rrd::Entry>, Error> { + let end = proxmox_time::epoch_f64() as u64; + + let (start, resolution) = match timeframe { + RrdTimeframe::Hour => (end - 3600, 60), + RrdTimeframe::Day => (end - 3600 * 24, 60), + RrdTimeframe::Week => (end - 3600 * 24 * 7, 30 * 60), + RrdTimeframe::Month => (end 
- 3600 * 24 * 30, 30 * 60), + RrdTimeframe::Year => (end - 3600 * 24 * 365, 6 * 60 * 60), + RrdTimeframe::Decade => (end - 10 * 3600 * 24 * 366, 7 * 86400), + }; + + let cf = match mode { + RrdMode::Max => AggregationFn::Maximum, + RrdMode::Average => AggregationFn::Average, + }; + + self.cache + .extract_cached_data(basedir, name, cf, resolution, Some(start), Some(end)) } -} -/// Update RRD Gauge values -pub fn update_value( - rrd_cache: &Cache, - name: &str, - value: f64, - timestamp: i64, - datasource_type: DataSourceType, -) { - if let Err(err) = - rrd_cache.update_value_ignore_old(name, timestamp as f64, value, datasource_type) - { - log::error!("rrd::update_value '{name}' failed - {err}"); + /// Update RRD Gauge values + pub fn update_value( + &self, + name: &str, + value: f64, + timestamp: i64, + datasource_type: DataSourceType, + ) { + if let Err(err) = + self.cache + .update_value_ignore_old(name, timestamp as f64, value, datasource_type) + { + log::error!("rrd::update_value '{name}' failed - {err}"); + } } } diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs index 27327a29..c2a41d30 100644 --- a/server/src/metric_collection/rrd_task.rs +++ b/server/src/metric_collection/rrd_task.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use anyhow::Error; use tokio::sync::{mpsc::Receiver, oneshot}; -use proxmox_rrd::{rrd::DataSourceType, Cache}; +use proxmox_rrd::rrd::DataSourceType; use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType}; -use super::rrd_cache; +use super::rrd_cache::RrdCache; /// Store request for the RRD task. pub(super) enum RrdStoreRequest { @@ -41,7 +41,7 @@ pub(super) struct RrdStoreResult { /// Task which stores received metrics in the RRD. Metric data is fed into /// this task via a MPSC channel. 
pub(super) async fn store_in_rrd_task( - cache: Arc<Cache>, + cache: Arc<RrdCache>, mut receiver: Receiver<RrdStoreRequest>, ) -> Result<(), Error> { while let Some(msg) = receiver.recv().await { @@ -95,7 +95,7 @@ pub(super) async fn store_in_rrd_task( Ok(()) } -fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetricsData) { +fn store_metric_pve(cache: &RrdCache, remote_name: &str, data_point: &ClusterMetricsData) { let name = format!( "pve/{remote_name}/{id}/{metric}", id = data_point.id, @@ -108,8 +108,7 @@ fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetric ClusterMetricsDataType::Derive => DataSourceType::Derive, }; - rrd_cache::update_value( - cache, + cache.update_value( &name, data_point.value, data_point.timestamp, @@ -117,7 +116,7 @@ fn store_metric_pve(cache: &Cache, remote_name: &str, data_point: &ClusterMetric ); } -fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoint) { +fn store_metric_pbs(cache: &RrdCache, remote_name: &str, data_point: &MetricDataPoint) { let name = format!( "pbs/{remote_name}/{id}/{metric}", id = data_point.id, @@ -130,8 +129,7 @@ fn store_metric_pbs(cache: &Cache, remote_name: &str, data_point: &MetricDataPoi MetricDataType::Derive => DataSourceType::Derive, }; - rrd_cache::update_value( - cache, + cache.update_value( &name, data_point.value, data_point.timestamp, @@ -156,7 +154,7 @@ mod tests { // Arrange let dir = NamedTempDir::new()?; let options = get_create_options().perm(nix::sys::stat::Mode::from_bits_truncate(0o700)); - let cache = rrd_cache::init(&dir.path(), options.clone(), options.clone())?; + let cache = Arc::new(RrdCache::new(dir.path(), options.clone(), options.clone())?); let (tx, rx) = tokio::sync::mpsc::channel(10); let task = store_in_rrd_task(Arc::clone(&cache), rx); @@ -215,8 +213,7 @@ mod tests { // There is some race condition in proxmox_rrd, in some rare cases // extract_data does not return any data directly after writing. 
- if let Some(data) = rrd_cache::extract_data( - &cache, + if let Some(data) = cache.extract_data( "pve/some-remote/node/some-node", "cpu_current", RrdTimeframe::Hour, diff --git a/server/src/metric_collection/top_entities.rs b/server/src/metric_collection/top_entities.rs index f54ee72f..31e36c34 100644 --- a/server/src/metric_collection/top_entities.rs +++ b/server/src/metric_collection/top_entities.rs @@ -123,8 +123,7 @@ fn get_entity( ) -> Option<(f64, TopEntity)> { let cache = rrd_cache::get_cache(); - if let Ok(Some(values)) = rrd_cache::extract_data( - &cache, + if let Ok(Some(values)) = cache.extract_data( &name, metric, timeframe, -- 2.39.5 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel