From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [pdm-devel] [PATCH proxmox-datacenter-manager v2 08/28] metric collection: persist state after metric collection
Date: Fri, 14 Feb 2025 14:06:33 +0100
Message-ID: <20250214130653.283012-9-l.wagner@proxmox.com>
In-Reply-To: <20250214130653.283012-1-l.wagner@proxmox.com>

The metric collection has to maintain some state. At the moment, this
is only the most recent timestamp from all received metric data points.
We use this as a cut-off time when requesting metrics from the remote, as
everything *older* than that should already be stored in the database.

Up until now, the state was only stored in memory, which means that it
was lost when the daemon restarted.

This commit makes the metric collection system load the state from, and
save it to, the file
/var/lib/proxmox-datacenter-manager/metric-collection-state.json

The following data points are stored for every remote:
- most-recent-datapoint:
  Timestamp of the most recent datapoint, used as a cut-off when
  fetching new metrics
- last-collection:
  Timestamp of the last *successful* metric collection for a remote,
  based on local (PDM) time
- error:
  String representation of any error that occurred in the last collection
  attempt
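
How these three fields change after a collection attempt can be sketched
as follows. This is a minimal, std-only illustration that mirrors the
update logic in `fetch_single_remote`; the helper name
`apply_collection_result` and the plain `String` error type are
assumptions made for this example and are not part of the patch.

```rust
/// Per-remote state, mirroring the fields described above.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct RemoteStatus {
    /// Newest datapoint seen so far (remote time), used as the fetch cut-off.
    pub most_recent_datapoint: i64,
    /// Time of the last successful collection (local time).
    pub last_collection: Option<i64>,
    /// Error from the last collection attempt, if any.
    pub error: Option<String>,
}

/// Hypothetical helper: fold the outcome of one collection attempt into
/// the status. On success, `result` carries the newest stored timestamp.
pub fn apply_collection_result(
    mut status: RemoteStatus,
    now: i64,
    result: Result<i64, String>,
) -> RemoteStatus {
    match result {
        Ok(most_recent) => {
            status.most_recent_datapoint = most_recent;
            status.last_collection = Some(now);
            status.error = None;
        }
        Err(err) => {
            // The cut-off and the last successful collection stay untouched;
            // only the error of the failed attempt is recorded.
            status.error = Some(err);
        }
    }
    status
}
```

A failed attempt therefore keeps the previous cut-off, so the next run
requests the same time range again.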

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---

Notes:
    Changes since v1:
      - use .inspect_err(..).unwrap_or_default() instead of a manual match
        in `MetricCollectionState::new`
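
For reference, the `.inspect_err(..).unwrap_or_default()` pattern
mentioned above looks roughly like the sketch below. It uses only the
standard library; `parse_cutoff` and `cutoff_or_default` are made-up
names for illustration, and `eprintln!` stands in for `log::error!`.
`Result::inspect_err` needs Rust 1.76 or newer.

```rust
/// Hypothetical loader that may fail.
fn parse_cutoff(input: &str) -> Result<i64, std::num::ParseIntError> {
    input.trim().parse::<i64>()
}

/// Log the error as a side effect, then fall back to the type's default
/// value (0 for `i64`) instead of propagating the failure.
fn cutoff_or_default(input: &str) -> i64 {
    parse_cutoff(input)
        .inspect_err(|err| eprintln!("could not parse cut-off: {err}"))
        .unwrap_or_default()
}
```

Compared to a manual `match`, this keeps the logging and the fallback in
a single expression.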

 .../src/metric_collection/collection_task.rs  | 135 ++++++++++++------
 server/src/metric_collection/mod.rs           |   1 +
 server/src/metric_collection/rrd_task.rs      |  60 ++++++--
 server/src/metric_collection/state.rs         | 126 ++++++++++++++++
 4 files changed, 269 insertions(+), 53 deletions(-)
 create mode 100644 server/src/metric_collection/state.rs

diff --git a/server/src/metric_collection/collection_task.rs b/server/src/metric_collection/collection_task.rs
index b55c8e92..f985bd3e 100644
--- a/server/src/metric_collection/collection_task.rs
+++ b/server/src/metric_collection/collection_task.rs
@@ -1,16 +1,17 @@
-use std::{collections::HashMap, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
 
 use anyhow::Error;
 use rand::Rng;
 use tokio::{
     sync::{
         mpsc::{Receiver, Sender},
-        OwnedSemaphorePermit, Semaphore,
+        oneshot, OwnedSemaphorePermit, Semaphore,
     },
     time::Interval,
 };
 
 use proxmox_section_config::typed::SectionConfigData;
+use proxmox_sys::fs::CreateOptions;
 
 use pdm_api_types::{
     remotes::{Remote, RemoteType},
@@ -20,7 +21,16 @@ use pdm_config::metric_collection::COLLECTION_SETTINGS_TYPE;
 
 use crate::{connection, task_utils};
 
-use super::rrd_task::RrdStoreRequest;
+use super::{
+    rrd_task::{RrdStoreRequest, RrdStoreResult},
+    state::{MetricCollectionState, RemoteStatus},
+};
+
+/// Location of the metric collection state file.
+const METRIC_COLLECTION_STATE_FILE: &str = concat!(
+    pdm_buildcfg::PDM_STATE_DIR_M!(),
+    "/metric-collection-state.json"
+);
 
 /// Control messages for the metric collection task.
 pub(super) enum ControlMsg {
@@ -31,7 +41,7 @@ pub(super) enum ControlMsg {
 /// Task which periodically collects metrics from all remotes and stores
 /// them in the local metrics database.
 pub(super) struct MetricCollectionTask {
-    most_recent_timestamps: HashMap<String, i64>,
+    state: MetricCollectionState,
     settings: CollectionSettings,
     metric_data_tx: Sender<RrdStoreRequest>,
     control_message_rx: Receiver<ControlMsg>,
@@ -44,9 +54,10 @@ impl MetricCollectionTask {
         control_message_rx: Receiver<ControlMsg>,
     ) -> Result<Self, Error> {
         let settings = Self::get_settings_or_default();
+        let state = load_state()?;
 
         Ok(Self {
-            most_recent_timestamps: HashMap::new(),
+            state,
             settings,
             metric_data_tx,
             control_message_rx,
@@ -115,6 +126,10 @@ impl MetricCollectionTask {
                 );
                 timer = Self::setup_timer(interval);
             }
+
+            if let Err(err) = self.state.save() {
+                log::error!("could not update metric collection state: {err}");
+            }
         }
     }
 
@@ -206,7 +221,11 @@ impl MetricCollectionTask {
         let mut handles = Vec::new();
 
         for remote_name in remotes_to_fetch {
-            let start_time = *self.most_recent_timestamps.get(remote_name).unwrap_or(&0);
+            let status = self
+                .state
+                .get_status(remote_name)
+                .cloned()
+                .unwrap_or_default();
 
             // unwrap is okay here, acquire_* will only fail if `close` has been
             // called on the semaphore.
@@ -217,7 +236,7 @@ impl MetricCollectionTask {
                 let handle = tokio::spawn(Self::fetch_single_remote(
                     self.settings.clone(),
                     remote,
-                    start_time,
+                    status,
                     self.metric_data_tx.clone(),
                     permit,
                 ));
@@ -231,8 +250,7 @@ impl MetricCollectionTask {
 
             match res {
                 Ok(Ok(ts)) => {
-                    self.most_recent_timestamps
-                        .insert(remote_name.to_string(), ts);
+                    self.state.set_status(remote_name, ts);
                 }
                 Ok(Err(err)) => log::error!("failed to collect metrics for {remote_name}: {err}"),
                 Err(err) => {
@@ -249,52 +267,85 @@ impl MetricCollectionTask {
     async fn fetch_single_remote(
         settings: CollectionSettings,
         remote: Remote,
-        start_time: i64,
+        mut status: RemoteStatus,
         sender: Sender<RrdStoreRequest>,
         _permit: OwnedSemaphorePermit,
-    ) -> Result<i64, Error> {
+    ) -> Result<RemoteStatus, Error> {
         Self::sleep_for_random_millis(
             settings.min_connection_delay_or_default(),
             settings.max_connection_delay_or_default(),
             "connection-delay",
         )
         .await;
+        let (result_tx, result_rx) = oneshot::channel();
+
+        let now = proxmox_time::epoch_i64();
+
+        let res: Result<RrdStoreResult, Error> = async {
+            match remote.ty {
+                RemoteType::Pve => {
+                    let client = connection::make_pve_client(&remote)?;
+                    let metrics = client
+                        .cluster_metrics_export(
+                            Some(true),
+                            Some(false),
+                            Some(status.most_recent_datapoint),
+                        )
+                        .await?;
+
+                    sender
+                        .send(RrdStoreRequest::Pve {
+                            remote: remote.id.clone(),
+                            metrics,
+                            channel: result_tx,
+                        })
+                        .await?;
+                }
+                RemoteType::Pbs => {
+                    let client = connection::make_pbs_client(&remote)?;
+                    let metrics = client
+                        .metrics(Some(true), Some(status.most_recent_datapoint))
+                        .await?;
+
+                    sender
+                        .send(RrdStoreRequest::Pbs {
+                            remote: remote.id.clone(),
+                            metrics,
+                            channel: result_tx,
+                        })
+                        .await?;
+                }
+            }
 
-        let most_recent_timestamp = match remote.ty {
-            RemoteType::Pve => {
-                let client = connection::make_pve_client(&remote)?;
-                let metrics = client
-                    .cluster_metrics_export(Some(true), Some(false), Some(start_time))
-                    .await?;
-
-                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
-
-                sender
-                    .send(RrdStoreRequest::Pve {
-                        remote: remote.id.clone(),
-                        metrics,
-                    })
-                    .await?;
+            result_rx.await.map_err(Error::from)
+        }
+        .await;
 
-                most_recent
+        match res {
+            Ok(result) => {
+                status.most_recent_datapoint = result.most_recent_timestamp;
+                status.last_collection = Some(now);
+                status.error = None;
+            }
+            Err(err) => {
+                status.error = Some(err.to_string());
+                log::error!("could not fetch metrics from '{}': {err}", remote.id);
             }
-            RemoteType::Pbs => {
-                let client = connection::make_pbs_client(&remote)?;
-                let metrics = client.metrics(Some(true), Some(start_time)).await?;
+        }
 
-                let most_recent = metrics.data.iter().fold(0, |acc, x| acc.max(x.timestamp));
+        Ok(status)
+    }
+}
 
-                sender
-                    .send(RrdStoreRequest::Pbs {
-                        remote: remote.id.clone(),
-                        metrics,
-                    })
-                    .await?;
+/// Load the metric collection state file.
+pub(super) fn load_state() -> Result<MetricCollectionState, Error> {
+    let api_uid = pdm_config::api_user()?.uid;
+    let api_gid = pdm_config::api_group()?.gid;
 
-                most_recent
-            }
-        };
+    let file_options = CreateOptions::new().owner(api_uid).group(api_gid);
 
-        Ok(most_recent_timestamp)
-    }
+    Ok(MetricCollectionState::new(
+        METRIC_COLLECTION_STATE_FILE.into(),
+        file_options,
+    ))
 }
diff --git a/server/src/metric_collection/mod.rs b/server/src/metric_collection/mod.rs
index 9b203615..9cd60455 100644
--- a/server/src/metric_collection/mod.rs
+++ b/server/src/metric_collection/mod.rs
@@ -7,6 +7,7 @@ use tokio::sync::mpsc::{self, Sender};
 mod collection_task;
 pub mod rrd_cache;
 mod rrd_task;
+mod state;
 pub mod top_entities;
 
 use collection_task::{ControlMsg, MetricCollectionTask};
diff --git a/server/src/metric_collection/rrd_task.rs b/server/src/metric_collection/rrd_task.rs
index a72945df..1c618f54 100644
--- a/server/src/metric_collection/rrd_task.rs
+++ b/server/src/metric_collection/rrd_task.rs
@@ -1,8 +1,10 @@
 use anyhow::Error;
-use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
+use tokio::sync::{mpsc::Receiver, oneshot};
+
 use proxmox_rrd::rrd::DataSourceType;
+
+use pbs_api_types::{MetricDataPoint, MetricDataType, Metrics};
 use pve_api_types::{ClusterMetrics, ClusterMetricsData, ClusterMetricsDataType};
-use tokio::sync::mpsc::Receiver;
 
 use super::rrd_cache;
 
@@ -14,6 +16,8 @@ pub(super) enum RrdStoreRequest {
         remote: String,
         /// Metric data.
         metrics: ClusterMetrics,
+        /// Oneshot channel to return the [`RrdStoreResult`].
+        channel: oneshot::Sender<RrdStoreResult>,
     },
     /// Store PBS metrics.
     Pbs {
@@ -21,9 +25,17 @@ pub(super) enum RrdStoreRequest {
         remote: String,
         /// Metric data.
         metrics: Metrics,
+        /// Oneshot channel to return the [`RrdStoreResult`].
+        channel: oneshot::Sender<RrdStoreResult>,
     },
 }
 
+/// Result for a [`RrdStoreRequest`].
+pub(super) struct RrdStoreResult {
+    /// Most recent timestamp of any stored metric datapoint (UNIX epoch).
+    pub(super) most_recent_timestamp: i64,
+}
+
 /// Task which stores received metrics in the RRD. Metric data is fed into
 /// this task via a MPSC channel.
 pub(super) async fn store_in_rrd_task(
@@ -31,17 +43,43 @@ pub(super) async fn store_in_rrd_task(
 ) -> Result<(), Error> {
     while let Some(msg) = receiver.recv().await {
         // Involves some blocking file IO
-        let res = tokio::task::spawn_blocking(move || match msg {
-            RrdStoreRequest::Pve { remote, metrics } => {
-                for data_point in metrics.data {
-                    store_metric_pve(&remote, &data_point);
+        let res = tokio::task::spawn_blocking(move || {
+            let mut most_recent_timestamp = 0;
+            let channel = match msg {
+                RrdStoreRequest::Pve {
+                    remote,
+                    metrics,
+                    channel,
+                } => {
+                    for data_point in metrics.data {
+                        most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
+                        store_metric_pve(&remote, &data_point);
+                    }
+
+                    channel
                 }
-            }
-            RrdStoreRequest::Pbs { remote, metrics } => {
-                for data_point in metrics.data {
-                    store_metric_pbs(&remote, &data_point);
+                RrdStoreRequest::Pbs {
+                    remote,
+                    metrics,
+                    channel,
+                } => {
+                    for data_point in metrics.data {
+                        most_recent_timestamp = most_recent_timestamp.max(data_point.timestamp);
+                        store_metric_pbs(&remote, &data_point);
+                    }
+
+                    channel
                 }
-            }
+            };
+
+            if channel
+                .send(RrdStoreResult {
+                    most_recent_timestamp,
+                })
+                .is_err()
+            {
+                log::error!("could not send RrdStoreResult to metric collection task");
+            };
         })
         .await;
 
diff --git a/server/src/metric_collection/state.rs b/server/src/metric_collection/state.rs
new file mode 100644
index 00000000..5b04ea61
--- /dev/null
+++ b/server/src/metric_collection/state.rs
@@ -0,0 +1,126 @@
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+};
+
+use anyhow::Error;
+use serde::{Deserialize, Serialize};
+
+use proxmox_sys::fs::CreateOptions;
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// Metric collection state file content.
+struct State {
+    remote_status: HashMap<String, RemoteStatus>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Default, Clone)]
+#[serde(rename_all = "kebab-case")]
+/// A remote's metric collection state.
+pub struct RemoteStatus {
+    /// Most recent datapoint - time stamp is based on remote time
+    pub most_recent_datapoint: i64,
+    /// Last successful metric collection - timestamp based on PDM's time
+    pub last_collection: Option<i64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// Any error that occurred during the last metric collection attempt.
+    pub error: Option<String>,
+}
+
+/// Manage and persist metric collection state.
+pub struct MetricCollectionState {
+    /// Path to the persisted state
+    path: PathBuf,
+    /// File owner/perms for the persisted state file
+    file_options: CreateOptions,
+    /// The current state
+    state: State,
+}
+
+impl MetricCollectionState {
+    /// Initialize state by trying to load the existing statefile. If the file does not exist,
+    /// state will be empty. If the file failed to load, state will be empty and
+    /// an error will be logged.
+    pub fn new(statefile: PathBuf, file_options: CreateOptions) -> Self {
+        let state = Self::load_or_default(&statefile)
+            .inspect_err(|err| {
+                log::error!("could not load metric collection state: {err}");
+            })
+            .unwrap_or_default();
+
+        Self {
+            path: statefile,
+            file_options,
+            state,
+        }
+    }
+
+    /// Set a remote's status.
+    pub fn set_status(&mut self, remote: String, remote_state: RemoteStatus) {
+        self.state.remote_status.insert(remote, remote_state);
+    }
+
+    /// Get a remote's status.
+    pub fn get_status(&self, remote: &str) -> Option<&RemoteStatus> {
+        self.state.remote_status.get(remote)
+    }
+
+    /// Persist the state to the statefile.
+    pub fn save(&self) -> Result<(), Error> {
+        let data = serde_json::to_vec_pretty(&self.state)?;
+        proxmox_sys::fs::replace_file(&self.path, &data, self.file_options.clone(), true)?;
+
+        Ok(())
+    }
+
+    fn load_or_default(path: &Path) -> Result<State, Error> {
+        let content = proxmox_sys::fs::file_read_optional_string(path)?;
+
+        if let Some(content) = content {
+            Ok(serde_json::from_str(&content)?)
+        } else {
+            Ok(Default::default())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use proxmox_sys::fs::CreateOptions;
+
+    use super::*;
+
+    use crate::test_support::temp::NamedTempFile;
+
+    fn get_options() -> CreateOptions {
+        CreateOptions::new()
+            .owner(nix::unistd::Uid::effective())
+            .group(nix::unistd::Gid::effective())
+            .perm(nix::sys::stat::Mode::from_bits_truncate(0o600))
+    }
+
+    #[test]
+    fn save_and_load() -> Result<(), Error> {
+        let file = NamedTempFile::new(get_options())?;
+        let options = get_options();
+        let mut state = MetricCollectionState::new(file.path().into(), options.clone());
+
+        state.set_status(
+            "some-remote".into(),
+            RemoteStatus {
+                most_recent_datapoint: 1234,
+                ..Default::default()
+            },
+        );
+
+        state.save()?;
+
+        let state = MetricCollectionState::new(file.path().into(), options);
+
+        let status = state.get_status("some-remote").unwrap();
+        assert_eq!(status.most_recent_datapoint, 1234);
+
+        Ok(())
+    }
+}
-- 
2.39.5



_______________________________________________
pdm-devel mailing list
pdm-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pdm-devel

