From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup] client: cleanup backup stream upload counters
Date: Thu, 5 Mar 2026 13:31:18 +0100 [thread overview]
Message-ID: <20260305123118.555058-1-c.ebner@proxmox.com> (raw)
Cleanup the upload counters used to account for backup progress.
First off, reduce the required cloning by placing the `UploadCounters`
itself into an `Arc` instead of each individual counter value.
Further, relax the ordering constraints to Ordering::Relaxed on
counter updates and Ordering::Acquire on load, as only the total count
at the end is of interest.
The only notable exception for this is the total stream length, as it
is used to calculate the offset within the archive, so use
Ordering::AcqRel on counter updates and Ordering::Acquire on loads.
Finally, this also allows dropping the `&mut` references required for
incrementing multiple counters, as again only the final state is of
interest.
Suggested-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
This stems from the discussion with respect to parallel group syncs:
https://lore.proxmox.com/pbs-devel/1744009968.ip6vyc3mbq.astroid@yuna.none/
pbs-client/src/backup_stats.rs | 78 +++++++++++++-------------
pbs-client/src/backup_writer.rs | 12 ++--
pbs-client/src/inject_reused_chunks.rs | 7 ++-
3 files changed, 49 insertions(+), 48 deletions(-)
diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index f0563a001..894e3ad6c 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -1,7 +1,6 @@
//! Implements counters to generate statistics for log outputs during uploads with backup writer
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
-use std::sync::Arc;
use std::time::Duration;
use crate::pxar::create::ReusableDynamicEntry;
@@ -41,77 +40,78 @@ impl UploadStats {
}
/// Atomic counters for accounting upload stream progress information
-#[derive(Clone)]
pub(crate) struct UploadCounters {
- injected_chunk_count: Arc<AtomicUsize>,
- known_chunk_count: Arc<AtomicUsize>,
- total_chunk_count: Arc<AtomicUsize>,
- compressed_stream_len: Arc<AtomicU64>,
- injected_stream_len: Arc<AtomicUsize>,
- reused_stream_len: Arc<AtomicUsize>,
- total_stream_len: Arc<AtomicUsize>,
+ injected_chunk_count: AtomicUsize,
+ known_chunk_count: AtomicUsize,
+ total_chunk_count: AtomicUsize,
+ compressed_stream_len: AtomicU64,
+ injected_stream_len: AtomicUsize,
+ reused_stream_len: AtomicUsize,
+ total_stream_len: AtomicUsize,
}
impl UploadCounters {
/// Create and zero init new upload counters
pub(crate) fn new() -> Self {
Self {
- total_chunk_count: Arc::new(AtomicUsize::new(0)),
- injected_chunk_count: Arc::new(AtomicUsize::new(0)),
- known_chunk_count: Arc::new(AtomicUsize::new(0)),
- compressed_stream_len: Arc::new(AtomicU64::new(0)),
- injected_stream_len: Arc::new(AtomicUsize::new(0)),
- reused_stream_len: Arc::new(AtomicUsize::new(0)),
- total_stream_len: Arc::new(AtomicUsize::new(0)),
+ total_chunk_count: AtomicUsize::new(0),
+ injected_chunk_count: AtomicUsize::new(0),
+ known_chunk_count: AtomicUsize::new(0),
+ compressed_stream_len: AtomicU64::new(0),
+ injected_stream_len: AtomicUsize::new(0),
+ reused_stream_len: AtomicUsize::new(0),
+ total_stream_len: AtomicUsize::new(0),
}
}
#[inline(always)]
- pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize {
- self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
- self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+ pub(crate) fn add_known_chunk(&self, chunk_len: usize) -> usize {
+ self.known_chunk_count.fetch_add(1, Ordering::Relaxed);
+ self.total_chunk_count.fetch_add(1, Ordering::Relaxed);
self.reused_stream_len
- .fetch_add(chunk_len, Ordering::SeqCst);
- self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
+ .fetch_add(chunk_len, Ordering::Relaxed);
+ self.total_stream_len
+ .fetch_add(chunk_len, Ordering::AcqRel)
}
#[inline(always)]
- pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize {
- self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+ pub(crate) fn add_new_chunk(&self, chunk_len: usize, chunk_raw_size: u64) -> usize {
+ self.total_chunk_count.fetch_add(1, Ordering::Relaxed);
self.compressed_stream_len
- .fetch_add(chunk_raw_size, Ordering::SeqCst);
- self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
+ .fetch_add(chunk_raw_size, Ordering::Relaxed);
+ self.total_stream_len
+ .fetch_add(chunk_len, Ordering::AcqRel)
}
#[inline(always)]
- pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize {
- self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
- self.injected_chunk_count.fetch_add(1, Ordering::SeqCst);
+ pub(crate) fn add_injected_chunk(&self, chunk: &ReusableDynamicEntry) -> usize {
+ self.total_chunk_count.fetch_add(1, Ordering::Relaxed);
+ self.injected_chunk_count.fetch_add(1, Ordering::Relaxed);
self.reused_stream_len
- .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ .fetch_add(chunk.size() as usize, Ordering::Relaxed);
self.injected_stream_len
- .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ .fetch_add(chunk.size() as usize, Ordering::Relaxed);
self.total_stream_len
- .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+ .fetch_add(chunk.size() as usize, Ordering::AcqRel)
}
#[inline(always)]
pub(crate) fn total_stream_len(&self) -> usize {
- self.total_stream_len.load(Ordering::SeqCst)
+ self.total_stream_len.load(Ordering::Acquire)
}
/// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
#[inline(always)]
pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
UploadStats {
- chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
- chunk_reused: self.known_chunk_count.load(Ordering::SeqCst),
- chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst),
- size: self.total_stream_len.load(Ordering::SeqCst),
- size_reused: self.reused_stream_len.load(Ordering::SeqCst),
- size_injected: self.injected_stream_len.load(Ordering::SeqCst),
- size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
+ chunk_count: self.total_chunk_count.load(Ordering::Acquire),
+ chunk_reused: self.known_chunk_count.load(Ordering::Acquire),
+ chunk_injected: self.injected_chunk_count.load(Ordering::Acquire),
+ size: self.total_stream_len.load(Ordering::Acquire),
+ size_reused: self.reused_stream_len.load(Ordering::Acquire),
+ size_injected: self.injected_stream_len.load(Ordering::Acquire),
+ size_compressed: self.compressed_stream_len.load(Ordering::Acquire) as usize,
duration,
csum,
}
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index dbd177d86..34e15e9ea 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -310,8 +310,8 @@ impl BackupWriter {
.as_u64()
.unwrap();
- let mut counters = UploadCounters::new();
- let counters_readonly = counters.clone();
+ let counters = Arc::new(UploadCounters::new());
+ let counters_readonly = Arc::clone(&counters);
let is_fixed_chunk_size = prefix == "fixed";
@@ -754,8 +754,8 @@ impl BackupWriter {
injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
archive: &BackupArchiveName,
) -> impl Future<Output = Result<UploadStats, Error>> {
- let mut counters = UploadCounters::new();
- let counters_readonly = counters.clone();
+ let counters = Arc::new(UploadCounters::new());
+ let counters_readonly = Arc::clone(&counters);
let is_fixed_chunk_size = prefix == "fixed";
@@ -763,7 +763,7 @@ impl BackupWriter {
let index_csum_2 = index_csum.clone();
let stream = stream
- .inject_reused_chunks(injections, counters.clone())
+ .inject_reused_chunks(injections, Arc::clone(&counters))
.and_then(move |chunk_info| match chunk_info {
InjectedChunksInfo::Known(chunks) => {
// account for injected chunks
@@ -848,7 +848,7 @@ impl BackupWriter {
prefix: &str,
stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
index_csum: Arc<Mutex<Option<Sha256>>>,
- counters: UploadCounters,
+ counters: Arc<UploadCounters>,
) -> impl Future<Output = Result<UploadStats, Error>> {
let append_chunk_path = format!("{prefix}_index");
let upload_chunk_path = format!("{prefix}_chunk");
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
index 6da2bcd16..b5902d7c7 100644
--- a/pbs-client/src/inject_reused_chunks.rs
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -1,6 +1,7 @@
use std::cmp;
use std::pin::Pin;
use std::sync::mpsc;
+use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::{anyhow, Error};
@@ -16,7 +17,7 @@ pin_project! {
input: S,
next_injection: Option<InjectChunks>,
injections: Option<mpsc::Receiver<InjectChunks>>,
- counters: UploadCounters,
+ counters: Arc<UploadCounters>,
}
}
@@ -42,7 +43,7 @@ pub trait InjectReusedChunks: Sized {
fn inject_reused_chunks(
self,
injections: Option<mpsc::Receiver<InjectChunks>>,
- counters: UploadCounters,
+ counters: Arc<UploadCounters>,
) -> InjectReusedChunksQueue<Self>;
}
@@ -53,7 +54,7 @@ where
fn inject_reused_chunks(
self,
injections: Option<mpsc::Receiver<InjectChunks>>,
- counters: UploadCounters,
+ counters: Arc<UploadCounters>,
) -> InjectReusedChunksQueue<Self> {
InjectReusedChunksQueue {
input: self,
--
2.47.3
reply other threads:[~2026-03-05 12:31 UTC|newest]
Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260305123118.555058-1-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox