public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox v7 08/34] s3-client: implement request counter threshold and exceeding callback
Date: Wed,  1 Apr 2026 15:47:51 +0200	[thread overview]
Message-ID: <20260401134817.926499-9-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260401134817.926499-1-c.ebner@proxmox.com>

Allows to set the threshold values to trigger exceeding callbacks
for the s3 request counters.

The globally settable callback function is executed whenever one of
the request counters is exceeding it's set threshold value, but
only once when passing over the threshold.

Since the same shared callback is used for all counters, allow to
set and propagate a label identifying the shared request counter
calling the callback.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
Reviewed-by: Hannes Laimer <h.laimer@proxmox.com>
Tested-by: Hannes Laimer <h.laimer@proxmox.com>
---
 .../src/shared_request_counters.rs            | 153 ++++++++++++++++--
 1 file changed, 139 insertions(+), 14 deletions(-)

diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs
index 58f30a91..9c8b2fd4 100644
--- a/proxmox-s3-client/src/shared_request_counters.rs
+++ b/proxmox-s3-client/src/shared_request_counters.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
 use std::mem::MaybeUninit;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, LazyLock, RwLock};
 use std::time::Duration;
 
 use anyhow::{bail, Error};
@@ -18,10 +18,20 @@ use tokio::time::Instant;
 use proxmox_shared_memory::{Init, SharedMemory};
 use proxmox_sys::fs::CreateOptions;
 
+use crate::api_types::RequestCounterThresholds;
+
 const MEMORY_PAGE_SIZE: usize = 4096;
 /// Generated via openssl::sha::sha256(b"Proxmox shared request counters v1.0")[0..8]
 const PROXMOX_SHARED_REQUEST_COUNTERS_1_0: [u8; 8] = [224, 110, 88, 252, 26, 77, 180, 5];
 
+/// Callback method triggered when exceeding counter thresholds.
+/// Callback is called with the following parameters: common-prefix, threshold name, threshold limit,
+/// value exceeding limit.
+pub type ThresholdExceededCallback = Box<dyn Fn(&str, &str, u64, u64) + Send + Sync + 'static>;
+static SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK: LazyLock<
+    RwLock<Option<ThresholdExceededCallback>>,
+> = LazyLock::new(|| RwLock::new(None));
+
 #[repr(C, align(32))]
 #[derive(Default)]
 /// AtomicU64 aligned to the half default cache line size of 64-bytes.
@@ -48,6 +58,16 @@ struct RequestCounters {
     // traffic in bytes
     upload: AlignedAtomic,
     download: AlignedAtomic,
+
+    /// Request counter thresholds
+    get_threshold: AlignedAtomic,
+    delete_threshold: AlignedAtomic,
+    put_threshold: AlignedAtomic,
+    head_threshold: AlignedAtomic,
+    post_threshold: AlignedAtomic,
+    // Traffic counter thresholds
+    upload_threshold: AlignedAtomic,
+    download_threshold: AlignedAtomic,
 }
 
 impl Init for RequestCounters {
@@ -61,17 +81,51 @@ impl RequestCounters {
     /// Increment the counter for given method, following the provided memory ordering constrains.
     ///
     /// Returns the previously stored value.
-    pub fn increment(&self, method: Method, ordering: Ordering) -> u64 {
+    pub fn increment(&self, cb_label: &str, method: Method, ordering: Ordering) -> u64 {
         match method {
-            Method::DELETE => self.delete.0.fetch_add(1, ordering),
-            Method::GET => self.get.0.fetch_add(1, ordering),
-            Method::HEAD => self.head.0.fetch_add(1, ordering),
-            Method::POST => self.post.0.fetch_add(1, ordering),
-            Method::PUT => self.put.0.fetch_add(1, ordering),
+            Method::DELETE => {
+                let prev = self.delete.0.fetch_add(1, ordering);
+                let threshold = self.delete_threshold.0.load(Ordering::Acquire);
+                Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1);
+                prev
+            }
+            Method::GET => {
+                let prev = self.get.0.fetch_add(1, ordering);
+                let threshold = self.get_threshold.0.load(Ordering::Acquire);
+                Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1);
+                prev
+            }
+            Method::HEAD => {
+                let prev = self.head.0.fetch_add(1, ordering);
+                let threshold = self.head_threshold.0.load(Ordering::Acquire);
+                Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1);
+                prev
+            }
+            Method::POST => {
+                let prev = self.post.0.fetch_add(1, ordering);
+                let threshold = self.post_threshold.0.load(Ordering::Acquire);
+                Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1);
+                prev
+            }
+            Method::PUT => {
+                let prev = self.put.0.fetch_add(1, ordering);
+                let threshold = self.put_threshold.0.load(Ordering::Acquire);
+                Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1);
+                prev
+            }
             _ => 0,
         }
     }
 
