From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 9859D1FF13C for ; Thu, 19 Mar 2026 10:42:03 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0035316811; Thu, 19 Mar 2026 10:42:01 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox v6 09/12] s3-client: implement request counter threshold and exceeding callback Date: Thu, 19 Mar 2026 10:40:35 +0100 Message-ID: <20260319094100.240765-10-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260319094100.240765-1-c.ebner@proxmox.com> References: <20260319094100.240765-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1773913231230 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.060 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 5TLDUT7VONTLHVRHT4L5S2Q2HD4JPCMH X-Message-ID-Hash: 5TLDUT7VONTLHVRHT4L5S2Q2HD4JPCMH X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Allows to set the threshold values to trigger exceeding callbacks for the s3 request counters. The globally settable callback function is executed whenever one of the request counters is exceeding it's set threshold value, but only once when passing over the threshold. Since the same shared callback is used for all counters, allow to set and propagate a label identifying the shared request counter calling the callback. Signed-off-by: Christian Ebner --- .../src/shared_request_counters.rs | 153 ++++++++++++++++-- 1 file changed, 139 insertions(+), 14 deletions(-) diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs index 2b5a0843..3a6e5a03 100644 --- a/proxmox-s3-client/src/shared_request_counters.rs +++ b/proxmox-s3-client/src/shared_request_counters.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::mem::MaybeUninit; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, LazyLock, RwLock}; use std::time::Duration; use anyhow::{bail, Error}; @@ -18,10 +18,20 @@ use tokio::time::Instant; use proxmox_shared_memory::{Init, SharedMemory}; use proxmox_sys::fs::CreateOptions; +use crate::api_types::RequestCounterThresholds; + const MEMORY_PAGE_SIZE: usize = 4096; /// Generated via openssl::sha::sha256(b"Proxmox shared request counters v1.0")[0..8] const PROXMOX_SHARED_REQUEST_COUNTERS_1_0: [u8; 8] = [224, 110, 88, 252, 26, 77, 180, 5]; +/// Callback method triggered when exceeding counter thresholds. +/// Callback is called with the following parameters: common-prefix, threshold name, threshold limit, +/// value exceeding limit. +pub type ThresholdExceededCallback = Box; +static SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK: LazyLock< + RwLock>, +> = LazyLock::new(|| RwLock::new(None)); + #[repr(C, align(32))] #[derive(Default)] /// AtomicU64 aligned to the half default cache line size of 64-bytes. @@ -48,6 +58,16 @@ struct RequestCounters { // traffic in bytes upload: AlignedAtomic, download: AlignedAtomic, + + /// Request counter thresholds + get_threshold: AlignedAtomic, + delete_threshold: AlignedAtomic, + put_threshold: AlignedAtomic, + head_threshold: AlignedAtomic, + post_threshold: AlignedAtomic, + // Traffic counter thresholds + upload_threshold: AlignedAtomic, + download_threshold: AlignedAtomic, } impl Init for RequestCounters { @@ -61,17 +81,51 @@ impl RequestCounters { /// Increment the counter for given method, following the provided memory ordering constrains. /// /// Returns the previously stored value. - pub fn increment(&self, method: Method, ordering: Ordering) -> u64 { + pub fn increment(&self, cb_label: &str, method: Method, ordering: Ordering) -> u64 { match method { - Method::DELETE => self.delete.0.fetch_add(1, ordering), - Method::GET => self.get.0.fetch_add(1, ordering), - Method::HEAD => self.head.0.fetch_add(1, ordering), - Method::POST => self.post.0.fetch_add(1, ordering), - Method::PUT => self.put.0.fetch_add(1, ordering), + Method::DELETE => { + let prev = self.delete.0.fetch_add(1, ordering); + let threshold = self.delete_threshold.0.load(Ordering::Acquire); + Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1); + prev + } + Method::GET => { + let prev = self.get.0.fetch_add(1, ordering); + let threshold = self.get_threshold.0.load(Ordering::Acquire); + Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1); + prev + } + Method::HEAD => { + let prev = self.head.0.fetch_add(1, ordering); + let threshold = self.head_threshold.0.load(Ordering::Acquire); + Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1); + prev + } + Method::POST => { + let prev = self.post.0.fetch_add(1, ordering); + let threshold = self.post_threshold.0.load(Ordering::Acquire); + Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1); + prev + } + Method::PUT => { + let prev = self.put.0.fetch_add(1, ordering); + let threshold = self.put_threshold.0.load(Ordering::Acquire); + Self::check_threshold(cb_label, method.as_str(), threshold, prev + 1); + prev + } _ => 0, } } + fn check_threshold(cb_label: &str, counter_id: &str, threshold: u64, current: u64) { + if threshold > 0 && current > threshold && current - 1 == threshold { + let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap(); + if let Some(callback) = guard.as_ref() { + callback(cb_label, counter_id, threshold, current); + } + } + } + /// Load current counter state for given method, following the provided memory ordering constrains pub fn load(&self, method: Method, ordering: Ordering) -> u64 { match method { @@ -98,8 +152,17 @@ impl RequestCounters { /// Account for new upload traffic. /// /// Returns the previously stored value. - pub fn add_upload_traffic(&self, count: u64, ordering: Ordering) -> u64 { - self.upload.0.fetch_add(count, ordering) + pub fn add_upload_traffic(&self, cb_label: &str, count: u64, ordering: Ordering) -> u64 { + let prev = self.upload.0.fetch_add(count, ordering); + let threshold = self.upload_threshold.0.load(Ordering::Acquire); + let uploaded = prev + count; + if threshold > 0 && uploaded > threshold && prev <= threshold { + let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap(); + if let Some(callback) = guard.as_ref() { + callback(cb_label, "uploaded", threshold, uploaded); + } + } + prev } /// Returns upload traffic count. @@ -110,14 +173,52 @@ impl RequestCounters { /// Account for new download traffic. /// /// Returns the previously stored value. - pub fn add_download_traffic(&self, count: u64, ordering: Ordering) -> u64 { - self.download.0.fetch_add(count, ordering) + pub fn add_download_traffic(&self, cb_label: &str, count: u64, ordering: Ordering) -> u64 { + let prev = self.download.0.fetch_add(count, ordering); + let threshold = self.download_threshold.0.load(Ordering::Acquire); + let downloaded = prev + count; + if threshold > 0 && downloaded > threshold && prev <= threshold { + let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap(); + if let Some(callback) = guard.as_ref() { + callback(cb_label, "downloaded", threshold, downloaded); + } + } + prev } /// Returns download traffic count. pub fn get_download_traffic(&self, ordering: Ordering) -> u64 { self.download.0.load(ordering) } + + /// Update the request threshold values. + pub fn update_thresholds(&self, thresholds: &RequestCounterThresholds) { + self.delete_threshold + .0 + .store(thresholds.s3_delete.unwrap_or(0), Ordering::Release); + self.get_threshold + .0 + .store(thresholds.s3_get.unwrap_or(0), Ordering::Release); + self.head_threshold + .0 + .store(thresholds.s3_head.unwrap_or(0), Ordering::Release); + self.post_threshold + .0 + .store(thresholds.s3_post.unwrap_or(0), Ordering::Release); + self.put_threshold + .0 + .store(thresholds.s3_put.unwrap_or(0), Ordering::Release); + let download = thresholds + .s3_download + .map(|human_byte| human_byte.as_u64()) + .unwrap_or(0); + self.download_threshold.0.store(download, Ordering::Release); + let upload = thresholds + .s3_upload + .map(|human_byte| human_byte.as_u64()) + .unwrap_or(0); + self.upload_threshold.0.store(upload, Ordering::Release); + } } /// Size of the padding to align the mmapped request counters to 4k default @@ -172,6 +273,7 @@ impl Init for MappableRequestCounters { /// /// If set, the counts can be filtered based on a path prefix. pub struct SharedRequestCounters { + cb_label: String, shared_memory: SharedMemory, path: PathBuf, } @@ -197,6 +299,7 @@ impl SharedRequestCounters { .group(user.gid); let shared_memory = SharedMemory::open_non_tmpfs(&path, file_opts)?; Ok(Self { + cb_label: String::new(), shared_memory, path, }) @@ -209,7 +312,7 @@ impl SharedRequestCounters { self.shared_memory .data() .counters - .increment(method, ordering) + .increment(&self.cb_label, method, ordering) } /// Load current counter state for given method, following the provided memory ordering constrains @@ -229,7 +332,7 @@ impl SharedRequestCounters { self.shared_memory .data() .counters - .add_upload_traffic(count, ordering) + .add_upload_traffic(&self.cb_label, count, ordering) } /// Returns upload traffic count. @@ -247,7 +350,7 @@ impl SharedRequestCounters { self.shared_memory .data() .counters - .add_download_traffic(count, ordering) + .add_download_traffic(&self.cb_label, count, ordering) } /// Returns download traffic count. @@ -272,6 +375,28 @@ impl SharedRequestCounters { pub fn path_buf(&self) -> PathBuf { self.path.clone() } + + /// Update the callback and the label to identify the counter executing it + /// when one of the set thresholds is exceeded. + pub fn set_thresholds_exceeded_callback( + &mut self, + cb_label: String, + callback: ThresholdExceededCallback, + ) { + self.cb_label = cb_label; + SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK + .write() + .unwrap() + .replace(callback); + } + + /// Update the request counter thresholds to given values. + pub fn update_thresholds(&self, thresholds: &RequestCounterThresholds) { + self.shared_memory + .data() + .counters + .update_thresholds(thresholds); + } } const FLUSH_THRESHOLD: Duration = Duration::from_secs(5); -- 2.47.3