From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pdm-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 363511FF172 for <inbox@lore.proxmox.com>; Wed, 16 Apr 2025 14:57:35 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 714273731B; Wed, 16 Apr 2025 14:57:31 +0200 (CEST) From: Lukas Wagner <l.wagner@proxmox.com> To: pdm-devel@lists.proxmox.com Date: Wed, 16 Apr 2025 14:56:22 +0200 Message-Id: <20250416125642.291552-7-l.wagner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20250416125642.291552-1-l.wagner@proxmox.com> References: <20250416125642.291552-1-l.wagner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.016 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v3 06/26] metric collection: persist state after metric collection X-BeenThere: pdm-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Datacenter Manager development discussion <pdm-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pdm-devel/> List-Post: <mailto:pdm-devel@lists.proxmox.com> List-Help: <mailto:pdm-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel>, <mailto:pdm-devel-request@lists.proxmox.com?subject=subscribe> Reply-To: Proxmox Datacenter Manager development discussion 
<pdm-devel@lists.proxmox.com> Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pdm-devel-bounces@lists.proxmox.com Sender: "pdm-devel" <pdm-devel-bounces@lists.proxmox.com> The metric collection has to maintain some state; at the moment this is only the most recent timestamp from all received metric data points. We use this as a cut-off time when requesting metrics from the remote, as everything *older* than that should already be stored in the database. Up until now, the state was only stored in memory, which means that it was lost when the daemon restarted. The changes in this commit make the metric collection system load/save the state from a file in /var/lib/proxmox-datacenter-manager/metric-collection-state.json The following data points are stored for every remote: - most-recent-datapoint: Timestamp of the most recent datapoint, used as a cut-off when fetching new metrics - last-collection: `last-collection` field saves the timestamp of the last *successful* metric collection for a remote as local timestamp. 
- error: String representation of any error that occurred in the last collection attempt Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> Reviewed-by: Maximiliano Sandoval <m.sandoval@proxmox.com> --- Notes: Changes since v1: - use .inspect_err(..).unwrap_or_default() instead of a manual match in `MetricCollectionState::new` .../src/metric_collection/collection_task.rs | 130 ++++++++++++------ server/src/metric_collection/mod.rs | 1 + server/src/metric_collection/rrd_task.rs | 60 ++++++-- server/src/metric_collection/state.rs | 126 +++++++++++++++++ 4 files changed, 267 insertions(+), 50 deletions(-) create mode 100644 server/src/metric_collection/state.rs diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs index 093f2692..ce3d650d 100644 --- a/server/src/metric_collection/collection_task.rs +++ b/server/src/metric_collection/collection_task.rs @@ -1,15 +1,16 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use anyhow::Error; use tokio::{ sync::{ mpsc::{Receiver, Sender}, - OwnedSemaphorePermit, Semaphore, + oneshot, OwnedSemaphorePermit, Semaphore, }, time::Interval, }; use proxmox_section_config::typed::SectionConfigData; +use proxmox_sys::fs::CreateOptions; use pdm_api_types::{ remotes::{Remote, RemoteType}, @@ -19,7 +20,16 @@ use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE; use crate::{connection, task_utils}; -use super::rrd_task::RrdStoreRequest; +use super::{ + rrd_task::{RrdStoreRequest, RrdStoreResult}, + state::{MetricCollectionState, RemoteStatus}, +}; + +/// Location of the metric collection state file. 
+const METRIC_COLLECTION_STATE_FILE: &str = concat!( + pdm_buildcfg::PDM_STATE_DIR_M!(), + "/metric-collection-state.json" +); pub const MAX_CONCURRENT_CONNECTIONS: usize = 20; @@ -32,7 +42,7 @@ pub(super) enum ControlMsg { /// Task which periodically collects metrics from all remotes and stores /// them in the local metrics database. pub(super) struct MetricCollectionTask { - most_recent_timestamps: HashMap<String, i64>, + state: MetricCollectionState, settings: CollectionSettings, metric_data_tx: Sender<RrdStoreRequest>, control_message_rx: Receiver<ControlMsg>, @@ -45,9 +55,10 @@ impl MetricCollectionTask { control_message_rx: Receiver<ControlMsg>, ) -> Result<Self, Error> { let settings = Self::get_settings_or_default(); + let state = load_state()?; Ok(Self { - most_recent_timestamps: HashMap::new(), + state, settings, metric_data_tx, control_message_rx, @@ -111,6 +122,10 @@ impl MetricCollectionTask { ); timer = Self::setup_timer(interval); } + + if let Err(err) = self.state.save() { + log::error!("could not update metric collection state: {err}"); + } } } @@ -179,7 +194,11 @@ impl MetricCollectionTask { let mut handles = Vec::new(); for remote_name in remotes_to_fetch { - let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0); + let status = self + .state + .get_status(remote_name) + .cloned() + .unwrap_or_default(); // unwrap is okay here, acquire_* will only fail if `close` has been // called on the semaphore. 
@@ -189,7 +208,7 @@ impl MetricCollectionTask { log::debug!("fetching remote '{}'", remote.id); let handle = tokio::spawn(Self::fetch_single_remote( remote, - start_time, + status, self.metric_data_tx.clone(), permit, )); @@ -203,8 +222,7 @@ impl MetricCollectionTask { match res { Ok(Ok(ts)) => { - self.most_recent_timestamps - .insert(remote_name.to_string(), ts); + self.state.set_status(remote_name, ts); } Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"), Err(err) => { @@ -220,45 +238,79 @@ impl MetricCollectionTask { #[tracing::instrument(skip_all, fields(remote = remote.id), name = "metric_collection_task")] async fn fetch_single_remote( remote: Remote, - start_time: i64, + mut status: RemoteStatus, sender: Sender<RrdStoreRequest>, _permit: OwnedSemaphorePermit, - ) -> Result<i64, Error> { - let most_recent_timestamp = match remote.ty { - RemoteType::Pve => { - let client = connection::make_pve_client(&remote)?; - let metrics = client - .cluster_metrics_export(Some(true), Some(false), Some(start_time)) - .await?; + ) -> Result<RemoteStatus, Error> { + let (result_tx, result_rx) = oneshot::channel(); - let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp)); + let now = proxmox_time::epoch_i64(); - sender - .send(RrdStoreRequest::Pve { - remote: remote.id.clone(), - metrics, - }) - .await?; + let res: Result<RrdStoreResult, Error> = async { + match remote.ty { + RemoteType::Pve => { + let client = connection::make_pve_client(&remote)?; + let metrics = client + .cluster_metrics_export( + Some(true), + Some(false), + Some(status.most_recent_datapoint), + ) + .await?; - most_recent + sender + .send(RrdStoreRequest::Pve { + remote: remote.id.clone(), + metrics, + channel: result_tx, + }) + .await?; + } + RemoteType::Pbs => { + let client = connection::make_pbs_client(&remote)?; + let metrics = client + .metrics(Some(true), Some(status.most_recent_datapoint)) + .await?; + + sender + .send(RrdStoreRequest::Pbs { + 
remote: remote.id.clone(), + metrics, + channel: result_tx, + }) + .await?; + } } - RemoteType::Pbs => { - let client = connection::make_pbs_client(&remote)?; - let metrics = client.metrics(Some(true), Some(start_time)).await?; - let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp)); + result_rx.await.map_err(Error::from) + } + .await; - sender - .send(RrdStoreRequest::Pbs { - remote: remote.id.clone(), - metrics, - }) - .await?; - - most_recent + match res { + Ok(result) => { + status.most_recent_datapoint = result.most_recent_timestamp; + status.last_collection = Some(now); + status.error = None; } - }; + Err(err) => { + status.error = Some(err.to_string()); + log::error!("coud not fetch metrics from '{}': {err}", remote.id); + } + } - Ok(most_recent_timestamp) + Ok(status) } } + +/// Load the metric collection state file. +pub(super) fn load_state() -> Result<MetricCollectionState, Error> { + let api_uid = pdm_config::api_user()?.uid; + let api_gid = pdm_config::api_group()?.gid; + + let file_options = CreateOptions::new().owner(api_uid).group(api_gid); + + Ok(MetricCollectionState::new( + METRIC_COLLECTION_STATE_FILE.into(), + file_options, + )) +} diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs index 9b203615..9cd60455 100644 --- a/server/src/metric_collection/mod.rs +++ b/server/src/metric_collection/mod.rs @@ -7,6 +7,7 @@ use tokio::sync::mpsc::{self, Sender}; mod collection_task; pub mod rrd_cache; mod rrd_task; +mod state; pub mod top_entities; use collection_task::{ControlMsg, MetricCollectionTask}; diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs index a72945df..1c618f54 100644 --- a/server/src/metric_collection/rrd_task.rs +++ b/server/src/metric_collection/rrd_task.rs @@ -1,8 +1,10 @@ use anyhow::Error; -use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; +use tokio::sync::{mpsc::Receiver, oneshot}; + use 
proxmox_rrd::rrd::DataSourceType; + +use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics}; use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType}; -use tokio::sync::mpsc::Receiver; use super::rrd_cache; @@ -14,6 +16,8 @@ pub(super) enum RrdStoreRequest { remote: String, /// Metric data. metrics: ClusterMetrics, + /// Oneshot channel to return the [`RrdStoreResult`]. + channel: oneshot::Sender<RrdStoreResult>, }, /// Store PBS metrics. Pbs { @@ -21,9 +25,17 @@ pub(super) enum RrdStoreRequest { remote: String, /// Metric data. metrics: Metrics, + /// Oneshot channel to return the [`RrdStoreResult`]. + channel: oneshot::Sender<RrdStoreResult>, }, } +/// Result for a [`RrdStoreRequest`]. +pub(super) struct RrdStoreResult { + /// Most recent timestamp of any stored metric datapoint (UNIX epoch). + pub(super) most_recent_timestamp: i64, +} + /// Task which stores received metrics in the RRD. Metric data is fed into /// this task via a MPSC channel. pub(super) async fn store_in_rrd_task( @@ -31,17 +43,43 @@ pub(super) async fn store_in_rrd_task( ) -> Result<(), Error> { while let Some(msg) = receiver.recv().await { // Involves some blocking file IO - let res = tokio::task::spawn_blocking(move || match msg { - RrdStoreRequest::Pve { remote, metrics } => { - for data_point in metrics.data { - store_metric_pve(&remote, &data_point); + let res = tokio::task::spawn_blocking(move || { + let mut most_recent_timestamp = 0; + let channel = match msg { + RrdStoreRequest::Pve { + remote, + metrics, + channel, + } => { + for data_point in metrics.data { + most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp); + store_metric_pve(&remote, &data_point); + } + + channel } - } - RrdStoreRequest::Pbs { remote, metrics } => { - for data_point in metrics.data { - store_metric_pbs(&remote, &data_point); + RrdStoreRequest::Pbs { + remote, + metrics, + channel, + } => { + for data_point in metrics.data { + most_recent_timestamp = 
most_recent_timestamp.max(data_point.timestamp); + store_metric_pbs(&remote, &data_point); + } + + channel } - } + }; + + if channel + .send(RrdStoreResult { + most_recent_timestamp, + }) + .is_err() + { + log::error!("could not send RrdStoreStoreResult to metric collection task"); + }; }) .await; diff --git a/server/src/metric_collection/state.rs b/server/src/metric_collection/state.rs new file mode 100644 index 00000000..5b04ea61 --- /dev/null +++ b/server/src/metric_collection/state.rs @@ -0,0 +1,126 @@ +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; + +use anyhow::Error; +use serde::{Deserialize, Serialize}; + +use proxmox_sys::fs::CreateOptions; + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +#[serde(rename_all = "kebab-case")] +/// Metric collection state file content. +struct State { + remote_status: HashMap<String, RemoteStatus>, +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +#[serde(rename_all = "kebab-case")] +/// A remote's metric collection state. +pub struct RemoteStatus { + /// Most recent datapoint - time stamp is based on remote time + pub most_recent_datapoint: i64, + /// Last successful metric collection - timestamp based on PDM's time + pub last_collection: Option<i64>, + #[serde(skip_serializing_if = "Option::is_none")] + /// Any error that occured during the last metric collection attempt. + pub error: Option<String>, +} + +/// Manage and persist metric collection state. +pub struct MetricCollectionState { + /// Path to the persisted state + path: PathBuf, + /// File owner/perms for the persisted state file + file_options: CreateOptions, + /// The current state + state: State, +} + +impl MetricCollectionState { + /// Initialize state by trying to load the existing statefile. If the file does not exist, + /// state will be empty. If the file failed to load, state will be empty and + /// and error will be logged. 
+ pub fn new(statefile: PathBuf, file_options: CreateOptions) -> Self { + let state = Self::load_or_default(&statefile) + .inspect_err(|err| { + log::error!("could not load metric collection state: {err}"); + }) + .unwrap_or_default(); + + Self { + path: statefile, + file_options, + state, + } + } + + /// Set a remote's status. + pub fn set_status(&mut self, remote: String, remote_state: RemoteStatus) { + self.state.remote_status.insert(remote, remote_state); + } + + /// Get a remote's status. + pub fn get_status(&self, remote: &str) -> Option<&RemoteStatus> { + self.state.remote_status.get(remote) + } + + /// Persist the state to the statefile. + pub fn save(&self) -> Result<(), Error> { + let data = serde_json::to_vec_pretty(&self.state)?; + proxmox_sys::fs::replace_file(&self.path, &data, self.file_options.clone(), true)?; + + Ok(()) + } + + fn load_or_default(path: &Path) -> Result<State, Error> { + let content = proxmox_sys::fs::file_read_optional_string(path)?; + + if let Some(content) = content { + Ok(serde_json::from_str(&content)?) 
+ } else { + Ok(Default::default()) + } + } +} + +#[cfg(test)] +mod tests { + use proxmox_sys::fs::CreateOptions; + + use super::*; + + use crate::test_support::temp::NamedTempFile; + + fn get_options() -> CreateOptions { + CreateOptions::new() + .owner(nix::unistd::Uid::effective()) + .group(nix::unistd::Gid::effective()) + .perm(nix::sys::stat::Mode::from_bits_truncate(0o600)) + } + + #[test] + fn save_and_load() -> Result<(), Error> { + let file = NamedTempFile::new(get_options())?; + let options = get_options(); + let mut state = MetricCollectionState::new(file.path().into(), options.clone()); + + state.set_status( + "some-remote".into(), + RemoteStatus { + most_recent_datapoint: 1234, + ..Default::default() + }, + ); + + state.save()?; + + let state = MetricCollectionState::new(file.path().into(), options); + + let status = state.get_status("some-remote").unwrap(); + assert_eq!(status.most_recent_datapoint, 1234); + + Ok(()) + } +} -- 2.39.5 _______________________________________________ pdm-devel mailing list pdm-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel