public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [PATCH proxmox-backup] client: cleanup backup stream upload counters
@ 2026-03-05 12:31 Christian Ebner
  0 siblings, 0 replies; only message in thread
From: Christian Ebner @ 2026-03-05 12:31 UTC (permalink / raw)
  To: pbs-devel

Cleanup the upload counters used to account for backup progress.

Firstoff, reduce the required cloning by place the `UploadCounters`
itself into an Arc instead each individual counter value.

Further, relax the ordering constraints to Ordering::Relaxed on
counter updates and Ordering::Acquire on load, as only the total count
at the end is of interest.

The only notable exception for this is the total stream length, as it
is used to calculate the offset within the archive, so use
Ordering::AcqRel on counter updates and Ordering::Acquire on loads.

Finally, also allows to not require the &mut references on
incrementing multiple counters, as again only the final state is of
interest.

Suggested-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
This stems from the discussion with respect to parallel group syncs:
https://lore.proxmox.com/pbs-devel/1744009968.ip6vyc3mbq.astroid@yuna.none/

 pbs-client/src/backup_stats.rs         | 78 +++++++++++++-------------
 pbs-client/src/backup_writer.rs        | 12 ++--
 pbs-client/src/inject_reused_chunks.rs |  7 ++-
 3 files changed, 49 insertions(+), 48 deletions(-)

diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index f0563a001..894e3ad6c 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -1,7 +1,6 @@
 //! Implements counters to generate statistics for log outputs during uploads with backup writer
 
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
-use std::sync::Arc;
 use std::time::Duration;
 
 use crate::pxar::create::ReusableDynamicEntry;
@@ -41,77 +40,78 @@ impl UploadStats {
 }
 
 /// Atomic counters for accounting upload stream progress information
-#[derive(Clone)]
 pub(crate) struct UploadCounters {
-    injected_chunk_count: Arc<AtomicUsize>,
-    known_chunk_count: Arc<AtomicUsize>,
-    total_chunk_count: Arc<AtomicUsize>,
-    compressed_stream_len: Arc<AtomicU64>,
-    injected_stream_len: Arc<AtomicUsize>,
-    reused_stream_len: Arc<AtomicUsize>,
-    total_stream_len: Arc<AtomicUsize>,
+    injected_chunk_count: AtomicUsize,
+    known_chunk_count: AtomicUsize,
+    total_chunk_count: AtomicUsize,
+    compressed_stream_len: AtomicU64,
+    injected_stream_len: AtomicUsize,
+    reused_stream_len: AtomicUsize,
+    total_stream_len: AtomicUsize,
 }
 
 impl UploadCounters {
     /// Create and zero init new upload counters
     pub(crate) fn new() -> Self {
         Self {
-            total_chunk_count: Arc::new(AtomicUsize::new(0)),
-            injected_chunk_count: Arc::new(AtomicUsize::new(0)),
-            known_chunk_count: Arc::new(AtomicUsize::new(0)),
-            compressed_stream_len: Arc::new(AtomicU64::new(0)),
-            injected_stream_len: Arc::new(AtomicUsize::new(0)),
-            reused_stream_len: Arc::new(AtomicUsize::new(0)),
-            total_stream_len: Arc::new(AtomicUsize::new(0)),
+            total_chunk_count: AtomicUsize::new(0),
+            injected_chunk_count: AtomicUsize::new(0),
+            known_chunk_count: AtomicUsize::new(0),
+            compressed_stream_len: AtomicU64::new(0),
+            injected_stream_len: AtomicUsize::new(0),
+            reused_stream_len: AtomicUsize::new(0),
+            total_stream_len: AtomicUsize::new(0),
         }
     }
 
     #[inline(always)]
-    pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize {
-        self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
-        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+    pub(crate) fn add_known_chunk(&self, chunk_len: usize) -> usize {
+        self.known_chunk_count.fetch_add(1, Ordering::Relaxed);
+        self.total_chunk_count.fetch_add(1, Ordering::Relaxed);
         self.reused_stream_len
-            .fetch_add(chunk_len, Ordering::SeqCst);
-        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
+            .fetch_add(chunk_len, Ordering::Relaxed);
+        self.total_stream_len
+            .fetch_add(chunk_len, Ordering::AcqRel)
     }
 
     #[inline(always)]
-    pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize {
-        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+    pub(crate) fn add_new_chunk(&self, chunk_len: usize, chunk_raw_size: u64) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::Relaxed);
         self.compressed_stream_len
-            .fetch_add(chunk_raw_size, Ordering::SeqCst);
-        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
+            .fetch_add(chunk_raw_size, Ordering::Relaxed);
+        self.total_stream_len
+            .fetch_add(chunk_len, Ordering::AcqRel)
     }
 
     #[inline(always)]
-    pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize {
-        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
-        self.injected_chunk_count.fetch_add(1, Ordering::SeqCst);
+    pub(crate) fn add_injected_chunk(&self, chunk: &ReusableDynamicEntry) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::Relaxed);
+        self.injected_chunk_count.fetch_add(1, Ordering::Relaxed);
 
         self.reused_stream_len
-            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+            .fetch_add(chunk.size() as usize, Ordering::Relaxed);
         self.injected_stream_len
-            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+            .fetch_add(chunk.size() as usize, Ordering::Relaxed);
         self.total_stream_len
-            .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+            .fetch_add(chunk.size() as usize, Ordering::AcqRel)
     }
 
     #[inline(always)]
     pub(crate) fn total_stream_len(&self) -> usize {
-        self.total_stream_len.load(Ordering::SeqCst)
+        self.total_stream_len.load(Ordering::Acquire)
     }
 
     /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
     #[inline(always)]
     pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
         UploadStats {
-            chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
-            chunk_reused: self.known_chunk_count.load(Ordering::SeqCst),
-            chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst),
-            size: self.total_stream_len.load(Ordering::SeqCst),
-            size_reused: self.reused_stream_len.load(Ordering::SeqCst),
-            size_injected: self.injected_stream_len.load(Ordering::SeqCst),
-            size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
+            chunk_count: self.total_chunk_count.load(Ordering::Acquire),
+            chunk_reused: self.known_chunk_count.load(Ordering::Acquire),
+            chunk_injected: self.injected_chunk_count.load(Ordering::Acquire),
+            size: self.total_stream_len.load(Ordering::Acquire),
+            size_reused: self.reused_stream_len.load(Ordering::Acquire),
+            size_injected: self.injected_stream_len.load(Ordering::Acquire),
+            size_compressed: self.compressed_stream_len.load(Ordering::Acquire) as usize,
             duration,
             csum,
         }
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index dbd177d86..34e15e9ea 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -310,8 +310,8 @@ impl BackupWriter {
             .as_u64()
             .unwrap();
 
-        let mut counters = UploadCounters::new();
-        let counters_readonly = counters.clone();
+        let counters = Arc::new(UploadCounters::new());
+        let counters_readonly = Arc::clone(&counters);
 
         let is_fixed_chunk_size = prefix == "fixed";
 
@@ -754,8 +754,8 @@ impl BackupWriter {
         injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
         archive: &BackupArchiveName,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
-        let mut counters = UploadCounters::new();
-        let counters_readonly = counters.clone();
+        let counters = Arc::new(UploadCounters::new());
+        let counters_readonly = Arc::clone(&counters);
 
         let is_fixed_chunk_size = prefix == "fixed";
 
@@ -763,7 +763,7 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         let stream = stream
-            .inject_reused_chunks(injections, counters.clone())
+            .inject_reused_chunks(injections, Arc::clone(&counters))
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
                     // account for injected chunks
@@ -848,7 +848,7 @@ impl BackupWriter {
         prefix: &str,
         stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
         index_csum: Arc<Mutex<Option<Sha256>>>,
-        counters: UploadCounters,
+        counters: Arc<UploadCounters>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let append_chunk_path = format!("{prefix}_index");
         let upload_chunk_path = format!("{prefix}_chunk");
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
index 6da2bcd16..b5902d7c7 100644
--- a/pbs-client/src/inject_reused_chunks.rs
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -1,6 +1,7 @@
 use std::cmp;
 use std::pin::Pin;
 use std::sync::mpsc;
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use anyhow::{anyhow, Error};
@@ -16,7 +17,7 @@ pin_project! {
         input: S,
         next_injection: Option<InjectChunks>,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        counters: UploadCounters,
+        counters: Arc<UploadCounters>,
     }
 }
 
@@ -42,7 +43,7 @@ pub trait InjectReusedChunks: Sized {
     fn inject_reused_chunks(
         self,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        counters: UploadCounters,
+        counters: Arc<UploadCounters>,
     ) -> InjectReusedChunksQueue<Self>;
 }
 
@@ -53,7 +54,7 @@ where
     fn inject_reused_chunks(
         self,
         injections: Option<mpsc::Receiver<InjectChunks>>,
-        counters: UploadCounters,
+        counters: Arc<UploadCounters>,
     ) -> InjectReusedChunksQueue<Self> {
         InjectReusedChunksQueue {
             input: self,
-- 
2.47.3





^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2026-03-05 12:31 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2026-03-05 12:31 [PATCH proxmox-backup] client: cleanup backup stream upload counters Christian Ebner

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal