From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id D7F9A1FF13B for ; Wed, 11 Feb 2026 13:13:32 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C49511AFC3; Wed, 11 Feb 2026 13:14:17 +0100 (CET) Message-ID: <2a88c886-9146-4448-9c5a-d27097f48eeb@proxmox.com> Date: Wed, 11 Feb 2026 13:13:41 +0100 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Subject: Re: [PATCH v1 2/6] s3-client: add persistent shared request counters for client To: Christian Ebner , pbs-devel@lists.proxmox.com References: <20260209091533.156902-1-c.ebner@proxmox.com> <20260209091533.156902-3-c.ebner@proxmox.com> Content-Language: en-US, de-AT From: Robert Obkircher In-Reply-To: <20260209091533.156902-3-c.ebner@proxmox.com> Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1770811935932 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.053 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. 
SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: YLHGI35OCZGHJWZBTZJ6SRS4WHSV3JMT X-Message-ID-Hash: YLHGI35OCZGHJWZBTZJ6SRS4WHSV3JMT X-MailFrom: r.obkircher@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: On 2/9/26 10:14, Christian Ebner wrote: > Implements atomic counters for api requests successfully sent to the > S3 API endpoint via the client, accounting for individual requests > discriminating based on their method. > > Since multiple client instances might exist accessing the API > concurrently, even in different processes, provide the atomic > counters via shared memory mapping. This follows along the lines of > the shared traffic limiter implementation. > > The counter mappings are conditionally constructed on client > instantiation by caller-given configuration options. > > Keep sequential ordering of the counters with the intent to use them > for statistics, soft limits and/or notifications within Proxmox > Backup Server. 
> > Signed-off-by: Christian Ebner > --- > proxmox-s3-client/Cargo.toml | 4 + > proxmox-s3-client/debian/control | 2 + > proxmox-s3-client/examples/s3_client.rs | 1 + > proxmox-s3-client/src/client.rs | 48 +++++- > proxmox-s3-client/src/lib.rs | 7 +- > .../src/shared_request_counters.rs | 152 ++++++++++++++++++ > 6 files changed, 212 insertions(+), 2 deletions(-) > create mode 100644 proxmox-s3-client/src/shared_request_counters.rs > > diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml > index a50fa715..1e31bca4 100644 > --- a/proxmox-s3-client/Cargo.toml > +++ b/proxmox-s3-client/Cargo.toml > @@ -38,7 +38,9 @@ proxmox-base64 = { workspace = true, optional = true } > proxmox-http = { workspace = true, features = [ "body", "client", "client-trait" ], optional = true } > proxmox-human-byte.workspace = true > proxmox-rate-limiter = { workspace = true, features = [ "rate-limiter", "shared-rate-limiter" ], optional = true } > +proxmox-shared-memory = { workspace = true, optional = true } > proxmox-schema = { workspace = true, features = [ "api-macro", "api-types" ] } > +proxmox-sys = { workspace = true, optional = true } > proxmox-serde.workspace = true > proxmox-time = {workspace = true, optional = true } > > @@ -65,6 +67,8 @@ impl = [ > "dep:proxmox-base64", > "dep:proxmox-http", > "dep:proxmox-rate-limiter", > + "dep:proxmox-shared-memory", > + "dep:proxmox-sys", > "dep:proxmox-time", > ] > > diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control > index 33418881..a534a107 100644 > --- a/proxmox-s3-client/debian/control > +++ b/proxmox-s3-client/debian/control > @@ -85,6 +85,8 @@ Depends: > librust-proxmox-rate-limiter-1+default-dev, > librust-proxmox-rate-limiter-1+rate-limiter-dev, > librust-proxmox-rate-limiter-1+shared-rate-limiter-dev, > + librust-proxmox-shared-memory-1+default-dev, > + librust-proxmox-sys-1+default-dev, > librust-proxmox-time-2+default-dev (>= 2.1.0-~~), > librust-quick-xml-0.36+async-tokio-dev 
(>= 0.36.1-~~), > librust-quick-xml-0.36+default-dev (>= 0.36.1-~~), > diff --git a/proxmox-s3-client/examples/s3_client.rs b/proxmox-s3-client/examples/s3_client.rs > index ca69971c..329de47a 100644 > --- a/proxmox-s3-client/examples/s3_client.rs > +++ b/proxmox-s3-client/examples/s3_client.rs > @@ -40,6 +40,7 @@ async fn run() -> Result<(), anyhow::Error> { > put_rate_limit: None, > provider_quirks: Vec::new(), > rate_limiter_config: None, > + request_counter_config: None, > }; > > // Creating a client instance and connect to api endpoint > diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs > index 83176b39..4a6a702b 100644 > --- a/proxmox-s3-client/src/client.rs > +++ b/proxmox-s3-client/src/client.rs > @@ -1,5 +1,6 @@ > use std::path::{Path, PathBuf}; > use std::str::FromStr; > +use std::sync::atomic::Ordering; > use std::sync::{Arc, Mutex}; > use std::time::{Duration, Instant}; > > @@ -32,6 +33,7 @@ use crate::response_reader::{ > CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, > ListBucketsResponse, ListObjectsV2Response, PutObjectResponse, ResponseReader, > }; > +use crate::shared_request_counters::SharedRequestCounters; > > /// Default timeout for s3 api requests. 
> pub const S3_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30 * 60); > @@ -74,6 +76,21 @@ pub struct S3RateLimiterConfig { > burst_out: Option, > } > > +/// Options for the s3 client's shared request counters > +pub struct S3RequestCounterOptions { > + /// ID for the memory mapped file > + pub id: String, > + /// Base path for the shared memory mapped file > + pub base_path: PathBuf, > + /// User for the to be created shared memory mapped file and folders > + pub user: User, > +} > + > +/// Configuration for the s3 client's shared request counters > +pub struct S3RequestCounterConfig { > + options: S3RequestCounterOptions, > +} > + > /// Configuration options for client > pub struct S3ClientOptions { > /// Endpoint to access S3 object store. > @@ -100,6 +117,8 @@ pub struct S3ClientOptions { > pub provider_quirks: Vec, > /// Configuration options for the shared rate limiter. > pub rate_limiter_config: Option, > + /// Configuration options for the client's shared request counters. 
> + pub request_counter_config: Option, > } > > impl S3ClientOptions { > @@ -110,6 +129,7 @@ impl S3ClientOptions { > bucket: Option, > common_prefix: String, > rate_limiter_options: Option, > + request_counter_options: Option, > ) -> Self { > let rate_limiter_config = rate_limiter_options.map(|options| S3RateLimiterConfig { > options, > @@ -118,6 +138,8 @@ impl S3ClientOptions { > rate_out: config.rate_out.map(|human_bytes| human_bytes.as_u64()), > burst_out: config.burst_out.map(|human_bytes| human_bytes.as_u64()), > }); > + let request_counter_config = > + request_counter_options.map(|options| S3RequestCounterConfig { options }); > Self { > endpoint: config.endpoint, > port: config.port, > @@ -131,6 +153,7 @@ impl S3ClientOptions { > put_rate_limit: config.put_rate_limit, > provider_quirks: config.provider_quirks.unwrap_or_default(), > rate_limiter_config, > + request_counter_config, > } > } > } > @@ -141,6 +164,7 @@ pub struct S3Client { > options: S3ClientOptions, > authority: Authority, > put_rate_limiter: Option>>, > + request_counters: Option>, > } > > impl S3Client { > @@ -213,6 +237,21 @@ impl S3Client { > } > } > > + let request_counters = if let Some(config) = options.request_counter_config.as_ref() { > + let path = config > + .options > + .base_path > + .join(format!("{}.shmem", config.options.id)); > + let request_counters = SharedRequestCounters::open_shared_memory_mapped( > + &path, > + config.options.user.clone(), > + ) > + .context("failed to mmap shared S3 request counters")?; > + Some(Arc::new(request_counters)) > + } else { > + None > + }; > + > let client = Client::builder(TokioExecutor::new()).build::<_, Body>(https_connector); > > let authority_template = if let Some(port) = options.port { > @@ -241,6 +280,7 @@ impl S3Client { > options, > authority, > put_rate_limiter, > + request_counters, > }) > } > > @@ -392,7 +432,13 @@ impl S3Client { > }; > > match response { > - Ok(Ok(response)) => return Ok(response), > + Ok(Ok(response)) => { > + 
if let Some(counters) = self.request_counters.as_ref() { > + let _prev = counters.increment(parts.method.clone(), Ordering::SeqCst); > + } > + > + return Ok(response); > + } > Ok(Err(err)) => { > if retry >= MAX_S3_HTTP_REQUEST_RETRY - 1 { > return Err(err.into()); > diff --git a/proxmox-s3-client/src/lib.rs b/proxmox-s3-client/src/lib.rs > index d02fd0dc..ceee41a2 100644 > --- a/proxmox-s3-client/src/lib.rs > +++ b/proxmox-s3-client/src/lib.rs > @@ -21,7 +21,8 @@ pub use aws_sign_v4::uri_decode; > mod client; > #[cfg(feature = "impl")] > pub use client::{ > - S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3_HTTP_REQUEST_TIMEOUT, > + S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3RequestCounterOptions, > + S3_HTTP_REQUEST_TIMEOUT, > }; > #[cfg(feature = "impl")] > mod timestamps; > @@ -33,3 +34,7 @@ mod object_key; > pub use object_key::S3ObjectKey; > #[cfg(feature = "impl")] > mod response_reader; > +#[cfg(feature = "impl")] > +mod shared_request_counters; > +#[cfg(feature = "impl")] > +pub use shared_request_counters::SharedRequestCounters; > diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs > new file mode 100644 > index 00000000..b236490b > --- /dev/null > +++ b/proxmox-s3-client/src/shared_request_counters.rs > @@ -0,0 +1,152 @@ > +use std::mem::MaybeUninit; > +use std::path::Path; > +use std::sync::atomic::{AtomicU64, Ordering}; > + > +use anyhow::{bail, Error}; > +use hyper::http::method::Method; > +use nix::sys::stat::Mode; > +use nix::unistd::User; > + > +use proxmox_shared_memory::{Init, SharedMemory}; > +use proxmox_sys::fs::CreateOptions; > + > +const MEMORY_PAGE_SIZE: usize = 4096; > +/// Generated via openssl::sha::sha256(b"Proxmox shared request counters v1.0")[0..8] > +const PROXMOX_SHARED_REQUEST_COUNTERS_1_0: [u8; 8] = [224, 110, 88, 252, 26, 77, 180, 5]; > + > +#[repr(C)] > +#[derive(Default)] > +struct RequestCounters { > + delete: AtomicU64, > + 
get: AtomicU64, > + head: AtomicU64, > + post: AtomicU64, > + put: AtomicU64, It probably doesn't matter (especially not with SeqCst), but aligning each field to 64 bytes would avoid false sharing. > +} > + > +impl Init for RequestCounters { > + fn initialize(this: &mut MaybeUninit) { > + unsafe { > + let this = &mut *this.as_mut_ptr(); > + *this = RequestCounters::default(); If the memory is actually uninitialized this is undefined behavior because `*this =` drops the previous value. The memory from mmap is initialized, but I think it would still be better to use ptr::write. > + } > + } > +} > + > +impl RequestCounters { > + /// Increment the counter for given method, following the provided memory ordering constrains. > + /// > + /// Returns the previously stored value. > + pub fn increment(&self, method: Method, ordering: Ordering) -> u64 { > + match method { > + Method::DELETE => self.delete.fetch_add(1, ordering), > + Method::GET => self.get.fetch_add(1, ordering), > + Method::HEAD => self.head.fetch_add(1, ordering), > + Method::POST => self.post.fetch_add(1, ordering), > + Method::PUT => self.put.fetch_add(1, ordering), > + _ => 0, > + } > + } > + > + /// Load current counter state for given method, following the provided memory ordering constrains > + pub fn load(&self, method: Method, ordering: Ordering) -> u64 { > + match method { > + Method::DELETE => self.delete.load(ordering), > + Method::GET => self.get.load(ordering), > + Method::HEAD => self.head.load(ordering), > + Method::POST => self.post.load(ordering), > + Method::PUT => self.put.load(ordering), > + _ => 0, > + } > + } > + > + /// Reset all counters, following the provided memory ordering constrains > + pub fn reset(&self, ordering: Ordering) { > + self.delete.store(0, ordering); > + self.get.store(0, ordering); > + self.head.store(0, ordering); > + self.post.store(0, ordering); > + self.put.store(0, ordering); > + } > +} > + > +#[repr(C)] > +struct MappableRequestCounters { > + magic: [u8; 8], 
> + counters: RequestCounters, > + _page_size_padding: [u8; MEMORY_PAGE_SIZE > + - PROXMOX_SHARED_REQUEST_COUNTERS_1_0.len() > + - std::mem::size_of::()], Why do we need to hard-code a possibly incorrect assumption about the page size? I know `SharedMemory` claims that the size needs to be a multiple of 4096, but I don't understand why. I guess this implicitly prevents unused padding at the end of the struct which would be problematic if we add a field and run two processes with different versions at the same time, but that doesn't explain the page size. > +} > + > +impl Init for MappableRequestCounters { > + fn initialize(this: &mut MaybeUninit) { > + unsafe { > + let this = &mut *this.as_mut_ptr(); > + this.magic = PROXMOX_SHARED_REQUEST_COUNTERS_1_0; > + this.counters = RequestCounters::default(); > + } > + } > + > + fn check_type_magic(this: &MaybeUninit) -> Result<(), Error> { > + unsafe { > + let this = &*this.as_ptr(); > + if this.magic != PROXMOX_SHARED_REQUEST_COUNTERS_1_0 { > + bail!("incorrect magic number for request counters detected"); > + } > + proxmox_shared_memory::check_subtype(&this.counters)?; > + } > + Ok(()) > + } > +} > + > +/// Atomic counters storing per-request method counts for the client. > +/// > +/// If set, the counts can be filtered based on a path prefix. > +pub struct SharedRequestCounters { > + shared_memory: SharedMemory, > +} > + > +impl SharedRequestCounters { > + /// Create a new shared counter instance. > + /// > + /// Opens or creates mmap file and accesses it via shared memory mapping. 
> + pub fn open_shared_memory_mapped>(path: P, user: User) -> Result { > + let path = path.as_ref(); > + if let Some(parent) = path.parent() { > + let dir_opts = CreateOptions::new() > + .perm(Mode::from_bits_truncate(0o770)) > + .owner(user.uid) > + .group(user.gid); > + > + proxmox_sys::fs::create_path(parent, Some(dir_opts), Some(dir_opts))?; > + } > + > + let file_opts = CreateOptions::new() > + .perm(Mode::from_bits_truncate(0o660)) > + .owner(user.uid) > + .group(user.gid); > + let shared_memory = SharedMemory::open(path, file_opts)?; > + Ok(Self { shared_memory }) > + } > + > + /// Increment the counter for given method, following the provided memory ordering constrains > + /// > + /// Returns the previously stored value. > + pub fn increment(&self, method: Method, ordering: Ordering) -> u64 { > + self.shared_memory > + .data() > + .counters > + .increment(method, ordering) > + } > + > + /// Load current counter state for given method, following the provided memory ordering constrains > + pub fn load(&self, method: Method, ordering: Ordering) -> u64 { > + self.shared_memory.data().counters.load(method, ordering) > + } > + > + /// Reset all counters, following the provided memory ordering constrains > + pub fn reset(&self, ordering: Ordering) { > + self.shared_memory.data().counters.reset(ordering) > + } > +}