From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 6CB7B1FF137 for ; Tue, 03 Mar 2026 16:23:33 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0214F1B5C0; Tue, 3 Mar 2026 16:24:15 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox v4 13/13] s3-client: implement request counter threshold and exceeding callback Date: Tue, 3 Mar 2026 16:22:55 +0100 Message-ID: <20260303152317.934256-14-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260303152317.934256-1-c.ebner@proxmox.com> References: <20260303152317.934256-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1772551383734 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.051 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 4NWDOCGWNAVUIQCBWWCWZVMUQJUOTAGW X-Message-ID-Hash: 4NWDOCGWNAVUIQCBWWCWZVMUQJUOTAGW X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Allows to set the threshold values to trigger exceeding callbacks for the s3 request counters. The globally settable callback function is executed whenever one of the request counters is exceeding it's set threshold value, but only once when passing over the threshold. Signed-off-by: Christian Ebner --- .../src/shared_request_counters.rs | 131 ++++++++++++++++-- 1 file changed, 123 insertions(+), 8 deletions(-) diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs index 0c668183..228da051 100644 --- a/proxmox-s3-client/src/shared_request_counters.rs +++ b/proxmox-s3-client/src/shared_request_counters.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::mem::MaybeUninit; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, LazyLock, RwLock}; use std::time::Duration; use anyhow::{bail, Error}; @@ -18,10 +18,18 @@ use tokio::time::Instant; use proxmox_shared_memory::{Init, SharedMemory}; use proxmox_sys::fs::CreateOptions; +use crate::api_types::RequestCounterThresholds; + const MEMORY_PAGE_SIZE: usize = 4096; /// Generated via openssl::sha::sha256(b"Proxmox shared request counters v1.0")[0..8] const PROXMOX_SHARED_REQUEST_COUNTERS_1_0: [u8; 8] = [224, 110, 88, 252, 26, 77, 180, 5]; +/// Callback method triggered when exceeding counter thresholds. +pub type ThresholdExceededCallback = Box; +static SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK: LazyLock< + RwLock>, +> = LazyLock::new(|| RwLock::new(None)); + #[repr(C, align(32))] #[derive(Default)] /// AtomicU64 aligned to the half default cache line size of 64-bytes. @@ -48,6 +56,16 @@ struct RequestCounters { // traffic in bytes upload: AlignedAtomic, download: AlignedAtomic, + + /// Request counter thresholds + get_threshold: AlignedAtomic, + delete_threshold: AlignedAtomic, + put_threshold: AlignedAtomic, + head_threshold: AlignedAtomic, + post_threshold: AlignedAtomic, + // Traffic counter thresholds + upload_threshold: AlignedAtomic, + download_threshold: AlignedAtomic, } impl Init for RequestCounters { @@ -63,15 +81,49 @@ impl RequestCounters { /// Returns the previously stored value. pub fn increment(&self, method: Method, ordering: Ordering) -> u64 { match method { - Method::DELETE => self.delete.0.fetch_add(1, ordering), - Method::GET => self.get.0.fetch_add(1, ordering), - Method::HEAD => self.head.0.fetch_add(1, ordering), - Method::POST => self.post.0.fetch_add(1, ordering), - Method::PUT => self.put.0.fetch_add(1, ordering), + Method::DELETE => { + let prev = self.delete.0.fetch_add(1, ordering); + let threshold = self.delete_threshold.0.load(Ordering::Acquire); + Self::check_threshold(method.as_str(), threshold, prev + 1); + prev + } + Method::GET => { + let prev = self.get.0.fetch_add(1, ordering); + let threshold = self.get_threshold.0.load(Ordering::Acquire); + Self::check_threshold(method.as_str(), threshold, prev + 1); + prev + } + Method::HEAD => { + let prev = self.head.0.fetch_add(1, ordering); + let threshold = self.head_threshold.0.load(Ordering::Acquire); + Self::check_threshold(method.as_str(), threshold, prev + 1); + prev + } + Method::POST => { + let prev = self.post.0.fetch_add(1, ordering); + let threshold = self.post_threshold.0.load(Ordering::Acquire); + Self::check_threshold(method.as_str(), threshold, prev + 1); + prev + } + Method::PUT => { + let prev = self.put.0.fetch_add(1, ordering); + let threshold = self.put_threshold.0.load(Ordering::Acquire); + Self::check_threshold(method.as_str(), threshold, prev + 1); + prev + } _ => 0, } } + fn check_threshold(counter_id: &str, threshold: u64, current: u64) { + if threshold > 0 && current > threshold && current - 1 == threshold { + let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap(); + if let Some(callback) = guard.as_ref() { + callback(counter_id, threshold, current); + } + } + } + /// Load current counter state for given method, following the provided memory ordering constrains pub fn load(&self, method: Method, ordering: Ordering) -> u64 { match method { @@ -97,7 +149,16 @@ impl RequestCounters { /// /// Returns the previously stored value. pub fn add_upload_traffic(&self, count: u64, ordering: Ordering) -> u64 { - self.upload.0.fetch_add(count, ordering) + let prev = self.upload.0.fetch_add(count, ordering); + let threshold = self.upload_threshold.0.load(Ordering::Acquire); + let uploaded = prev + count; + if threshold > 0 && uploaded > threshold && prev <= threshold { + let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap(); + if let Some(callback) = guard.as_ref() { + callback("uploaded", threshold, uploaded); + } + } + prev } /// Returns upload traffic count. @@ -109,13 +170,51 @@ impl RequestCounters { /// /// Returns the previously stored value. pub fn add_download_traffic(&self, count: u64, ordering: Ordering) -> u64 { - self.download.0.fetch_add(count, ordering) + let prev = self.download.0.fetch_add(count, ordering); + let threshold = self.upload_threshold.0.load(Ordering::Acquire); + let downloaded = prev + count; + if threshold > 0 && downloaded > threshold && prev <= threshold { + let guard = SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK.read().unwrap(); + if let Some(callback) = guard.as_ref() { + callback("downloaded", threshold, downloaded); + } + } + prev } /// Returns download traffic count. pub fn get_download_traffic(&self, ordering: Ordering) -> u64 { self.download.0.load(ordering) } + + /// Update the request threshold values. + pub fn update_thresholds(&self, thresholds: &RequestCounterThresholds) { + self.delete_threshold + .0 + .store(thresholds.s3_delete.unwrap_or(0), Ordering::Release); + self.get_threshold + .0 + .store(thresholds.s3_get.unwrap_or(0), Ordering::Release); + self.head_threshold + .0 + .store(thresholds.s3_head.unwrap_or(0), Ordering::Release); + self.post_threshold + .0 + .store(thresholds.s3_post.unwrap_or(0), Ordering::Release); + self.put_threshold + .0 + .store(thresholds.s3_put.unwrap_or(0), Ordering::Release); + let download = thresholds + .s3_download + .map(|human_byte| human_byte.as_u64()) + .unwrap_or(0); + self.download_threshold.0.store(download, Ordering::Release); + let upload = thresholds + .s3_upload + .map(|human_byte| human_byte.as_u64()) + .unwrap_or(0); + self.upload_threshold.0.store(upload, Ordering::Release); + } } /// Size of the padding to align the mmapped request counters to 4k default @@ -270,6 +369,22 @@ impl SharedRequestCounters { pub fn path_buf(&self) -> PathBuf { self.path.clone() } + + /// Update the callback executed when one of the set thresholds is exceeded + pub fn update_thresholds_exceeded_callback(callback: ThresholdExceededCallback) { + SHARED_COUNTER_THRESHOLD_EXCEEDED_CALLBACK + .write() + .unwrap() + .replace(callback); + } + + /// Update the request counter thresholds to given values. + pub fn update_thresholds(&self, thresholds: &RequestCounterThresholds) { + self.shared_memory + .data() + .counters + .update_thresholds(thresholds); + } } const FLUSH_THRESHOLD: Duration = Duration::from_secs(5); -- 2.47.3