From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id EFA1E1FF139 for ; Tue, 24 Feb 2026 10:14:22 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 120BB1FED2; Tue, 24 Feb 2026 10:15:08 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox v3 08/10] s3-client: request counters: periodically persist counters to file Date: Tue, 24 Feb 2026 10:13:52 +0100 Message-ID: <20260224091406.169080-9-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260224091406.169080-1-c.ebner@proxmox.com> References: <20260224091406.169080-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1771924448278 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.049 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: S3SGEX3RHMZPKMWLXUORONSPL3RNJTSI X-Message-ID-Hash: S3SGEX3RHMZPKMWLXUORONSPL3RNJTSI X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: According to the mmap man page [0] shared memory mapped contents are not 
guaranteed to be written back to the underlying file until unmapped, unless the MAP_SYNC flag is used. This would however lead to excessive I/O, writing counter states on each and every update. Therefore, periodically persist the counter state via asynchronous msync calls. To reduce the number of such requests, do not perform this unconditionally and for each mmapped counter individually, but rather provide a global flusher which only flushes the shared request counters the s3 client registered with it. Further, do not perform the call for each counter update, but rather let the s3 client signal when counter updates happened via an async channel. The task for flush request processing is instantiated and executed on demand once the first counter is registered. [0] https://man7.org/linux/man-pages/man2/mmap.2.html Signed-off-by: Christian Ebner --- proxmox-s3-client/Cargo.toml | 2 + proxmox-s3-client/debian/control | 1 + proxmox-s3-client/src/client.rs | 20 ++- .../src/shared_request_counters.rs | 167 +++++++++++++++++- 4 files changed, 183 insertions(+), 7 deletions(-) diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml index 1e31bca4..6b8aea49 100644 --- a/proxmox-s3-client/Cargo.toml +++ b/proxmox-s3-client/Cargo.toml @@ -34,6 +34,7 @@ tokio-util = { workspace = true, features = [ "compat" ], optional = true } tracing = { workspace = true, optional = true } url = {workspace = true, optional = true } +proxmox-async = { workspace = true, optional = true } proxmox-base64 = { workspace = true, optional = true } proxmox-http = { workspace = true, features = [ "body", "client", "client-trait" ], optional = true } proxmox-human-byte.workspace = true @@ -64,6 +65,7 @@ impl = [ "dep:tokio-util", "dep:tracing", "dep:url", + "dep:proxmox-async", "dep:proxmox-base64", "dep:proxmox-http", "dep:proxmox-rate-limiter", diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control index a534a107..bf8e37d6 100644 --- a/proxmox-s3-client/debian/control +++ 
b/proxmox-s3-client/debian/control @@ -77,6 +77,7 @@ Depends: librust-md5-0.7+default-dev, librust-nix-0.29+default-dev, librust-openssl-0.10+default-dev, + librust-proxmox-async-0.5+default-dev, librust-proxmox-base64-1+default-dev, librust-proxmox-http-1+body-dev (>= 1.0.5-~~), librust-proxmox-http-1+client-dev (>= 1.0.5-~~), diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs index 1c6b9c33..29ddc1de 100644 --- a/proxmox-s3-client/src/client.rs +++ b/proxmox-s3-client/src/client.rs @@ -1,7 +1,7 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::Ordering; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, LazyLock, Mutex, RwLock}; use std::time::{Duration, Instant}; use anyhow::{bail, format_err, Context, Error}; @@ -33,7 +33,7 @@ use crate::response_reader::{ CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse, ListBucketsResponse, ListObjectsV2Response, PutObjectResponse, ResponseReader, }; -use crate::shared_request_counters::SharedRequestCounters; +use crate::shared_request_counters::{MmapFlusher, SharedRequestCounters}; /// Default timeout for s3 api requests. pub const S3_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30 * 60); @@ -46,6 +46,9 @@ const S3_MIN_ASSUMED_UPLOAD_RATE: u64 = 1024; const MAX_S3_HTTP_REQUEST_RETRY: usize = 3; const S3_HTTP_REQUEST_RETRY_BACKOFF_DEFAULT: Duration = Duration::from_secs(1); +static SHARED_COUNTER_FLUSHER: LazyLock> = + LazyLock::new(|| RwLock::new(MmapFlusher::new())); + /// S3 object key path prefix without the context prefix as defined by the client options. 
/// /// The client option's context prefix will be prepended by the various client methods before @@ -247,7 +250,14 @@ impl S3Client { config.options.user.clone(), ) .context("failed to mmap shared S3 request counters")?; - Some(Arc::new(request_counters)) + let request_counters = Arc::new(request_counters); + + SHARED_COUNTER_FLUSHER + .write() + .unwrap() + .register_counter(Arc::clone(&request_counters)); + + Some(request_counters) } else { None }; @@ -441,6 +451,10 @@ impl S3Client { .context("failed to account for upload traffic")?; let _prev_uploaded = counters.add_upload_traffic(transferred, Ordering::AcqRel); + + tokio::task::spawn_blocking(|| { + let _ = SHARED_COUNTER_FLUSHER.read().unwrap().request_flush(); + }); } return Ok(response); diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs index a5cd286c..d3f53c8e 100644 --- a/proxmox-s3-client/src/shared_request_counters.rs +++ b/proxmox-s3-client/src/shared_request_counters.rs @@ -1,11 +1,19 @@ +use std::collections::HashMap; use std::mem::MaybeUninit; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; use anyhow::{bail, Error}; use hyper::http::method::Method; +use nix::sys::mman::MsFlags; use nix::sys::stat::Mode; use nix::unistd::User; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio::task::JoinHandle; +use tokio::time::Instant; use proxmox_shared_memory::{Init, SharedMemory}; use proxmox_sys::fs::CreateOptions; @@ -163,6 +171,7 @@ impl Init for MappableRequestCounters { /// If set, the counts can be filtered based on a path prefix. pub struct SharedRequestCounters { shared_memory: SharedMemory, + path: PathBuf, } impl SharedRequestCounters { @@ -170,7 +179,7 @@ impl SharedRequestCounters { /// /// Opens or creates mmap file and accesses it via shared memory mapping. 
pub fn open_shared_memory_mapped>(path: P, user: User) -> Result { - let path = path.as_ref(); + let path = path.as_ref().to_path_buf(); if let Some(parent) = path.parent() { let dir_opts = CreateOptions::new() .perm(Mode::from_bits_truncate(0o770)) @@ -184,8 +193,11 @@ impl SharedRequestCounters { .perm(Mode::from_bits_truncate(0o660)) .owner(user.uid) .group(user.gid); - let shared_memory = SharedMemory::open_non_tmpfs(path, file_opts)?; - Ok(Self { shared_memory }) + let shared_memory = SharedMemory::open_non_tmpfs(&path, file_opts)?; + Ok(Self { + shared_memory, + path, + }) } /// Increment the counter for given method, following the provided memory ordering constrains @@ -243,4 +255,151 @@ impl SharedRequestCounters { .counters .get_download_traffic(ordering) } + + /// Flush in-memory contents to backing file, but do not wait for completion + pub fn schedule_flush(&self) -> Result<(), Error> { + self.shared_memory.msync(MsFlags::MS_ASYNC) + } + + /// Path of shared memory backing file + pub fn path_buf(&self) -> PathBuf { + self.path.clone() + } +} + +const FLUSH_THRESHOLD: Duration = Duration::from_secs(5); + +// state for periodic flushing of the mmapped request counter values to the +// backend +pub(crate) struct MmapFlusher { + task_handler: Option, + register: Arc>>, +} + +struct CounterRegisterItem { + register_count: usize, + counters: Arc, +} + +struct TaskHandler { + request_sender: mpsc::Sender<()>, + task_handle: JoinHandle<()>, + // Keep reference to runtime while task is being executed + _runtime: Arc, +} + +impl Drop for TaskHandler { + fn drop(&mut self) { + self.task_handle.abort(); + } +} + +impl MmapFlusher { + /// Create new empty and inactive flusher instance. Handler task will be created on-demand + /// when the first counter is registered. + pub(crate) fn new() -> Self { + Self { + task_handler: None, + register: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Register the shared request counter to be flushed periodically. 
+ pub(crate) fn register_counter(&mut self, counters: Arc) { + let id = counters.path_buf(); + + if self.task_handler.is_none() { + self.task_handler = Some(self.init_channel_and_task()); + } + + let mut register = self.register.write().unwrap(); + register + .entry(id) + .and_modify(|item| item.register_count += 1) + .or_insert(CounterRegisterItem { + register_count: 1, + counters, + }); + } + + /// Remove the shared request counter to no longer be flushed by the handler task. + pub(crate) fn remove_counter(&mut self, id: &PathBuf) { + let mut register = self.register.write().unwrap(); + if let Some(item) = register.remove(id) { + if item.register_count > 1 { + register.insert( + item.counters.path_buf(), + CounterRegisterItem { + register_count: item.register_count - 1, + counters: item.counters, + }, + ); + } + } + if register.is_empty() { + // no more registered counters, abort task by dropping + self.task_handler.take(); + } + } + + /// Request for the flusher to be executed the next time the timeout is reached. + pub(crate) fn request_flush(&self) -> Result<(), Error> { + match self.task_handler.as_ref() { + Some(handler) => { + // ignore when channel full, flush already requested anyways + if let Err(TrySendError::Closed(())) = handler.request_sender.try_send(()) { + bail!("failed to send flush request, channel closed"); + } + } + None => bail!("failed to send flush request, no task handler"), + } + Ok(()) + } + + /// Setup or get the current tokio runtime, create channel for requesting flushes and setup + /// the task to periodically check for flush requests. 
+ fn init_channel_and_task(&self) -> TaskHandler { + let (request_sender, mut request_receiver) = mpsc::channel(1); + + let register = Arc::clone(&self.register); + let _runtime = proxmox_async::runtime::get_runtime(); + let task_handle = _runtime.spawn(async move { + let mut flush_requested = false; + let mut next_timeout = Instant::now() + FLUSH_THRESHOLD; + + loop { + match tokio::time::timeout_at(next_timeout, request_receiver.recv()).await { + Ok(Some(())) => flush_requested = true, + Err(_timeout) => { + if flush_requested { + Self::handle_flush(Arc::clone(®ister)); + flush_requested = false; + } + next_timeout = Instant::now() + FLUSH_THRESHOLD; + } + _ => { + // channel closed or error + Self::handle_flush(Arc::clone(®ister)); + return; + } + } + } + }); + + TaskHandler { + request_sender, + task_handle, + _runtime, + } + } + + // Helper to flush all currently registered shared request counters. + fn handle_flush(register: Arc>>) { + let register = register.read().unwrap(); + for item in register.values() { + if let Err(err) = item.counters.schedule_flush() { + tracing::error!("failed to schedule flush: {err}"); + } + } + } } -- 2.47.3