Date: Fri, 25 Oct 2024 12:20:56 +0200
From: Fabian Grünbichler
To: Proxmox Backup Server development discussion
Message-Id: <1729851524.1amrz1baou.astroid@yuna.none>
In-Reply-To: <20241018084242.144010-2-c.ebner@proxmox.com>
References: <20241018084242.144010-1-c.ebner@proxmox.com> <20241018084242.144010-2-c.ebner@proxmox.com>
Subject: Re: [pbs-devel] [PATCH v5 proxmox-backup 01/31] client: backup writer: refactor backup and upload stats counters

as discussed off-list, I think this could be improved further. The refactoring
also caught a missing increment of the compressed stream length ;)

diff on top of the whole series:

diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index 7aa618667..87a2b1c53 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -4,6 +4,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
+use crate::pxar::create::ReusableDynamicEntry;
+
 /// Basic backup run statistics and archive checksum
 pub struct BackupStats {
     pub size: u64,
@@ -64,52 +66,40 @@ impl UploadCounters {
         }
     }
 
-    /// Increment total chunk counter by `count`, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_total_chunks(&mut self, count: usize) -> usize {
-        self.total_chunk_count.fetch_add(count, Ordering::SeqCst)
-    }
-
-    /// Increment known chunk counter by `count`, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_known_chunks(&mut self, count: usize) -> usize {
-        self.known_chunk_count.fetch_add(count, Ordering::SeqCst)
-    }
-
-    /// Increment injected chunk counter by `count`, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_injected_chunks(&mut self, count: usize) -> usize {
-        self.injected_chunk_count.fetch_add(count, Ordering::SeqCst)
-    }
-
-    /// Increment stream length counter by given size, returns previous value
     #[inline(always)]
-    pub(crate) fn inc_total_stream_len(&mut self, size: usize) -> usize {
-        self.total_stream_len.fetch_add(size, Ordering::SeqCst)
+    pub(crate) fn add_known_chunk(&mut self, chunk_len: usize) -> usize {
+        self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.reused_stream_len
+            .fetch_add(chunk_len, Ordering::SeqCst);
+        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
     }
 
-    /// Increment reused length counter by given size, returns previous value
     #[inline(always)]
-    pub(crate) fn inc_reused_stream_len(&mut self, size: usize) -> usize {
-        self.reused_stream_len.fetch_add(size, Ordering::SeqCst)
+    pub(crate) fn add_new_chunk(&mut self, chunk_len: usize, chunk_raw_size: u64) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.compressed_stream_len
+            .fetch_add(chunk_raw_size, Ordering::SeqCst);
+        self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
     }
 
-    /// Increment compressed length counter by given size, returns previous value
     #[inline(always)]
-    pub(crate) fn inc_compressed_stream_len(&mut self, size: u64) -> u64 {
-        self.compressed_stream_len.fetch_add(size, Ordering::SeqCst)
-    }
+    pub(crate) fn add_injected_chunk(&mut self, chunk: &ReusableDynamicEntry) -> usize {
+        self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
+        self.injected_chunk_count.fetch_add(1, Ordering::SeqCst);
 
-    /// Increment stream length counter by given size, returns previous value
-    #[inline(always)]
-    pub(crate) fn inc_injected_stream_len(&mut self, size: usize) -> usize {
-        self.injected_stream_len.fetch_add(size, Ordering::SeqCst)
+        self.reused_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+        self.injected_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+        self.total_stream_len
+            .fetch_add(chunk.size() as usize, Ordering::SeqCst)
     }
 
     /// Return a Arc clone to the total stream length counter
     #[inline(always)]
-    pub(crate) fn total_stream_len_counter(&self) -> Arc<AtomicUsize> {
-        self.total_stream_len.clone()
+    pub(crate) fn total_stream_len_counter(&self) -> usize {
+        self.total_stream_len.load(Ordering::SeqCst)
     }
 
     /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a09757486..58b1c226f 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -304,9 +304,9 @@ impl BackupWriter {
             .and_then(move |mut merged_chunk_info| {
                 match merged_chunk_info {
                     MergedChunkInfo::New(ref chunk_info) => {
-                        counters.inc_total_chunks(1);
                         let chunk_len = chunk_info.chunk_len;
-                        let offset = counters.inc_total_stream_len(chunk_len as usize);
+                        let offset =
+                            counters.add_new_chunk(chunk_len as usize, chunk_info.chunk.raw_size());
                         let end_offset = offset as u64 + chunk_len;
                         let mut guard = index_csum.lock().unwrap();
                         let csum = guard.as_mut().unwrap();
@@ -317,10 +317,7 @@ impl BackupWriter {
                     }
                     MergedChunkInfo::Known(ref mut known_chunk_list) => {
                         for (chunk_len, digest) in known_chunk_list {
-                            counters.inc_total_chunks(1);
-                            counters.inc_known_chunks(1);
-                            counters.inc_reused_stream_len(*chunk_len as usize);
-                            let offset = counters.inc_total_stream_len(*chunk_len as usize);
+                            let offset = counters.add_known_chunk(*chunk_len as usize);
                             let end_offset = offset as u64 + *chunk_len;
                             let mut guard = index_csum.lock().unwrap();
                             let csum = guard.as_mut().unwrap();
@@ -753,21 +750,15 @@ impl BackupWriter {
         let index_csum_2 = index_csum.clone();
 
         let stream = stream
-            .inject_reused_chunks(injections, counters.total_stream_len_counter())
+            .inject_reused_chunks(injections, counters.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
                     // account for injected chunks
-                    let count = chunks.len();
-                    counters.inc_total_chunks(count);
-                    counters.inc_injected_chunks(count);
-
                     let mut known = Vec::new();
                     let mut guard = index_csum.lock().unwrap();
                     let csum = guard.as_mut().unwrap();
                     for chunk in chunks {
-                        let offset = counters.inc_total_stream_len(chunk.size() as usize) as u64;
-                        counters.inc_reused_stream_len(chunk.size() as usize);
-                        counters.inc_injected_stream_len(chunk.size() as usize);
+                        let offset = counters.add_injected_chunk(&chunk) as u64;
                         let digest = chunk.digest();
                         known.push((offset, digest));
                         let end_offset = offset + chunk.size();
@@ -780,9 +771,6 @@ impl BackupWriter {
                     // account for not injected chunks (new and known)
                     let chunk_len = data.len();
 
-                    counters.inc_total_chunks(1);
-                    let offset = counters.inc_total_stream_len(chunk_len) as u64;
-
                     let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
 
                     if let Some(ref crypt_config) = crypt_config {
@@ -790,7 +778,30 @@ impl BackupWriter {
                     }
 
                     let mut known_chunks = known_chunks.lock().unwrap();
-                    let digest = chunk_builder.digest();
+                    let digest = chunk_builder.digest().clone();
+                    let chunk_is_known = known_chunks.contains(&digest);
+                    let (offset, res) = if chunk_is_known {
+                        let offset = counters.add_known_chunk(chunk_len) as u64;
+                        (offset, MergedChunkInfo::Known(vec![(offset, digest)]))
+                    } else {
+                        match chunk_builder.build() {
+                            Ok((chunk, digest)) => {
+                                let offset =
+                                    counters.add_new_chunk(chunk_len, chunk.raw_size()) as u64;
+                                known_chunks.insert(digest);
+                                (
+                                    offset,
+                                    MergedChunkInfo::New(ChunkInfo {
+                                        chunk,
+                                        digest,
+                                        chunk_len: chunk_len as u64,
+                                        offset,
+                                    }),
+                                )
+                            }
+                            Err(err) => return future::ready(Err(err)),
+                        }
+                    };
 
                     let mut guard = index_csum.lock().unwrap();
                     let csum = guard.as_mut().unwrap();
@@ -800,26 +811,9 @@ impl BackupWriter {
                     if !is_fixed_chunk_size {
                         csum.update(&chunk_end.to_le_bytes());
                     }
-                    csum.update(digest);
+                    csum.update(&digest);
 
-                    let chunk_is_known = known_chunks.contains(digest);
-                    if chunk_is_known {
-                        counters.inc_known_chunks(1);
-                        counters.inc_reused_stream_len(chunk_len);
-                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-                    } else {
-                        let mut counters = counters.clone();
-                        known_chunks.insert(*digest);
-                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                            counters.inc_compressed_stream_len(chunk.raw_size());
-                            MergedChunkInfo::New(ChunkInfo {
-                                chunk,
-                                digest,
-                                chunk_len: chunk_len as u64,
-                                offset,
-                            })
-                        }))
-                    }
+                    future::ok(res)
                 }
             })
             .merge_known_chunks();
@@ -848,8 +842,7 @@ impl BackupWriter {
         let upload_chunk_path = format!("{prefix}_chunk");
 
         let start_time = std::time::Instant::now();
-        let total_stream_len = counters.total_stream_len_counter();
-        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let uploaded_len = Arc::new(AtomicUsize::new(0));
 
         let (upload_queue, upload_result) =
             Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
@@ -858,11 +851,12 @@ impl BackupWriter {
             || archive.ends_with(".pxar")
             || archive.ends_with(".ppxar")
         {
+            let counters = counters.clone();
             Some(tokio::spawn(async move {
                 loop {
                     tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
 
-                    let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
+                    let size = HumanByte::from(counters.total_stream_len_counter());
                     let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
                     let elapsed = TimeSpan::from(start_time.elapsed());
 
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
index 4b2922012..b93b8b846 100644
--- a/pbs-client/src/inject_reused_chunks.rs
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -1,13 +1,13 @@
 use std::cmp;
 use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{mpsc, Arc};
+use std::sync::mpsc;
 use std::task::{Context, Poll};
 
 use anyhow::{anyhow, Error};
 use futures::{ready, Stream};
 use pin_project_lite::pin_project;
 
+use crate::backup_stats::UploadCounters;
 use crate::pxar::create::ReusableDynamicEntry;
 
 pin_project! {
@@ -16,7 +16,7 @@ pin_project! {
         input: S,
         next_injection: Option,
         injections: Option>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     }
 }
 
@@ -42,7 +42,7 @@ pub trait InjectReusedChunks: Sized {
     fn inject_reused_chunks(
         self,
         injections: Option>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     ) -> InjectReusedChunksQueue;
 }
 
@@ -53,13 +53,13 @@ where
     fn inject_reused_chunks(
         self,
         injections: Option>,
-        stream_len: Arc<AtomicUsize>,
+        counters: UploadCounters,
     ) -> InjectReusedChunksQueue {
         InjectReusedChunksQueue {
             input: self,
             next_injection: None,
             injections,
-            stream_len,
+            counters,
         }
     }
 }
 
@@ -85,7 +85,7 @@ where
         if let Some(inject) = this.next_injection.take() {
             // got reusable dynamic entries to inject
-            let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+            let offset = this.counters.total_stream_len_counter() as u64;
             match inject.boundary.cmp(&offset) {
                 // inject now
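To make the intent of the consolidated helpers above easier to see, here is a
minimal, self-contained sketch of the same pattern. This is illustrative only,
not the pbs-client code: the struct is trimmed down, the method takes `&self`,
and only the `add_known_chunk` case is shown.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Simplified stand-in for UploadCounters: all fields are Arc-wrapped
    /// atomics, so clones of the struct share the same underlying counters.
    #[derive(Clone)]
    struct Counters {
        total_chunk_count: Arc<AtomicUsize>,
        known_chunk_count: Arc<AtomicUsize>,
        reused_stream_len: Arc<AtomicUsize>,
        total_stream_len: Arc<AtomicUsize>,
    }

    impl Counters {
        fn new() -> Self {
            Self {
                total_chunk_count: Arc::new(AtomicUsize::new(0)),
                known_chunk_count: Arc::new(AtomicUsize::new(0)),
                reused_stream_len: Arc::new(AtomicUsize::new(0)),
                total_stream_len: Arc::new(AtomicUsize::new(0)),
            }
        }

        /// One call updates every counter affected by a known (reused) chunk
        /// and returns the previous total stream length, i.e. the chunk's
        /// start offset in the stream.
        fn add_known_chunk(&self, chunk_len: usize) -> usize {
            self.known_chunk_count.fetch_add(1, Ordering::SeqCst);
            self.total_chunk_count.fetch_add(1, Ordering::SeqCst);
            self.reused_stream_len.fetch_add(chunk_len, Ordering::SeqCst);
            self.total_stream_len.fetch_add(chunk_len, Ordering::SeqCst)
        }
    }

    fn main() {
        let counters = Counters::new();
        let worker = counters.clone(); // clone handed to another part of the stream

        let first_offset = worker.add_known_chunk(4096);
        let second_offset = worker.add_known_chunk(2048);
        assert_eq!(first_offset, 0);
        assert_eq!(second_offset, 4096);

        // The original handle observes the same totals, since only the Arcs
        // were cloned, not the counter values.
        assert_eq!(counters.total_stream_len.load(Ordering::SeqCst), 6144);
        assert_eq!(counters.total_chunk_count.load(Ordering::SeqCst), 2);
    }

The returned previous value is what the call sites in backup_writer.rs use as
the chunk's start offset, so folding the individual fetch_add calls into one
helper keeps the per-chunk accounting and the offset bookkeeping from drifting
apart.
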
On October 18, 2024 10:42 am, Christian Ebner wrote:
> In preparation for push support in sync jobs.
> 
> Extend and move `BackupStats` into `backup_stats` submodule and add
> method to create them from `UploadStats`.
> 
> Further, introduce `UploadCounters` struct to hold the Arc clones of
> the chunk upload statistics counters, simplifying the house keeping.
> 
> By bundling the counters into the struct, they can be passed as
> single function parameter when factoring out the common stream future
> in the subsequent implementation of the chunk upload for sync jobs
> in push direction.
> 
> Signed-off-by: Christian Ebner
> ---
> changes since version 4:
> - Rebased onto current master
> 
> changes since version 3:
> - not present in previous version
> 
>  pbs-client/src/backup_stats.rs  | 130 ++++++++++++++++++++++++++++++++
>  pbs-client/src/backup_writer.rs | 111 +++++++++------------------
>  pbs-client/src/lib.rs           |   3 +
>  3 files changed, 169 insertions(+), 75 deletions(-)
>  create mode 100644 pbs-client/src/backup_stats.rs
> 
> diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
> new file mode 100644
> index 000000000..7aa618667
> --- /dev/null
> +++ b/pbs-client/src/backup_stats.rs
> @@ -0,0 +1,130 @@
> +//! Implements counters to generate statistics for log outputs during uploads with backup writer
> +
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> +use std::sync::Arc;
> +use std::time::Duration;
> +
> +/// Basic backup run statistics and archive checksum
> +pub struct BackupStats {
> +    pub size: u64,
> +    pub csum: [u8; 32],
> +    pub duration: Duration,
> +    pub chunk_count: u64,
> +}
> +
> +/// Extended backup run statistics and archive checksum
> +pub(crate) struct UploadStats {
> +    pub(crate) chunk_count: usize,
> +    pub(crate) chunk_reused: usize,
> +    pub(crate) chunk_injected: usize,
> +    pub(crate) size: usize,
> +    pub(crate) size_reused: usize,
> +    pub(crate) size_injected: usize,
> +    pub(crate) size_compressed: usize,
> +    pub(crate) duration: Duration,
> +    pub(crate) csum: [u8; 32],
> +}
> +
> +impl UploadStats {
> +    /// Convert the upload stats to the more concise [`BackupStats`]
> +    #[inline(always)]
> +    pub(crate) fn to_backup_stats(&self) -> BackupStats {
> +        BackupStats {
> +            chunk_count: self.chunk_count as u64,
> +            size: self.size as u64,
> +            duration: self.duration,
> +            csum: self.csum,
> +        }
> +    }
> +}
> +
> +/// Atomic counters for accounting upload stream progress information
> +#[derive(Clone)]
> +pub(crate) struct UploadCounters {
> +    injected_chunk_count: Arc<AtomicUsize>,
> +    known_chunk_count: Arc<AtomicUsize>,
> +    total_chunk_count: Arc<AtomicUsize>,
> +    compressed_stream_len: Arc<AtomicU64>,
> +    injected_stream_len: Arc<AtomicUsize>,
> +    reused_stream_len: Arc<AtomicUsize>,
> +    total_stream_len: Arc<AtomicUsize>,
> +}
> +
> +impl UploadCounters {
> +    /// Create and zero init new upload counters
> +    pub(crate) fn new() -> Self {
> +        Self {
> +            total_chunk_count: Arc::new(AtomicUsize::new(0)),
> +            injected_chunk_count: Arc::new(AtomicUsize::new(0)),
> +            known_chunk_count: Arc::new(AtomicUsize::new(0)),
> +            compressed_stream_len: Arc::new(AtomicU64::new(0)),
> +            injected_stream_len: Arc::new(AtomicUsize::new(0)),
> +            reused_stream_len: Arc::new(AtomicUsize::new(0)),
> +            total_stream_len: Arc::new(AtomicUsize::new(0)),
> +        }
> +    }
> +
> +    /// Increment total chunk counter by `count`, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_total_chunks(&mut self, count: usize) -> usize {
> +        self.total_chunk_count.fetch_add(count, Ordering::SeqCst)
> +    }
> +
> +    /// Increment known chunk counter by `count`, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_known_chunks(&mut self, count: usize) -> usize {
> +        self.known_chunk_count.fetch_add(count, Ordering::SeqCst)
> +    }
> +
> +    /// Increment injected chunk counter by `count`, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_injected_chunks(&mut self, count: usize) -> usize {
> +        self.injected_chunk_count.fetch_add(count, Ordering::SeqCst)
> +    }
> +
> +    /// Increment stream length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_total_stream_len(&mut self, size: usize) -> usize {
> +        self.total_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Increment reused length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_reused_stream_len(&mut self, size: usize) -> usize {
> +        self.reused_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Increment compressed length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_compressed_stream_len(&mut self, size: u64) -> u64 {
> +        self.compressed_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Increment stream length counter by given size, returns previous value
> +    #[inline(always)]
> +    pub(crate) fn inc_injected_stream_len(&mut self, size: usize) -> usize {
> +        self.injected_stream_len.fetch_add(size, Ordering::SeqCst)
> +    }
> +
> +    /// Return a Arc clone to the total stream length counter
> +    #[inline(always)]
> +    pub(crate) fn total_stream_len_counter(&self) -> Arc<AtomicUsize> {
> +        self.total_stream_len.clone()
> +    }
> +
> +    /// Convert the counters to [`UploadStats`], including given archive checksum and runtime.
> +    #[inline(always)]
> +    pub(crate) fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
> +        UploadStats {
> +            chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
> +            chunk_reused: self.known_chunk_count.load(Ordering::SeqCst),
> +            chunk_injected: self.injected_chunk_count.load(Ordering::SeqCst),
> +            size: self.total_stream_len.load(Ordering::SeqCst),
> +            size_reused: self.reused_stream_len.load(Ordering::SeqCst),
> +            size_injected: self.injected_stream_len.load(Ordering::SeqCst),
> +            size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
> +            duration,
> +            csum,
> +        }
> +    }
> +}
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 4d2e8a801..f08a65153 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -1,7 +1,8 @@
>  use std::collections::HashSet;
>  use std::future::Future;
> -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
> +use std::time::Instant;
> 
>  use anyhow::{bail, format_err, Error};
>  use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
> @@ -23,6 +24,7 @@ use pbs_tools::crypt_config::CryptConfig;
>  use proxmox_human_byte::HumanByte;
>  use proxmox_time::TimeSpan;
> 
> +use super::backup_stats::{BackupStats, UploadCounters, UploadStats};
>  use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>  use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
> 
> @@ -40,11 +42,6 @@ impl Drop for BackupWriter {
>      }
>  }
> 
> -pub struct BackupStats {
> -    pub size: u64,
> -    pub csum: [u8; 32],
> -}
> -
>  /// Options for uploading blobs/streams to the server
>  #[derive(Default, Clone)]
>  pub struct UploadOptions {
> @@ -54,18 +51,6 @@ pub struct UploadOptions {
>      pub fixed_size: Option,
>  }
> 
> -struct UploadStats {
> -    chunk_count: usize,
> -    chunk_reused: usize,
> -    chunk_injected: usize,
> -    size: usize,
> -    size_reused: usize,
> -    size_injected: usize,
> -    size_compressed: usize,
> -    duration: std::time::Duration,
> -    csum: [u8; 32],
> -}
> -
>  struct ChunkUploadResponse {
>      future: h2::client::ResponseFuture,
>      size: usize,
> @@ -194,6 +179,7 @@ impl BackupWriter {
>          mut reader: R,
>          file_name: &str,
>      ) -> Result {
> +        let start_time = Instant::now();
>          let mut raw_data = Vec::new();
>          // fixme: avoid loading into memory
>          reader.read_to_end(&mut raw_data)?;
> @@ -211,7 +197,12 @@ impl BackupWriter {
>              raw_data,
>          )
>          .await?;
> -        Ok(BackupStats { size, csum })
> +        Ok(BackupStats {
> +            size,
> +            csum,
> +            duration: start_time.elapsed(),
> +            chunk_count: 0,
> +        })
>      }
> 
>      pub async fn upload_blob_from_data(
> @@ -220,6 +211,7 @@ impl BackupWriter {
>          file_name: &str,
>          options: UploadOptions,
>      ) -> Result {
> +        let start_time = Instant::now();
>          let blob = match (options.encrypt, &self.crypt_config) {
>              (false, _) => DataBlob::encode(&data, None, options.compress)?,
>              (true, None) => bail!("requested encryption without a crypt config"),
> @@ -243,7 +235,12 @@ impl BackupWriter {
>              raw_data,
>          )
>          .await?;
> -        Ok(BackupStats { size, csum })
> +        Ok(BackupStats {
> +            size,
> +            csum,
> +            duration: start_time.elapsed(),
> +            chunk_count: 0,
> +        })
>      }
> 
>      pub async fn upload_blob_from_file>(
> @@ -421,10 +418,7 @@ impl BackupWriter {
>              "csum": hex::encode(upload_stats.csum),
>          });
>          let _value = self.h2.post(&close_path, Some(param)).await?;
> -        Ok(BackupStats {
> -            size: upload_stats.size as u64,
> -            csum: upload_stats.csum,
> -        })
> +        Ok(upload_stats.to_backup_stats())
>      }
> 
>      fn response_queue() -> (
> @@ -653,23 +647,10 @@ impl BackupWriter {
>          injections: Option>,
>          archive: &str,
>      ) -> impl Future> {
> -        let total_chunks = Arc::new(AtomicUsize::new(0));
> -        let total_chunks2 = total_chunks.clone();
> -        let known_chunk_count = Arc::new(AtomicUsize::new(0));
> -        let known_chunk_count2 = known_chunk_count.clone();
> -        let injected_chunk_count = Arc::new(AtomicUsize::new(0));
> -        let injected_chunk_count2 = injected_chunk_count.clone();
> -
> -        let stream_len = Arc::new(AtomicUsize::new(0));
> -        let stream_len2 = stream_len.clone();
> -        let stream_len3 = stream_len.clone();
> -        let compressed_stream_len = Arc::new(AtomicU64::new(0));
> -        let compressed_stream_len2 = compressed_stream_len.clone();
> -        let reused_len = Arc::new(AtomicUsize::new(0));
> -        let reused_len2 = reused_len.clone();
> -        let injected_len = Arc::new(AtomicUsize::new(0));
> -        let injected_len2 = injected_len.clone();
> -        let uploaded_len = Arc::new(AtomicUsize::new(0));
> +        let mut counters = UploadCounters::new();
> +        let total_stream_len = counters.total_stream_len_counter();
> +        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
> +        let counters_readonly = counters.clone();
> 
>          let append_chunk_path = format!("{}_index", prefix);
>          let upload_chunk_path = format!("{}_chunk", prefix);
> @@ -691,7 +672,7 @@ impl BackupWriter {
>                  loop {
>                      tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
> 
> -                    let size = HumanByte::from(stream_len3.load(Ordering::SeqCst));
> +                    let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
>                      let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
>                      let elapsed = TimeSpan::from(start_time.elapsed());
> 
> @@ -703,22 +684,21 @@ impl BackupWriter {
>          };
> 
>          stream
> -            .inject_reused_chunks(injections, stream_len.clone())
> +            .inject_reused_chunks(injections, counters.total_stream_len_counter())
>              .and_then(move |chunk_info| match chunk_info {
>                  InjectedChunksInfo::Known(chunks) => {
>                      // account for injected chunks
>                      let count = chunks.len();
> -                    total_chunks.fetch_add(count, Ordering::SeqCst);
> -                    injected_chunk_count.fetch_add(count, Ordering::SeqCst);
> +                    counters.inc_total_chunks(count);
> +                    counters.inc_injected_chunks(count);
> 
>                      let mut known = Vec::new();
>                      let mut guard = index_csum.lock().unwrap();
>                      let csum = guard.as_mut().unwrap();
>                      for chunk in chunks {
> -                        let offset =
> -                            stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
> -                        reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> -                        injected_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> +                        let offset = counters.inc_total_stream_len(chunk.size() as usize) as u64;
> +                        counters.inc_reused_stream_len(chunk.size() as usize);
> +                        counters.inc_injected_stream_len(chunk.size() as usize);
>                          let digest = chunk.digest();
>                          known.push((offset, digest));
>                          let end_offset = offset + chunk.size();
> @@ -731,8 +711,8 @@ impl BackupWriter {
>                      // account for not injected chunks (new and known)
>                      let chunk_len = data.len();
> 
> -                    total_chunks.fetch_add(1, Ordering::SeqCst);
> -                    let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
> +                    counters.inc_total_chunks(1);
> +                    let offset = counters.inc_total_stream_len(chunk_len) as u64;
> 
>                      let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
> 
> @@ -755,14 +735,14 @@ impl BackupWriter {
> 
>                      let chunk_is_known = known_chunks.contains(digest);
>                      if chunk_is_known {
> -                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
> -                        reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> +                        counters.inc_known_chunks(1);
> +                        counters.inc_reused_stream_len(chunk_len);
>                          future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
>                      } else {
> -                        let compressed_stream_len2 = compressed_stream_len.clone();
> +                        let mut counters = counters.clone();
>                          known_chunks.insert(*digest);
>                          future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> -                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                            counters.inc_compressed_stream_len(chunk.raw_size());
>                              MergedChunkInfo::New(ChunkInfo {
>                                  chunk,
>                                  digest,
> @@ -837,15 +817,6 @@ impl BackupWriter {
>              })
>              .then(move |result| async move { upload_result.await?.and(result) }.boxed())
>              .and_then(move |_| {
> -                let duration = start_time.elapsed();
> -                let chunk_count = total_chunks2.load(Ordering::SeqCst);
> -                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> -                let chunk_injected = injected_chunk_count2.load(Ordering::SeqCst);
> -                let size = stream_len2.load(Ordering::SeqCst);
> -                let size_reused = reused_len2.load(Ordering::SeqCst);
> -                let size_injected = injected_len2.load(Ordering::SeqCst);
> -                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
> -
>                  let mut guard = index_csum_2.lock().unwrap();
>                  let csum = guard.take().unwrap().finish();
> 
> @@ -853,17 +824,7 @@ impl BackupWriter {
>                      handle.abort();
>                  }
> 
> -                futures::future::ok(UploadStats {
> -                    chunk_count,
> -                    chunk_reused,
> -                    chunk_injected,
> -                    size,
> -                    size_reused,
> -                    size_injected,
> -                    size_compressed,
> -                    duration,
> -                    csum,
> -                })
> +                futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
>              })
>      }
> 
> diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
> index 3d2da27b9..b875347bb 100644
> --- a/pbs-client/src/lib.rs
> +++ b/pbs-client/src/lib.rs
> @@ -41,4 +41,7 @@ pub use backup_specification::*;
>  mod chunk_stream;
>  pub use chunk_stream::{ChunkStream, FixedChunkStream, InjectionData};
> 
> +mod backup_stats;
> +pub use backup_stats::BackupStats;
> +
>  pub const PROXMOX_BACKUP_TCP_KEEPALIVE_TIME: u32 = 120;
> -- 
> 2.39.5
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 

_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
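The end-of-upload flow in the quoted patch boils down to snapshotting the
shared atomic counters once and reducing them to the public `BackupStats`.
Below is a rough, self-contained sketch of that shape; the field set is
trimmed and the names are borrowed from the patch, so this is not the actual
pbs-client API.

    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    // Trimmed-down stand-ins; the real UploadStats/BackupStats carry more
    // fields (reused/injected sizes, etc.).
    struct BackupStats {
        size: u64,
        csum: [u8; 32],
        duration: Duration,
        chunk_count: u64,
    }

    struct UploadStats {
        chunk_count: usize,
        size: usize,
        size_compressed: usize,
        duration: Duration,
        csum: [u8; 32],
    }

    impl UploadStats {
        /// Reduce the detailed upload statistics to the public summary type.
        fn to_backup_stats(&self) -> BackupStats {
            BackupStats {
                chunk_count: self.chunk_count as u64,
                size: self.size as u64,
                duration: self.duration,
                csum: self.csum,
            }
        }
    }

    /// Counter bundle shared by the upload futures via Clone (only the Arcs
    /// are cloned, so every clone sees the same totals).
    #[derive(Clone)]
    struct UploadCounters {
        total_chunk_count: Arc<AtomicUsize>,
        total_stream_len: Arc<AtomicUsize>,
        compressed_stream_len: Arc<AtomicU64>,
    }

    impl UploadCounters {
        /// Snapshot the atomics once at the end of the upload, attaching the
        /// archive checksum and runtime.
        fn to_upload_stats(&self, csum: [u8; 32], duration: Duration) -> UploadStats {
            UploadStats {
                chunk_count: self.total_chunk_count.load(Ordering::SeqCst),
                size: self.total_stream_len.load(Ordering::SeqCst),
                size_compressed: self.compressed_stream_len.load(Ordering::SeqCst) as usize,
                duration,
                csum,
            }
        }
    }

    fn main() {
        let counters = UploadCounters {
            total_chunk_count: Arc::new(AtomicUsize::new(3)),
            total_stream_len: Arc::new(AtomicUsize::new(12 * 1024)),
            compressed_stream_len: Arc::new(AtomicU64::new(4 * 1024)),
        };
        let upload_stats = counters.to_upload_stats([0u8; 32], Duration::from_secs(1));
        assert_eq!(upload_stats.size_compressed, 4 * 1024);

        let backup_stats = upload_stats.to_backup_stats();
        assert_eq!(backup_stats.chunk_count, 3);
        assert_eq!(backup_stats.size, 12 * 1024);
        assert_eq!(backup_stats.duration, Duration::from_secs(1));
        assert_eq!(backup_stats.csum, [0u8; 32]);
    }

Because the counters are only read once at the end, a read-only clone such as
the patch's `counters_readonly` can produce the final statistics without any
coordination with the futures that kept incrementing the shared atomics.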