+    fn check_threshold(cb_label: &str, counter_id: &str, threshold: u64, current: u64) {
+        if threshold > 0 && current > threshold && current - 1 == threshold {
+            let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap();
+            if let Some(callback) = guard.as_ref() {
+                callback(cb_label, counter_id, threshold, current);
+            }
+        }
+    }
+
     /// Load current counter state for given method, following the provided memory ordering constrains
     pub fn load(&self, method: Method, ordering: Ordering) -> u64 {
         match method {
@@ -102,8 +156,17 @@ impl RequestCounters {
     /// Account for new upload traffic.
     ///
     /// Returns the previously stored value.
-    pub fn add_upload_traffic(&self, count: u64, ordering: Ordering) -> u64 {
-        self.upload.0.fetch_add(count, ordering)
+    pub fn add_upload_traffic(&self, cb_label: &str, count: u64, ordering: Ordering) -> u64 {
+        let prev = self.upload.0.fetch_add(count, ordering);
+        let threshold = self.upload_threshold.0.load(Ordering::Acquire);
+        let uploaded = prev + count;
+        if threshold > 0 && uploaded > threshold && prev <= threshold {
+            let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap();
+            if let Some(callback) = guard.as_ref() {
+                callback(cb_label, "uploaded", threshold, uploaded);
+            }
+        }
+        prev
     }
 
     /// Returns upload traffic count.
@@ -114,14 +177,52 @@ impl RequestCounters {
     /// Account for new download traffic.
     ///
     /// Returns the previously stored value.
-    pub fn add_download_traffic(&self, count: u64, ordering: Ordering) -> u64 {
-        self.download.0.fetch_add(count, ordering)
+    pub fn add_download_traffic(&self, cb_label: &str, count: u64, ordering: Ordering) -> u64 {
+        let prev = self.download.0.fetch_add(count, ordering);
+        let threshold = self.download_threshold.0.load(Ordering::Acquire);
+        let downloaded = prev + count;
+        if threshold > 0 && downloaded > threshold && prev <= threshold {
+            let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap();
+            if let Some(callback) = guard.as_ref() {
+                callback(cb_label, "downloaded", threshold, downloaded);
+            }
+        }
+        prev
     }
 
     /// Returns download traffic count.
     pub fn get_download_traffic(&self, ordering: Ordering) -> u64 {
         self.download.0.load(ordering)
     }
+
+    /// Update the request threshold values.
+    pub fn update_thresholds(&self, thresholds: &RequestCounterThresholds) {
+        self.delete_threshold
+            .0
+            .store(thresholds.s3_delete.unwrap_or(0), Ordering::Release);
+        self.get_threshold
+            .0
+            .store(thresholds.s3_get.unwrap_or(0), Ordering::Release);
+        self.head_threshold
+            .0
+            .store(thresholds.s3_head.unwrap_or(0), Ordering::Release);
+        self.post_threshold
+            .0
+            .store(thresholds.s3_post.unwrap_or(0), Ordering::Release);
+        self.put_threshold
+            .0
+            .store(thresholds.s3_put.unwrap_or(0), Ordering::Release);
+        let download = thresholds
+            .s3_download
+            .map(|human_byte| human_byte.as_u64())
+            .unwrap_or(0);
+        self.download_threshold.0.store(download, Ordering::Release);
+        let upload = thresholds
+            .s3_upload
+            .map(|human_byte| human_byte.as_u64())
+            .unwrap_or(0);
+        self.upload_threshold.0.store(upload, Ordering::Release);
+    }
 }
 
 /// Size of the padding to align the mmapped request counters to 4k default
@@ -176,6 +277,7 @@ impl Init for MappableRequestCounters {
 ///
 /// If set, the counts can be filtered based on a path prefix.
 pub struct SharedRequestCounters {
+    cb_label: String,
     shared_memory: SharedMemory<MappableRequestCounters>,
     path: PathBuf,
 }
@@ -201,6 +303,7 @@ impl SharedRequestCounters {
             .group(user.gid);
         let shared_memory = SharedMemory::open_non_tmpfs(&path, file_opts)?;
         Ok(Self {
+            cb_label: String::new(),
             shared_memory,
             path,
         })
@@ -213,7 +316,7 @@ impl SharedRequestCounters {
         self.shared_memory
             .data()
             .counters
-            .increment(method, ordering)
+            .increment(&self.cb_label, method, ordering)
     }
 
     /// Load current counter state for given method, following the provided memory ordering constrains
@@ -235,7 +338,7 @@ impl SharedRequestCounters {
         self.shared_memory
             .data()
             .counters
-            .add_upload_traffic(count, ordering)
+            .add_upload_traffic(&self.cb_label, count, ordering)
     }
 
     /// Returns upload traffic count.
@@ -253,7 +356,7 @@ impl SharedRequestCounters {
         self.shared_memory
             .data()
             .counters
-            .add_download_traffic(count, ordering)
+            .add_download_traffic(&self.cb_label, count, ordering)
     }
 
     /// Returns download traffic count.
@@ -278,6 +381,28 @@ impl SharedRequestCounters {
     pub fn path_buf(&self) -> PathBuf {
         self.path.clone()
     }
+
+    /// Update the callback and the label to identify the counter executing it
+    /// when one of the set thresholds is exceeded.
+    pub fn set_thresholds_exceeded_callback(
+        &mut self,
+        cb_label: String,
+        callback: ThresholdExceededCallback,
+    ) {
+        self.cb_label = cb_label;
+        SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK
+            .write()
+            .unwrap()
+            .replace(callback);
+    }
+
+    /// Update the request counter thresholds to given values.
+    pub fn update_thresholds(&self, thresholds: &RequestCounterThresholds) {
+        self.shared_memory
+            .data()
+            .counters
+            .update_thresholds(thresholds);
+    }
 }
 
 const FLUSH_THRESHOLD: Duration = Duration::from_secs(5);
-- 
2.47.3





  parent reply	other threads:[~2026-04-01 13:48 UTC|newest]

Thread overview: 39+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-01 13:47 [PATCH proxmox{,-backup} v7 00/34] partially fix #6563: add s3 counter for statistics and notifications Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 01/34] s3-client: add persistent shared request counters for client Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 02/34] s3-client: add counters for upload/download traffic Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 03/34] s3-client: account for upload traffic on successful request sending Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 04/34] s3-client: account for downloaded bytes in incoming response body Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 05/34] s3-client: request counters: periodically persist counters to file Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 06/34] s3-client: sync flush request counters on client instance drop Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 07/34] s3-client: api-types: define request counter thresholds Christian Ebner
2026-04-01 13:47 ` Christian Ebner [this message]
2026-04-01 13:47 ` [PATCH proxmox v7 09/34] pbs-api-types: define api type for s3 request statistics Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 10/34] pbs-api-types: add notification thresholds to datastore config Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox v7 11/34] pbs-api-types: add reset schedule for notification threshold counters Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox-backup v7 12/34] metrics: split common module imports into individual use statements Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox-backup v7 13/34] ui: improve variable name indirectly fixing typo Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox-backup v7 14/34] notifications: template data: fix typos in docstrings Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox-backup v7 15/34] datastore: collect request statistics for s3 backed datastores Christian Ebner
2026-04-01 13:47 ` [PATCH proxmox-backup v7 16/34] datastore: expose request counters " Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 17/34] api: s3: add endpoint to reset s3 request counters Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 18/34] bin: s3: expose request counter reset method as cli command Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 19/34] ui: datastore summary: move store to be part of summary panel Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 20/34] ui: expose s3 request counter statistics in the datastore summary Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 21/34] metrics: collect s3 datastore statistics as rrd metrics Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 22/34] api: admin: expose s3 statistics in datastore rrd data Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 23/34] partially fix #6563: ui: expose s3 rrd charts in datastore summary Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 24/34] datastore: refactor datastore lookup parameters into dedicated type Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 25/34] api: config: update notification thresholds for config and counters Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 26/34] ui: utils: add helper to render notification threshold property string Christian Ebner
2026-04-01 22:32   ` Thomas Lamprecht
2026-04-01 13:48 ` [PATCH proxmox-backup v7 27/34] ui: add notification thresholds edit window Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 28/34] notification: define templates and template data for thresholds Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 29/34] datastore: add thresholds notification callback on datastore lookup Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 30/34] api/ui: notifications: add 'thresholds' as notification type value Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 31/34] api: config: allow counter reset schedule editing Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 32/34] ui: expose counter reset schedule edit window Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 33/34] bin: proxy: periodically schedule counter reset task Christian Ebner
2026-04-01 13:48 ` [PATCH proxmox-backup v7 34/34] ui: add task description for scheduled counter reset Christian Ebner
2026-04-01 22:20 ` partially-applied: [PATCH proxmox{,-backup} v7 00/34] partially fix #6563: add s3 counter for statistics and notifications Thomas Lamprecht
2026-04-01 22:44 ` applied: " Thomas Lamprecht
2026-04-02 10:54 ` superseded: " Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260401134817.926499-9-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal