From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox v3 08/10] s3-client: request counters: periodically persist counters to file
Date: Tue, 24 Feb 2026 10:13:52 +0100 [thread overview]
Message-ID: <20260224091406.169080-9-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260224091406.169080-1-c.ebner@proxmox.com>
According to the mmap man page [0], the contents of a shared memory
mapping are not guaranteed to be written back to the underlying file
until it is unmapped, unless the MAP_SYNC flag is used. Using that
flag would however lead to excessive I/O, writing counter states on
each and every update.
Therefore, periodically persist the counter state via asynchronous
msync calls. To reduce the number of such calls, do not flush
unconditionally and for each mmapped counter individually, but rather
provide a global flusher which only flushes the counters the s3 client
registered with it. Further, do not trigger a flush on each counter
update, but rather let the s3 client signal via an async channel that
counter updates happened.
The task processing the flush requests is created and spawned on
demand once the first counter is registered.
[0] https://man7.org/linux/man-pages/man2/mmap.2.html
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-s3-client/Cargo.toml | 2 +
proxmox-s3-client/debian/control | 1 +
proxmox-s3-client/src/client.rs | 20 ++-
.../src/shared_request_counters.rs | 167 +++++++++++++++++-
4 files changed, 183 insertions(+), 7 deletions(-)
diff --git a/proxmox-s3-client/Cargo.toml b/proxmox-s3-client/Cargo.toml
index 1e31bca4..6b8aea49 100644
--- a/proxmox-s3-client/Cargo.toml
+++ b/proxmox-s3-client/Cargo.toml
@@ -34,6 +34,7 @@ tokio-util = { workspace = true, features = [ "compat" ], optional = true }
tracing = { workspace = true, optional = true }
url = {workspace = true, optional = true }
+proxmox-async = { workspace = true, optional = true }
proxmox-base64 = { workspace = true, optional = true }
proxmox-http = { workspace = true, features = [ "body", "client", "client-trait" ], optional = true }
proxmox-human-byte.workspace = true
@@ -64,6 +65,7 @@ impl = [
"dep:tokio-util",
"dep:tracing",
"dep:url",
+ "dep:proxmox-async",
"dep:proxmox-base64",
"dep:proxmox-http",
"dep:proxmox-rate-limiter",
diff --git a/proxmox-s3-client/debian/control b/proxmox-s3-client/debian/control
index a534a107..bf8e37d6 100644
--- a/proxmox-s3-client/debian/control
+++ b/proxmox-s3-client/debian/control
@@ -77,6 +77,7 @@ Depends:
librust-md5-0.7+default-dev,
librust-nix-0.29+default-dev,
librust-openssl-0.10+default-dev,
+ librust-proxmox-async-0.5+default-dev,
librust-proxmox-base64-1+default-dev,
librust-proxmox-http-1+body-dev (>= 1.0.5-~~),
librust-proxmox-http-1+client-dev (>= 1.0.5-~~),
diff --git a/proxmox-s3-client/src/client.rs b/proxmox-s3-client/src/client.rs
index 1c6b9c33..29ddc1de 100644
--- a/proxmox-s3-client/src/client.rs
+++ b/proxmox-s3-client/src/client.rs
@@ -1,7 +1,7 @@
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::Ordering;
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, LazyLock, Mutex, RwLock};
use std::time::{Duration, Instant};
use anyhow::{bail, format_err, Context, Error};
@@ -33,7 +33,7 @@ use crate::response_reader::{
CopyObjectResponse, DeleteObjectsResponse, GetObjectResponse, HeadObjectResponse,
ListBucketsResponse, ListObjectsV2Response, PutObjectResponse, ResponseReader,
};
-use crate::shared_request_counters::SharedRequestCounters;
+use crate::shared_request_counters::{MmapFlusher, SharedRequestCounters};
/// Default timeout for s3 api requests.
pub const S3_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30 * 60);
@@ -46,6 +46,9 @@ const S3_MIN_ASSUMED_UPLOAD_RATE: u64 = 1024;
const MAX_S3_HTTP_REQUEST_RETRY: usize = 3;
const S3_HTTP_REQUEST_RETRY_BACKOFF_DEFAULT: Duration = Duration::from_secs(1);
+static SHARED_COUNTER_FLUSHER: LazyLock<RwLock<MmapFlusher>> =
+ LazyLock::new(|| RwLock::new(MmapFlusher::new()));
+
/// S3 object key path prefix without the context prefix as defined by the client options.
///
/// The client option's context prefix will be prepended by the various client methods before
@@ -247,7 +250,14 @@ impl S3Client {
config.options.user.clone(),
)
.context("failed to mmap shared S3 request counters")?;
- Some(Arc::new(request_counters))
+ let request_counters = Arc::new(request_counters);
+
+ SHARED_COUNTER_FLUSHER
+ .write()
+ .unwrap()
+ .register_counter(Arc::clone(&request_counters));
+
+ Some(request_counters)
} else {
None
};
@@ -441,6 +451,10 @@ impl S3Client {
.context("failed to account for upload traffic")?;
let _prev_uploaded =
counters.add_upload_traffic(transferred, Ordering::AcqRel);
+
+ tokio::task::spawn_blocking(|| {
+ let _ = SHARED_COUNTER_FLUSHER.read().unwrap().request_flush();
+ });
}
return Ok(response);
diff --git a/proxmox-s3-client/src/shared_request_counters.rs b/proxmox-s3-client/src/shared_request_counters.rs
index a5cd286c..d3f53c8e 100644
--- a/proxmox-s3-client/src/shared_request_counters.rs
+++ b/proxmox-s3-client/src/shared_request_counters.rs
@@ -1,11 +1,19 @@
+use std::collections::HashMap;
use std::mem::MaybeUninit;
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::{Arc, RwLock};
+use std::time::Duration;
use anyhow::{bail, Error};
use hyper::http::method::Method;
+use nix::sys::mman::MsFlags;
use nix::sys::stat::Mode;
use nix::unistd::User;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
+use tokio::task::JoinHandle;
+use tokio::time::Instant;
use proxmox_shared_memory::{Init, SharedMemory};
use proxmox_sys::fs::CreateOptions;
@@ -163,6 +171,7 @@ impl Init for MappableRequestCounters {
/// If set, the counts can be filtered based on a path prefix.
pub struct SharedRequestCounters {
shared_memory: SharedMemory<MappableRequestCounters>,
+ path: PathBuf,
}
impl SharedRequestCounters {
@@ -170,7 +179,7 @@ impl SharedRequestCounters {
///
/// Opens or creates mmap file and accesses it via shared memory mapping.
pub fn open_shared_memory_mapped<P: AsRef<Path>>(path: P, user: User) -> Result<Self, Error> {
- let path = path.as_ref();
+ let path = path.as_ref().to_path_buf();
if let Some(parent) = path.parent() {
let dir_opts = CreateOptions::new()
.perm(Mode::from_bits_truncate(0o770))
@@ -184,8 +193,11 @@ impl SharedRequestCounters {
.perm(Mode::from_bits_truncate(0o660))
.owner(user.uid)
.group(user.gid);
- let shared_memory = SharedMemory::open_non_tmpfs(path, file_opts)?;
- Ok(Self { shared_memory })
+ let shared_memory = SharedMemory::open_non_tmpfs(&path, file_opts)?;
+ Ok(Self {
+ shared_memory,
+ path,
+ })
}
/// Increment the counter for given method, following the provided memory ordering constrains
@@ -243,4 +255,151 @@ impl SharedRequestCounters {
.counters
.get_download_traffic(ordering)
}
+
+ /// Flush in-memory contents to backing file, but do not wait for completion
+ pub fn schedule_flush(&self) -> Result<(), Error> {
+ self.shared_memory.msync(MsFlags::MS_ASYNC)
+ }
+
+ /// Path of shared memory backing file
+ pub fn path_buf(&self) -> PathBuf {
+ self.path.clone()
+ }
+}
+
+const FLUSH_THRESHOLD: Duration = Duration::from_secs(5);
+
+// state for periodic flushing of the mmapped request counter values to the
+// backend
+pub(crate) struct MmapFlusher {
+ task_handler: Option<TaskHandler>,
+ register: Arc<RwLock<HashMap<PathBuf, CounterRegisterItem>>>,
+}
+
+struct CounterRegisterItem {
+ register_count: usize,
+ counters: Arc<SharedRequestCounters>,
+}
+
+struct TaskHandler {
+ request_sender: mpsc::Sender<()>,
+ task_handle: JoinHandle<()>,
+ // Keep reference to runtime while task is being executed
+ _runtime: Arc<tokio::runtime::Runtime>,
+}
+
+impl Drop for TaskHandler {
+ fn drop(&mut self) {
+ self.task_handle.abort();
+ }
+}
+
+impl MmapFlusher {
+ /// Create new empty and inactive flusher instance. Handler task will be created on-demand
+ /// when the first counter is registered.
+ pub(crate) fn new() -> Self {
+ Self {
+ task_handler: None,
+ register: Arc::new(RwLock::new(HashMap::new())),
+ }
+ }
+
+ /// Register the shared request counter to be flushed periodically.
+ pub(crate) fn register_counter(&mut self, counters: Arc<SharedRequestCounters>) {
+ let id = counters.path_buf();
+
+ if self.task_handler.is_none() {
+ self.task_handler = Some(self.init_channel_and_task());
+ }
+
+ let mut register = self.register.write().unwrap();
+ register
+ .entry(id)
+ .and_modify(|item| item.register_count += 1)
+ .or_insert(CounterRegisterItem {
+ register_count: 1,
+ counters,
+ });
+ }
+
+ /// Remove the shared request counter to no longer be flushed by the handler task.
+ pub(crate) fn remove_counter(&mut self, id: &PathBuf) {
+ let mut register = self.register.write().unwrap();
+ if let Some(item) = register.remove(id) {
+ if item.register_count > 1 {
+ register.insert(
+ item.counters.path_buf(),
+ CounterRegisterItem {
+ register_count: item.register_count - 1,
+ counters: item.counters,
+ },
+ );
+ }
+ }
+ if register.is_empty() {
+ // no more registered counters, abort task by dropping
+ self.task_handler.take();
+ }
+ }
+
+ /// Request for the flusher to be executed the next time the timeout is reached.
+ pub(crate) fn request_flush(&self) -> Result<(), Error> {
+ match self.task_handler.as_ref() {
+ Some(handler) => {
+ // ignore when channel full, flush already requested anyways
+ if let Err(TrySendError::Closed(())) = handler.request_sender.try_send(()) {
+ bail!("failed to send flush request, channel closed");
+ }
+ }
+ None => bail!("failed to send flush request, no task handler"),
+ }
+ Ok(())
+ }
+
+ /// Set up or get the current tokio runtime, create the channel for requesting flushes and
+ /// spawn the task which periodically checks for flush requests.
+ fn init_channel_and_task(&self) -> TaskHandler {
+ let (request_sender, mut request_receiver) = mpsc::channel(1);
+
+ let register = Arc::clone(&self.register);
+ let _runtime = proxmox_async::runtime::get_runtime();
+ let task_handle = _runtime.spawn(async move {
+ let mut flush_requested = false;
+ let mut next_timeout = Instant::now() + FLUSH_THRESHOLD;
+
+ loop {
+ match tokio::time::timeout_at(next_timeout, request_receiver.recv()).await {
+ Ok(Some(())) => flush_requested = true,
+ Err(_timeout) => {
+ if flush_requested {
+ Self::handle_flush(Arc::clone(&register));
+ flush_requested = false;
+ }
+ next_timeout = Instant::now() + FLUSH_THRESHOLD;
+ }
+ _ => {
+ // channel closed or error
+ Self::handle_flush(Arc::clone(&register));
+ return;
+ }
+ }
+ }
+ });
+
+ TaskHandler {
+ request_sender,
+ task_handle,
+ _runtime,
+ }
+ }
+
+ // Helper to flush all currently registered shared request counters.
+ fn handle_flush(register: Arc<RwLock<HashMap<PathBuf, CounterRegisterItem>>>) {
+ let register = register.read().unwrap();
+ for item in register.values() {
+ if let Err(err) = item.counters.schedule_flush() {
+ tracing::error!("failed to schedule flush: {err}");
+ }
+ }
+ }
}
--
2.47.3
next prev parent reply other threads:[~2026-02-24 9:14 UTC|newest]
Thread overview: 23+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-02-24 9:13 [PATCH proxmox{,-backup} v3 00/22] partially fix #6563: add s3 request and traffic counter statistics Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 01/10] proxmox-sys: expose msync to flush mmapped contents to filesystem Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 02/10] shared-memory: add method without tmpfs check for mmap file location Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 03/10] shared-memory: expose msync to flush in-memory contents to filesystem Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 04/10] s3-client: add persistent shared request counters for client Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 05/10] s3-client: add counters for upload/download traffic Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 06/10] s3-client: account for upload traffic on successful request sending Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 07/10] s3-client: account for downloaded bytes in incoming response body Christian Ebner
2026-02-24 9:13 ` Christian Ebner [this message]
2026-02-24 9:13 ` [PATCH proxmox v3 09/10] s3-client: sync flush request counters on client instance drop Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox v3 10/10] pbs-api-types: define api type for s3 request statistics Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox-backup v3 01/12] metrics: split common module imports into individual use statements Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox-backup v3 02/12] datastore: collect request statistics for s3 backed datastores Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox-backup v3 03/12] datastore: expose request counters " Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox-backup v3 04/12] api: s3: add endpoint to reset s3 request counters Christian Ebner
2026-02-24 9:13 ` [PATCH proxmox-backup v3 05/12] bin: s3: expose request counter reset method as cli command Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 06/12] datastore: add helper method to get datastore backend type Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 07/12] ui: improve variable name indirectly fixing typo Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 08/12] ui: datastore summary: move store to be part of summary panel Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 09/12] ui: expose s3 request counter statistics in the datastore summary Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 10/12] metrics: collect s3 datastore statistics as rrd metrics Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 11/12] api: admin: expose s3 statistics in datastore rrd data Christian Ebner
2026-02-24 9:14 ` [PATCH proxmox-backup v3 12/12] partially fix #6563: ui: expose s3 rrd charts in datastore summary Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260224091406.169080-9-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox