From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 6C60F1FF13C for ; Thu, 05 Mar 2026 13:31:03 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 652417CE1; Thu, 5 Mar 2026 13:32:08 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup] client: cleanup backup stream upload counters Date: Thu, 5 Mar 2026 13:31:18 +0100 Message-ID: <20260305123118.555058-1-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1772713866016 X-SPAM-LEVEL: Spam detection results: 0 AWL -1.000 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.018 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.703 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 1.386 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. 
SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: FKTMLWJHW37FKLTDPRMCPK4RJPMIRXRG X-Message-ID-Hash: FKTMLWJHW37FKLTDPRMCPK4RJPMIRXRG X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Clean up the upload counters used to account for backup progress. First off, reduce the required cloning by placing the `UploadCounters` itself into an Arc, instead of each individual counter value. Further, relax the ordering constraints to Ordering::Relaxed on counter updates and Ordering::Acquire on load, as only the total count at the end is of interest. The only notable exception for this is the total stream length, as it is used to calculate the offset within the archive, so use Ordering::AcqRel on counter updates and Ordering::Acquire on loads. Finally, this also allows dropping the `&mut` references previously required for incrementing multiple counters, as again only the final state is of interest. Suggested-by: Fabian Grünbichler Signed-off-by: Christian Ebner --- This stems from the discussion with respect to parallel group syncs: https://lore.proxmox.com/pbs-devel/1744009968.ip6vyc3mbq.astroid@yuna.none/ pbs-client/src/backup_stats.rs | 78 +++++++++++++------------- pbs-client/src/backup_writer.rs | 12 ++-- pbs-client/src/inject_reused_chunks.rs | 7 ++- 3 files changed, 49 insertions(+), 48 deletions(-) diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs index f0563a001..894e3ad6c 100644 --- a/pbs-client/src/backup_stats.rs +++ b/pbs-client/src/backup_stats.rs @@ -1,7 +1,6 @@ //! 
Implements counters to generate statistics for log outputs during uploads with backup writer use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -use std::sync::Arc; use std::time::Duration; use crate::pxar::create::ReusableDynamicEntry; @@ -41,77 +40,78 @@ impl UploadStats { } /// Atomic counters for accounting upload stream progress information -#[derive(Clone)] pub(crate) struct UploadCounters { - injected_chunk_count: Arc, - known_chunk_count: Arc, - total_chunk_count: Arc, - compressed_stream_len: Arc, - injected_stream_len: Arc, - reused_stream_len: Arc, - total_stream_len: Arc, + injected_chunk_count: AtomicUsize, + known_chunk_count: AtomicUsize, + total_chunk_count: AtomicUsize, + compressed_stream_len: AtomicU64, + injected_stream_len: AtomicUsize, + reused_stream_len: AtomicUsize, + total_stream_len: AtomicUsize, } impl UploadCounters { /// Create and zero init new upload counters pub(crate) fn new() -> Self { Self { - total_chunk_count: Arc::new(AtomicUsize::new(0)), - injected_chunk_count: Arc::new(AtomicUsize::new(0)), - known_chunk_count: Arc::new(AtomicUsize::new(0)), - compressed_stream_len: Arc::new(AtomicU64::new(0)), - injected_stream_len: Arc::new(AtomicUsize::new(0)), - reused_stream_len: Arc::new(AtomicUsize::new(0)), - total_stream_len: Arc::new(AtomicUsize::new(0)), + total_chunk_count: AtomicUsize::new(0), + injected_chunk_count: AtomicUsize::new(0), + known_chunk_count: AtomicUsize::new(0), + compressed_stream_len: AtomicU64::new(0), + injected_stream_len: AtomicUsize::new(0), + reused_stream_len: AtomicUsize::new(0), + total_stream_len: AtomicUsize::new(0), } } #[inline(always)] - pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize { - self.known_chunk_count.fetch_add(1, Ordering::SeqCst); - self.total_chunk_count.fetch_add(1, Ordering::SeqCst); + pub(crate) fn add_known_chunk(&self, chunk_len: usize) -> usize { + self.known_chunk_count.fetch_add(1, Ordering::Relaxed); + self.total_chunk_count.fetch_add(1, 
Ordering::Relaxed); self.reused_stream_len - .fetch_add(chunk_len, Ordering::SeqCst); - self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst) + .fetch_add(chunk_len, Ordering::Relaxed); + self.total_stream_len + .fetch_add(chunk_len, Ordering::AcqRel) } #[inline(always)] - pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize { - self.total_chunk_count.fetch_add(1, Ordering::SeqCst); + pub(crate) fn add_new_chunk(&self, chunk_len: usize, chunk_raw_size: u64) -> usize { + self.total_chunk_count.fetch_add(1, Ordering::Relaxed); self.compressed_stream_len - .fetch_add(chunk_raw_size, Ordering::SeqCst); - self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst) + .fetch_add(chunk_raw_size, Ordering::Relaxed); + self.total_stream_len + .fetch_add(chunk_len, Ordering::AcqRel) } #[inline(always)] - pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize { - self.total_chunk_count.fetch_add(1, Ordering::SeqCst); - self.injected_chunk_count.fetch_add(1, Ordering::SeqCst); + pub(crate) fn add_injected_chunk(&self, chunk: &ReusableDynamicEntry) -> usize { + self.total_chunk_count.fetch_add(1, Ordering::Relaxed); + self.injected_chunk_count.fetch_add(1, Ordering::Relaxed); self.reused_stream_len - .fetch_add(chunk.size() as usize, Ordering::SeqCst); + .fetch_add(chunk.size() as usize, Ordering::Relaxed); self.injected_stream_len - .fetch_add(chunk.size() as usize, Ordering::SeqCst); + .fetch_add(chunk.size() as usize, Ordering::Relaxed); self.total_stream_len - .fetch_add(chunk.size() as usize, Ordering::SeqCst) + .fetch_add(chunk.size() as usize, Ordering::AcqRel) } #[inline(always)] pub(crate) fn total_stream_len(&self) -> usize { - self.total_stream_len.load(Ordering::SeqCst) + self.total_stream_len.load(Ordering::Acquire) } /// Convert the counters to [`UploadStats`], including given archive checksum and runtime. 
#[inline(always)] pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats { UploadStats { - chunk_count: self.total_chunk_count.load(Ordering::SeqCst), - chunk_reused: self.known_chunk_count.load(Ordering::SeqCst), - chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst), - size: self.total_stream_len.load(Ordering::SeqCst), - size_reused: self.reused_stream_len.load(Ordering::SeqCst), - size_injected: self.injected_stream_len.load(Ordering::SeqCst), - size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize, + chunk_count: self.total_chunk_count.load(Ordering::Acquire), + chunk_reused: self.known_chunk_count.load(Ordering::Acquire), + chunk_injected: self.injected_chunk_count.load(Ordering::Acquire), + size: self.total_stream_len.load(Ordering::Acquire), + size_reused: self.reused_stream_len.load(Ordering::Acquire), + size_injected: self.injected_stream_len.load(Ordering::Acquire), + size_compressed: self.compressed_stream_len.load(Ordering::Acquire) as usize, duration, csum, } diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index dbd177d86..34e15e9ea 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -310,8 +310,8 @@ impl BackupWriter { .as_u64() .unwrap(); - let mut counters = UploadCounters::new(); - let counters_readonly = counters.clone(); + let counters = Arc::new(UploadCounters::new()); + let counters_readonly = Arc::clone(&counters); let is_fixed_chunk_size = prefix == "fixed"; @@ -754,8 +754,8 @@ impl BackupWriter { injections: Option>, archive: &BackupArchiveName, ) -> impl Future> { - let mut counters = UploadCounters::new(); - let counters_readonly = counters.clone(); + let counters = Arc::new(UploadCounters::new()); + let counters_readonly = Arc::clone(&counters); let is_fixed_chunk_size = prefix == "fixed"; @@ -763,7 +763,7 @@ impl BackupWriter { let index_csum_2 = index_csum.clone(); let stream = stream - 
.inject_reused_chunks(injections, counters.clone()) + .inject_reused_chunks(injections, Arc::clone(&counters)) .and_then(move |chunk_info| match chunk_info { InjectedChunksInfo::Known(chunks) => { // account for injected chunks @@ -848,7 +848,7 @@ impl BackupWriter { prefix: &str, stream: impl Stream>, index_csum: Arc>>, - counters: UploadCounters, + counters: Arc, ) -> impl Future> { let append_chunk_path = format!("{prefix}_index"); let upload_chunk_path = format!("{prefix}_chunk"); diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs index 6da2bcd16..b5902d7c7 100644 --- a/pbs-client/src/inject_reused_chunks.rs +++ b/pbs-client/src/inject_reused_chunks.rs @@ -1,6 +1,7 @@ use std::cmp; use std::pin::Pin; use std::sync::mpsc; +use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::{anyhow, Error}; @@ -16,7 +17,7 @@ pin_project! { input: S, next_injection: Option, injections: Option>, - counters: UploadCounters, + counters: Arc, } } @@ -42,7 +43,7 @@ pub trait InjectReusedChunks: Sized { fn inject_reused_chunks( self, injections: Option>, - counters: UploadCounters, + counters: Arc, ) -> InjectReusedChunksQueue; } @@ -53,7 +54,7 @@ where fn inject_reused_chunks( self, injections: Option>, - counters: UploadCounters, + counters: Arc, ) -> InjectReusedChunksQueue { InjectReusedChunksQueue { input: self, -- 2.47.3