From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 0A1861FF136 for ; Mon, 09 Feb 2026 10:15:23 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 17063171; Mon, 9 Feb 2026 10:15:59 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH v1 2/6] s3-client: add persistent shared request counters for client Date: Mon, 9 Feb 2026 10:15:18 +0100 Message-ID: <20260209091533.156902-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260209091533.156902-1-c.ebner@proxmox.com> References: <20260209091533.156902-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1770628460088 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.048 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: DMDYZK2BWAHLWEV7PJSZZDQSC6MBHTN5 X-Message-ID-Hash: DMDYZK2BWAHLWEV7PJSZZDQSC6MBHTN5 X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Implements atomic counters for api requests successfully send to the S3 API endpoint via the client, accounting for individual requests discriminating based on their method. Since multiple client instances might exists accessing the API concurrently, even in different processes, provide the atomic counters via shared memory mapping. This follows along the lines of the shared traffic limiter implementation. The counter mappings are conditionally constructed on client instantiation by caller give configuration options. Keep sequential ordering of the counters with the intent to use them for statistics, soft limits and/or notifications within Proxmox Backup Server. Signed-off-by: Christian Ebner --- proxmox-s3-client/Cargo.toml | 4 + proxmox-s3-client/debian/control | 2 + proxmox-s3-client/examples/s3_client.rs | 1 + proxmox-s3-client/src/client.rs | 48 +++++- proxmox-s3-client/src/lib.rs | 7 +- .../src/shared_request_counters.rs | 152 ++++++++++++++++++ 6 files changed, 212 insertions(+), 2 deletions(-) create mode 100644 proxmox-s3-client/src/shared_request_counters.rs diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml index a50fa715..1e31bca4 100644 --- a/proxmox-s3-client/Cargo.toml +++ b/proxmox-s3-client/Cargo.toml @@ -38,7 +38,9 @@ proxmox-base64 = { workspace = true, optional = true } proxmox-http = { workspace = true, features = [ "body", "client", "client-trait" ], optional = true } proxmox-human-byte.workspace = true proxmox-rate-limiter = { workspace = true, features = [ "rate-limiter", "shared-rate-limiter" ], optional = true } +proxmox-shared-memory = { workspace = true, optional = true } proxmox-schema = { workspace = true, features = [ "api-macro", "api-types" ] } +proxmox-sys = { workspace = true, optional = true } proxmox-serde.workspace = true proxmox-time = {workspace = true, optional = true } @@ -65,6 +67,8 @@ impl = [ "dep:proxmox-base64", "dep:proxmox-http", "dep:proxmox-rate-limiter", + "dep:proxmox-shared-memory", + "dep:proxmox-sys", "dep:proxmox-time", ] diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control index 33418881..a534a107 100644 --- a/proxmox-s3-client/debian/control +++ b/proxmox-s3-client/debian/control @@ -85,6 +85,8 @@ Depends: librust-proxmox-rate-limiter-1+default-dev, librust-proxmox-rate-limiter-1+rate-limiter-dev, librust-proxmox-rate-limiter-1+shared-rate-limiter-dev, + librust-proxmox-shared-memory-1+default-dev, + librust-proxmox-sys-1+default-dev, librust-proxmox-time-2+default-dev (>= 2.1.0-~~), librust-quick-xml-0.36+async-tokio-dev (>= 0.36.1-~~), librust-quick-xml-0.36+default-dev (>= 0.36.1-~~), diff --git a/proxmox-s3-client/examples/s3_client.rs b/proxmox-s3-client/examples/s3_client.rs index ca69971c..329de47a 100644 --- a/proxmox-s3-client/examples/s3_client.rs +++ b/proxmox-s3-client/examples/s3_client.rs @@ -40,6 +40,7 @@ async fn run() -> Result<(), anyhow::Error> { put_rate_limit: None, provider_quirks: Vec::new(), rate_limiter_config: None, + request_counter_config: None, }; // Creating a client instance and connect to api endpoint diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs index 83176b39..4a6a702b 100644 --- a/proxmox-s3-client/src/client.rs +++ b/proxmox-s3-client/src/client.rs @@ -1,5 +1,6 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -32,6 +33,7 @@ use crate::response_reader::{ CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, ListBucketsResponse, ListObjectsV2Response, PutObjectResponse, ResponseReader, }; +use crate::shared_request_counters::SharedRequestCounters; /// Default timeout for s3 api requests. pub const S3_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30 * 60); @@ -74,6 +76,21 @@ pub struct S3RateLimiterConfig { burst_out: Option, } +/// Options for the s3 client's shared request counters +pub struct S3RequestCounterOptions { + /// ID for the memory mapped file + pub id: String, + /// Base path for the shared memory mapped file + pub base_path: PathBuf, + /// User for the to be created shared memory mapped file and folders + pub user: User, +} + +/// Configuration for the s3 client's shared request counters +pub struct S3RequestCounterConfig { + options: S3RequestCounterOptions, +} + /// Configuration options for client pub struct S3ClientOptions { /// Endpoint to access S3 object store. @@ -100,6 +117,8 @@ pub struct S3ClientOptions { pub provider_quirks: Vec, /// Configuration options for the shared rate limiter. pub rate_limiter_config: Option, + /// Configuration options for the client's shared request counters. + pub request_counter_config: Option, } impl S3ClientOptions { @@ -110,6 +129,7 @@ impl S3ClientOptions { bucket: Option, common_prefix: String, rate_limiter_options: Option, + request_counter_options: Option, ) -> Self { let rate_limiter_config = rate_limiter_options.map(|options| S3RateLimiterConfig { options, @@ -118,6 +138,8 @@ impl S3ClientOptions { rate_out: config.rate_out.map(|human_bytes| human_bytes.as_u64()), burst_out: config.burst_out.map(|human_bytes| human_bytes.as_u64()), }); + let request_counter_config = + request_counter_options.map(|options| S3RequestCounterConfig { options }); Self { endpoint: config.endpoint, port: config.port, @@ -131,6 +153,7 @@ impl S3ClientOptions { put_rate_limit: config.put_rate_limit, provider_quirks: config.provider_quirks.unwrap_or_default(), rate_limiter_config, + request_counter_config, } } } @@ -141,6 +164,7 @@ pub struct S3Client { options: S3ClientOptions, authority: Authority, put_rate_limiter: Option>>, + request_counters: Option>, } impl S3Client { @@ -213,6 +237,21 @@ impl S3Client { } } + let request_counters = if let Some(config) = options.request_counter_config.as_ref() { + let path = config + .options + .base_path + .join(format!("{}.shmem", config.options.id)); + let request_counters = SharedRequestCounters::open_shared_memory_mapped( + &path, + config.options.user.clone(), + ) + .context("failed to mmap shared S3 request counters")?; + Some(Arc::new(request_counters)) + } else { + None + }; + let client = Client::builder(TokioExecutor::new()).build::<_, Body>(https_connector); let authority_template = if let Some(port) = options.port { @@ -241,6 +280,7 @@ impl S3Client { options, authority, put_rate_limiter, + request_counters, }) } @@ -392,7 +432,13 @@ impl S3Client { }; match response { - Ok(Ok(response)) => return Ok(response), + Ok(Ok(response)) => { + if let Some(counters) = self.request_counters.as_ref() { + let _prev = counters.increment(parts.method.clone(), Ordering::SeqCst); + } + + return Ok(response); + } Ok(Err(err)) => { if retry >= MAX_S3_HTTP_REQUEST_RETRY - 1 { return Err(err.into()); diff --git a/proxmox-s3-client/src/lib.rs b/proxmox-s3-client/src/lib.rs index d02fd0dc..ceee41a2 100644 --- a/proxmox-s3-client/src/lib.rs +++ b/proxmox-s3-client/src/lib.rs @@ -21,7 +21,8 @@ pub use aws_sign_v4::uri_decode; mod client; #[cfg(feature = "impl")] pub use client::{ - S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3_HTTP_REQUEST_TIMEOUT, + S3Client, S3ClientOptions, S3PathPrefix, S3RateLimiterOptions, S3RequestCounterOptions, + S3_HTTP_REQUEST_TIMEOUT, }; #[cfg(feature = "impl")] mod timestamps; @@ -33,3 +34,7 @@ mod object_key; pub use object_key::S3ObjectKey; #[cfg(feature = "impl")] mod response_reader; +#[cfg(feature = "impl")] +mod shared_request_counters; +#[cfg(feature = "impl")] +pub use shared_request_counters::SharedRequestCounters; diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs new file mode 100644 index 00000000..b236490b --- /dev/null +++ b/proxmox-s3-client/src/shared_request_counters.rs @@ -0,0 +1,152 @@ +use std::mem::MaybeUninit; +use std::path::Path; +use std::sync::atomic::{AtomicU64, Ordering}; + +use anyhow::{bail, Error}; +use hyper::http::method::Method; +use nix::sys::stat::Mode; +use nix::unistd::User; + +use proxmox_shared_memory::{Init, SharedMemory}; +use proxmox_sys::fs::CreateOptions; + +const MEMORY_PAGE_SIZE: usize = 4096; +/// Generated via openssl::sha::sha256(b"Proxmox shared request counters v1.0")[0..8] +const PROXMOX_SHARED_REQUEST_COUNTERS_1_0: [u8; 8] = [224, 110, 88, 252, 26, 77, 180, 5]; + +#[repr(C)] +#[derive(Default)] +struct RequestCounters { + delete: AtomicU64, + get: AtomicU64, + head: AtomicU64, + post: AtomicU64, + put: AtomicU64, +} + +impl Init for RequestCounters { + fn initialize(this: &mut MaybeUninit) { + unsafe { + let this = &mut *this.as_mut_ptr(); + *this = RequestCounters::default(); + } + } +} + +impl RequestCounters { + /// Increment the counter for given method, following the provided memory ordering constrains. + /// + /// Returns the previously stored value. + pub fn increment(&self, method: Method, ordering: Ordering) -> u64 { + match method { + Method::DELETE => self.delete.fetch_add(1, ordering), + Method::GET => self.get.fetch_add(1, ordering), + Method::HEAD => self.head.fetch_add(1, ordering), + Method::POST => self.post.fetch_add(1, ordering), + Method::PUT => self.put.fetch_add(1, ordering), + _ => 0, + } + } + + /// Load current counter state for given method, following the provided memory ordering constrains + pub fn load(&self, method: Method, ordering: Ordering) -> u64 { + match method { + Method::DELETE => self.delete.load(ordering), + Method::GET => self.get.load(ordering), + Method::HEAD => self.head.load(ordering), + Method::POST => self.post.load(ordering), + Method::PUT => self.put.load(ordering), + _ => 0, + } + } + + /// Reset all counters, following the provided memory ordering constrains + pub fn reset(&self, ordering: Ordering) { + self.delete.store(0, ordering); + self.get.store(0, ordering); + self.head.store(0, ordering); + self.post.store(0, ordering); + self.put.store(0, ordering); + } +} + +#[repr(C)] +struct MappableRequestCounters { + magic: [u8; 8], + counters: RequestCounters, + _page_size_padding: [u8; MEMORY_PAGE_SIZE + - PROXMOX_SHARED_REQUEST_COUNTERS_1_0.len() + - std::mem::size_of::()], +} + +impl Init for MappableRequestCounters { + fn initialize(this: &mut MaybeUninit) { + unsafe { + let this = &mut *this.as_mut_ptr(); + this.magic = PROXMOX_SHARED_REQUEST_COUNTERS_1_0; + this.counters = RequestCounters::default(); + } + } + + fn check_type_magic(this: &MaybeUninit) -> Result<(), Error> { + unsafe { + let this = &*this.as_ptr(); + if this.magic != PROXMOX_SHARED_REQUEST_COUNTERS_1_0 { + bail!("incorrect magic number for request counters detected"); + } + proxmox_shared_memory::check_subtype(&this.counters)?; + } + Ok(()) + } +} + +/// Atomic counters storing per-request method counts for the client. +/// +/// If set, the counts can be filtered based on a path prefix. +pub struct SharedRequestCounters { + shared_memory: SharedMemory, +} + +impl SharedRequestCounters { + /// Create a new shared counter instance. + /// + /// Opens or creates mmap file and accesses it via shared memory mapping. + pub fn open_shared_memory_mapped>(path: P, user: User) -> Result { + let path = path.as_ref(); + if let Some(parent) = path.parent() { + let dir_opts = CreateOptions::new() + .perm(Mode::from_bits_truncate(0o770)) + .owner(user.uid) + .group(user.gid); + + proxmox_sys::fs::create_path(parent, Some(dir_opts), Some(dir_opts))?; + } + + let file_opts = CreateOptions::new() + .perm(Mode::from_bits_truncate(0o660)) + .owner(user.uid) + .group(user.gid); + let shared_memory = SharedMemory::open(path, file_opts)?; + Ok(Self { shared_memory }) + } + + /// Increment the counter for given method, following the provided memory ordering constrains + /// + /// Returns the previously stored value. + pub fn increment(&self, method: Method, ordering: Ordering) -> u64 { + self.shared_memory + .data() + .counters + .increment(method, ordering) + } + + /// Load current counter state for given method, following the provided memory ordering constrains + pub fn load(&self, method: Method, ordering: Ordering) -> u64 { + self.shared_memory.data().counters.load(method, ordering) + } + + /// Reset all counters, following the provided memory ordering constrains + pub fn reset(&self, ordering: Ordering) { + self.shared_memory.data().counters.reset(ordering) + } +} -- 2.47.3