From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 6BBE31FF142 for ; Mon, 16 Feb 2026 13:14:16 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B342610EF3; Mon, 16 Feb 2026 13:15:03 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox v2 2/6] s3-client: add persistent shared request counters for client Date: Mon, 16 Feb 2026 13:13:50 +0100 Message-ID: <20260216121406.99617-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260216121406.99617-1-c.ebner@proxmox.com> References: <20260216121406.99617-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1771244054283 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.048 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: WXEDJ5H4R5YH7ZRO3I74CX6YZRCK3OFN X-Message-ID-Hash: WXEDJ5H4R5YH7ZRO3I74CX6YZRCK3OFN X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Implements atomic counters for api requests successfully send to the S3 API endpoint via the client, accounting for individual requests 
discriminating based on their method. The counter mappings are conditionally constructed on client instantiation, based on caller-given configuration options. Since multiple client instances might exist, accessing the API concurrently and possibly from different processes, the atomic counters are provided via a shared memory mapping. This follows along the lines of the shared traffic limiter implementation. To reduce cache line contention, atomic counters are aligned to half the standard cache line size of 64 bytes. Signed-off-by: Christian Ebner --- changes since version 1: - relax ordering requirement from SeqCst to AcqRel/Release/Acquire. - reorder and align counters to half cache line size to reduce cache line contention. - rework init logic to avoid undefined behaviour. - adapt page size padding calculation to newly aligned counters. proxmox-s3-client/Cargo.toml | 4 + proxmox-s3-client/debian/control | 2 + proxmox-s3-client/examples/s3_client.rs | 1 + proxmox-s3-client/src/client.rs | 48 ++++- proxmox-s3-client/src/lib.rs | 7 +- .../src/shared_request_counters.rs | 184 ++++++++++++++++++ 6 files changed, 244 insertions(+), 2 deletions(-) create mode 100644 proxmox-s3-client/src/shared_request_counters.rs diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml index a50fa715..1e31bca4 100644 --- a/proxmox-s3-client/Cargo.toml +++ b/proxmox-s3-client/Cargo.toml @@ -38,7 +38,9 @@ proxmox-base64 = { workspace = true, optional = true } proxmox-http = { workspace = true, features = [ "body", "client", "client-trait" ], optional = true } proxmox-human-byte.workspace = true proxmox-rate-limiter = { workspace = true, features = [ "rate-limiter", "shared-rate-limiter" ], optional = true } +proxmox-shared-memory = { workspace = true, optional = true } proxmox-schema = { workspace = true, features = [ "api-macro", "api-types" ] } +proxmox-sys = { workspace = true, optional = true } proxmox-serde.workspace = true proxmox-time = {workspace = true, optional = true } @@ 
-65,6 +67,8 @@ impl = [ "dep:proxmox-base64", "dep:proxmox-http", "dep:proxmox-rate-limiter", + "dep:proxmox-shared-memory", + "dep:proxmox-sys", "dep:proxmox-time", ] diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control index 33418881..a534a107 100644 --- a/proxmox-s3-client/debian/control +++ b/proxmox-s3-client/debian/control @@ -85,6 +85,8 @@ Depends: librust-proxmox-rate-limiter-1+default-dev, librust-proxmox-rate-limiter-1+rate-limiter-dev, librust-proxmox-rate-limiter-1+shared-rate-limiter-dev, + librust-proxmox-shared-memory-1+default-dev, + librust-proxmox-sys-1+default-dev, librust-proxmox-time-2+default-dev (>= 2.1.0-~~), librust-quick-xml-0.36+async-tokio-dev (>= 0.36.1-~~), librust-quick-xml-0.36+default-dev (>= 0.36.1-~~), diff --git a/proxmox-s3-client/examples/s3_client.rs b/proxmox-s3-client/examples/s3_client.rs index ca69971c..329de47a 100644 --- a/proxmox-s3-client/examples/s3_client.rs +++ b/proxmox-s3-client/examples/s3_client.rs @@ -40,6 +40,7 @@ async fn run() -> Result<(), anyhow::Error> { put_rate_limit: None, provider_quirks: Vec::new(), rate_limiter_config: None, + request_counter_config: None, }; // Creating a client instance and connect to api endpoint diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs index 83176b39..731367cb 100644 --- a/proxmox-s3-client/src/client.rs +++ b/proxmox-s3-client/src/client.rs @@ -1,5 +1,6 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -32,6 +33,7 @@ use crate::response_reader::{ CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, ListBucketsResponse, ListObjectsV2Response, PutObjectResponse, ResponseReader, }; +use crate::shared_request_counters::SharedRequestCounters; /// Default timeout for s3 api requests. 
pub const S3_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30 * 60); @@ -74,6 +76,21 @@ pub struct S3RateLimiterConfig { burst_out: Option, } +/// Options for the s3 client's shared request counters +pub struct S3RequestCounterOptions { + /// ID for the memory mapped file + pub id: String, + /// Base path for the shared memory mapped file + pub base_path: PathBuf, + /// User for the to be created shared memory mapped file and folders + pub user: User, +} + +/// Configuration for the s3 client's shared request counters +pub struct S3RequestCounterConfig { + options: S3RequestCounterOptions, +} + /// Configuration options for client pub struct S3ClientOptions { /// Endpoint to access S3 object store. @@ -100,6 +117,8 @@ pub struct S3ClientOptions { pub provider_quirks: Vec, /// Configuration options for the shared rate limiter. pub rate_limiter_config: Option, + /// Configuration options for the client's shared request counters. + pub request_counter_config: Option, } impl S3ClientOptions { @@ -110,6 +129,7 @@ impl S3ClientOptions { bucket: Option, common_prefix: String, rate_limiter_options: Option, + request_counter_options: Option, ) -> Self { let rate_limiter_config = rate_limiter_options.map(|options| S3RateLimiterConfig { options, @@ -118,6 +138,8 @@ impl S3ClientOptions { rate_out: config.rate_out.map(|human_bytes| human_bytes.as_u64()), burst_out: config.burst_out.map(|human_bytes| human_bytes.as_u64()), }); + let request_counter_config = + request_counter_options.map(|options| S3RequestCounterConfig { options }); Self { endpoint: config.endpoint, port: config.port, @@ -131,6 +153,7 @@ impl S3ClientOptions { put_rate_limit: config.put_rate_limit, provider_quirks: config.provider_quirks.unwrap_or_default(), rate_limiter_config, + request_counter_config, } } } @@ -141,6 +164,7 @@ pub struct S3Client { options: S3ClientOptions, authority: Authority, put_rate_limiter: Option>>, + request_counters: Option>, } impl S3Client { @@ -213,6 +237,21 @@ impl 
S3Client { } } + let request_counters = if let Some(config) = options.request_counter_config.as_ref() { + let path = config + .options + .base_path + .join(format!("{}.shmem", config.options.id)); + let request_counters = SharedRequestCounters::open_shared_memory_mapped( + &path, + config.options.user.clone(), + ) + .context("failed to mmap shared S3 request counters")?; + Some(Arc::new(request_counters)) + } else { + None + }; + let client = Client::builder(TokioExecutor::new()).build::<_, Body>(https_connector); let authority_template = if let Some(port) = options.port { @@ -241,6 +280,7 @@ impl S3Client { options, authority, put_rate_limiter, + request_counters, }) } @@ -392,7 +432,13 @@ impl S3Client { }; match response { - Ok(Ok(response)) => return Ok(response), + Ok(Ok(response)) => { + if let Some(counters) = self.request_counters.as_ref() { + let _prev = counters.increment(parts.method.clone(), Ordering::AcqRel); + } + + return Ok(response); + } Ok(Err(err)) => { if retry >= MAX_S3_HTTP_REQUEST_RETRY - 1 { return Err(err.into()); diff --git a/proxmox-s3-client/src/lib.rs b/proxmox-s3-client/src/lib.rs index d02fd0dc..ceee41a2 100644 --- a/proxmox-s3-client/src/lib.rs +++ b/proxmox-s3-client/src/lib.rs @@ -21,7 +21,8 @@ pub use aws_sign_v4::uri_decode; mod client; #[cfg(feature = "impl")] pub use client::{ - S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3_HTTP_REQUEST_TIMEOUT, + S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3RequestCounterOptions, + S3_HTTP_REQUEST_TIMEOUT, }; #[cfg(feature = "impl")] mod timestamps; @@ -33,3 +34,7 @@ mod object_key; pub use object_key::S3ObjectKey; #[cfg(feature = "impl")] mod response_reader; +#[cfg(feature = "impl")] +mod shared_request_counters; +#[cfg(feature = "impl")] +pub use shared_request_counters::SharedRequestCounters; diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs new file mode 100644 index 00000000..ec587a07 
--- /dev/null
+++ b/proxmox-s3-client/src/shared_request_counters.rs
@@ -0,0 +1,233 @@
+use std::mem::MaybeUninit;
+use std::path::Path;
+use std::sync::atomic::{AtomicU64, Ordering};
+
+use anyhow::{bail, Error};
+use hyper::http::method::Method;
+use nix::sys::stat::Mode;
+use nix::unistd::User;
+
+use proxmox_shared_memory::{Init, SharedMemory};
+use proxmox_sys::fs::CreateOptions;
+
+const MEMORY_PAGE_SIZE: usize = 4096;
+/// Generated via openssl::sha::sha256(b"Proxmox shared request counters v1.0")[0..8]
+const PROXMOX_SHARED_REQUEST_COUNTERS_1_0: [u8; 8] = [224, 110, 88, 252, 26, 77, 180, 5];
+
+/// AtomicU64 aligned to half the default cache line size of 64 bytes.
+#[repr(C, align(32))]
+#[derive(Default)]
+struct AlignedAtomic(AtomicU64);
+
+/// Mmapped file magic number aligned to half the default cache line size of 64 bytes.
+///
+/// Sharing the counters' alignment facilitates the padding size calculation.
+#[repr(C, align(32))]
+#[derive(Default, PartialEq)]
+struct AlignedMagic([u8; 8]);
+
+// Field order is chosen to bundle counters expected to be updated frequently with less
+// frequent ones. Ideally each counter would live in its own cache line, but that would
+// double the memory footprint. Do not reorder, the layout is part of the mapped file format.
+#[repr(C)]
+#[derive(Default)]
+struct RequestCounters {
+    get: AlignedAtomic,
+    delete: AlignedAtomic,
+    put: AlignedAtomic,
+    head: AlignedAtomic,
+    post: AlignedAtomic,
+}
+
+impl Init for RequestCounters {
+    fn initialize(this: &mut MaybeUninit<Self>) {
+        // Writing a default constructed value fully initializes the memory, with all
+        // counters starting at 0.
+        this.write(RequestCounters::default());
+    }
+}
+
+impl RequestCounters {
+    /// Get the counter tracking requests with the given method.
+    ///
+    /// Returns `None` if requests with this method are not tracked.
+    fn counter(&self, method: Method) -> Option<&AtomicU64> {
+        let counter = match method {
+            Method::DELETE => &self.delete,
+            Method::GET => &self.get,
+            Method::HEAD => &self.head,
+            Method::POST => &self.post,
+            Method::PUT => &self.put,
+            _ => return None,
+        };
+        Some(&counter.0)
+    }
+
+    /// Increment the counter for the given method, following the provided memory ordering
+    /// constraints.
+    ///
+    /// Returns the previously stored value, or 0 without touching any counter if the method
+    /// is not tracked.
+    pub fn increment(&self, method: Method, ordering: Ordering) -> u64 {
+        self.counter(method)
+            .map_or(0, |counter| counter.fetch_add(1, ordering))
+    }
+
+    /// Load the current counter value for the given method, following the provided memory
+    /// ordering constraints.
+    ///
+    /// Returns 0 if the method is not tracked.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the method is tracked and `ordering` is [`Ordering::Release`] or
+    /// [`Ordering::AcqRel`], see [`AtomicU64::load`].
+    pub fn load(&self, method: Method, ordering: Ordering) -> u64 {
+        self.counter(method)
+            .map_or(0, |counter| counter.load(ordering))
+    }
+
+    /// Reset all counters to 0, following the provided memory ordering constraints.
+    ///
+    /// The counters are reset one after the other, not as a single atomic operation.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `ordering` is [`Ordering::Acquire`] or [`Ordering::AcqRel`], see
+    /// [`AtomicU64::store`].
+    pub fn reset(&self, ordering: Ordering) {
+        for counter in [&self.delete, &self.get, &self.head, &self.post, &self.put] {
+            counter.0.store(0, ordering);
+        }
+    }
+}
+
+/// Size of the padding to align the mmapped request counters to 4k default
+/// page size.
+const PADDING_SIZE: usize = MEMORY_PAGE_SIZE
+    - std::mem::size_of::<AlignedMagic>()
+    - std::mem::size_of::<RequestCounters>();
+
+// Alignment is chosen to reduce cache line contention while keeping low
+// memory footprint.
+#[repr(C)]
+struct MappableRequestCounters {
+    magic: AlignedMagic,
+    counters: RequestCounters,
+    _page_size_padding: [u8; PADDING_SIZE],
+}
+
+// The padding must make the mapped struct span exactly one page; catch layout changes (e.g.
+// additional counters or changed alignment) at compile time instead of at runtime.
+const _: () = assert!(std::mem::size_of::<MappableRequestCounters>() == MEMORY_PAGE_SIZE);
+
+impl Default for MappableRequestCounters {
+    fn default() -> Self {
+        Self {
+            magic: AlignedMagic(PROXMOX_SHARED_REQUEST_COUNTERS_1_0),
+            counters: RequestCounters::default(),
+            _page_size_padding: [0; PADDING_SIZE],
+        }
+    }
+}
+
+impl Init for MappableRequestCounters {
+    fn initialize(this: &mut MaybeUninit<Self>) {
+        // Writing a default constructed value fully initializes the memory: magic number
+        // set, all counters at 0 and padding zeroed.
+        this.write(MappableRequestCounters::default());
+    }
+
+    fn check_type_magic(this: &MaybeUninit<Self>) -> Result<(), Error> {
+        // SAFETY: do not make assumptions about the object being initialized, only use raw
+        // pointers to the individual fields to check memory for the expected contents.
+        unsafe {
+            let this_ptr = this.as_ptr();
+
+            let magic_ptr = std::ptr::addr_of!((*this_ptr).magic);
+            if *magic_ptr != AlignedMagic(PROXMOX_SHARED_REQUEST_COUNTERS_1_0) {
+                bail!("incorrect magic number for request counters detected");
+            }
+
+            let counters_ptr = std::ptr::addr_of!((*this_ptr).counters);
+            proxmox_shared_memory::check_subtype(&*counters_ptr)?;
+        }
+        Ok(())
+    }
+}
+
+/// Atomic counters storing per-request-method counts for the client.
+///
+/// The counters live in a shared memory mapped file, so all instances opened on the same
+/// file, possibly from different processes, share the same counts.
+pub struct SharedRequestCounters {
+    shared_memory: SharedMemory<MappableRequestCounters>,
+}
+
+impl SharedRequestCounters {
+    /// Create a new shared counter instance.
+    ///
+    /// Opens or creates the mmap file at `path` and accesses it via shared memory mapping.
+    /// Missing parent directories are created with mode `0o770`, the file with mode `0o660`,
+    /// both owned by `user`.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the parent directories cannot be created or the shared memory mapping cannot
+    /// be opened.
+    pub fn open_shared_memory_mapped<P: AsRef<Path>>(path: P, user: User) -> Result<Self, Error> {
+        let path = path.as_ref();
+        if let Some(parent) = path.parent() {
+            let dir_opts = CreateOptions::new()
+                .perm(Mode::from_bits_truncate(0o770))
+                .owner(user.uid)
+                .group(user.gid);
+
+            proxmox_sys::fs::create_path(parent, Some(dir_opts), Some(dir_opts))?;
+        }
+
+        let file_opts = CreateOptions::new()
+            .perm(Mode::from_bits_truncate(0o660))
+            .owner(user.uid)
+            .group(user.gid);
+        let shared_memory = SharedMemory::open_non_tmpfs(path, file_opts)?;
+        Ok(Self { shared_memory })
+    }
+
+    /// Increment the counter for the given method, following the provided memory ordering
+    /// constraints.
+    ///
+    /// Only `DELETE`, `GET`, `HEAD`, `POST` and `PUT` requests are tracked. Returns the
+    /// previously stored value, or 0 if the method is not tracked.
+    pub fn increment(&self, method: Method, ordering: Ordering) -> u64 {
+        self.shared_memory
+            .data()
+            .counters
+            .increment(method, ordering)
+    }
+
+    /// Load the current counter value for the given method, following the provided memory
+    /// ordering constraints.
+    ///
+    /// Returns 0 if the method is not tracked.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the method is tracked and `ordering` is [`Ordering::Release`] or
+    /// [`Ordering::AcqRel`], see [`AtomicU64::load`].
+    pub fn load(&self, method: Method, ordering: Ordering) -> u64 {
+        self.shared_memory.data().counters.load(method, ordering)
+    }
+
+    /// Reset all counters to 0, following the provided memory ordering constraints.
+    ///
+    /// The counters are reset one after the other, not as a single atomic operation.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `ordering` is [`Ordering::Acquire`] or [`Ordering::AcqRel`], see
+    /// [`AtomicU64::store`].
+    pub fn reset(&self, ordering: Ordering) {
+        self.shared_memory.data().counters.reset(ordering)
+    }
+}
-- 
2.47.3