Date: Tue, 11 Nov 2025 11:22:19 +0100
From: Christian Ebner
To: Proxmox Backup Server development discussion, Nicolas Frey
References: <20251110084417.173290-1-n.frey@proxmox.com> <20251110084417.173290-6-n.frey@proxmox.com>
In-Reply-To: <20251110084417.173290-6-n.frey@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct

As mentioned on the previous patch: IMO this one should be introduced before that one, with the parallel handler then added only on top of it.

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Introduces a new state struct `IndexVerifyState` so that we only need
> to pass around and clone one `Arc`.
>
> Suggested-by: Christian Ebner
> Signed-off-by: Nicolas Frey
> ---
>  src/backup/verify.rs | 130 ++++++++++++++++++++++---------------
>  1 file changed, 67 insertions(+), 63 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index c0ff15d4..9a20c8e1 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -34,6 +34,34 @@ pub struct VerifyWorker {
>      backend: DatastoreBackend,
>  }
>
> +struct IndexVerifyState {
> +    read_bytes: AtomicU64,
> +    decoded_bytes: AtomicU64,
> +    errors: AtomicUsize,
> +    datastore: Arc<DataStore>,
> +    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    start_time: Instant,
> +}
> +
> +impl IndexVerifyState {
> +    fn new(
> +        datastore: &Arc<DataStore>,
> +        corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +        verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +    ) -> Self {
> +        Self {
> +            read_bytes: AtomicU64::new(0),
> +            decoded_bytes: AtomicU64::new(0),
> +            errors: AtomicUsize::new(0),
> +            datastore: Arc::clone(datastore),
> +            corrupt_chunks: Arc::clone(corrupt_chunks),
> +            verified_chunks: Arc::clone(verified_chunks),
> +            start_time: Instant::now(),
> +        }
> +    }
> +}
> +
>  impl VerifyWorker {
>      /// Creates a new VerifyWorker for a given task worker and datastore.
>      pub fn new(
> @@ -81,24 +109,20 @@ impl VerifyWorker {
>          index: Box<dyn IndexFile + Send>,
>          crypt_mode: CryptMode,
>      ) -> Result<(), Error> {
> -        let errors = Arc::new(AtomicUsize::new(0));
> -
> -        let start_time = Instant::now();
> -
> -        let read_bytes = Arc::new(AtomicU64::new(0));
> -        let decoded_bytes = Arc::new(AtomicU64::new(0));
> +        let verify_state = Arc::new(IndexVerifyState::new(
> +            &self.datastore,
> +            &self.corrupt_chunks,
> +            &self.verified_chunks,
> +        ));
>
>          let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let verified_chunks = Arc::clone(&self.verified_chunks);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>              move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                  let chunk_crypt_mode = match chunk.crypt_mode() {
>                      Err(err) => {
> -                        corrupt_chunks.lock().unwrap().insert(digest);
> +                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                          info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                          return Ok(());
>                      }
>                      Ok(mode) => mode,
> @@ -108,20 +132,20 @@ impl VerifyWorker {
>                      info!(
>                          "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                      );
> -                    errors.fetch_add(1, Ordering::SeqCst);
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  }
>
>                  if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks.lock().unwrap().insert(digest);
> +                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                      info!("{err}");
> -                    errors.fetch_add(1, Ordering::SeqCst);
> -                    match datastore.rename_corrupt_chunk(&digest) {
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
>                          Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                          Err(err) => info!("{err}"),
>                          _ => (),
>                      }
>                  } else {
> -                    verified_chunks.lock().unwrap().insert(digest);
> +                    verify_state.verified_chunks.lock().unwrap().insert(digest);
>                  }
>
>                  Ok(())
> @@ -134,7 +158,7 @@ impl VerifyWorker {
>              } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
>                  let digest_str = hex::encode(digest);
>                  info!("chunk {digest_str} was marked as corrupt");
> -                errors.fetch_add(1, Ordering::SeqCst);
> +                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  true
>              } else {
>                  false
> @@ -155,27 +179,18 @@ impl VerifyWorker {
>
>          let reader_pool = ParallelHandler::new("read chunks", 4, {
>              let decoder_pool = decoder_pool.channel();
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let read_bytes = Arc::clone(&read_bytes);
> -            let decoded_bytes = Arc::clone(&decoded_bytes);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>              let backend = self.backend.clone();
>
>              move |info: ChunkReadInfo| {
>                  Self::verify_chunk_by_backend(
>                      &backend,
> -                    Arc::clone(&datastore),
> -                    Arc::clone(&corrupt_chunks),
> -                    Arc::clone(&read_bytes),
> -                    Arc::clone(&decoded_bytes),
> -                    Arc::clone(&errors),
> +                    Arc::clone(&verify_state),
>                      &decoder_pool,
>                      &info,
>                  )
>              }
>          });
> -
>          for (pos, _) in chunk_list {
>              self.worker.check_abort()?;
>              self.worker.fail_on_shutdown()?;
> @@ -192,10 +207,10 @@ impl VerifyWorker {
>
>          reader_pool.complete()?;
>
> -        let elapsed = start_time.elapsed().as_secs_f64();
> +        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
>
> -        let read_bytes = read_bytes.load(Ordering::SeqCst);
> -        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
>
>          let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>          let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
> @@ -203,44 +218,39 @@
>          let read_speed = read_bytes_mib / elapsed;
>          let decode_speed = decoded_bytes_mib / elapsed;
>
> -        let error_count = errors.load(Ordering::SeqCst);
> +        let error_count = verify_state.errors.load(Ordering::SeqCst);
>
>          info!(
>              " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
>          );
>
> -        if errors.load(Ordering::SeqCst) > 0 {
> +        if verify_state.errors.load(Ordering::SeqCst) > 0 {
>              bail!("chunks could not be verified");
>          }
>
>          Ok(())
>      }
>
> -    #[allow(clippy::too_many_arguments)]
>      fn verify_chunk_by_backend(
>          backend: &DatastoreBackend,
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        read_bytes: Arc<AtomicU64>,
> -        decoded_bytes: Arc<AtomicU64>,
> -        errors: Arc<AtomicUsize>,
> +        verify_state: Arc<IndexVerifyState>,
>          decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>          info: &ChunkReadInfo,
>      ) -> Result<(), Error> {
>          match backend {
> -            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
>                  Err(err) => Self::add_corrupt_chunk(
> -                    datastore,
> -                    corrupt_chunks,
> +                    verify_state,
>                      info.digest,
> -                    errors,
>                      &format!("can't verify chunk, load failed - {err}"),
>                  ),
>                  Ok(chunk) => {
>                      let size = info.size();
> -                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                    verify_state
> +                        .read_bytes
> +                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                      decoder_pool.send((chunk, info.digest, size))?;
> -                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                  }
>              },
>              DatastoreBackend::S3(s3_client) => {
> @@ -257,28 +267,28 @@ impl VerifyWorker {
>                          match chunk_result {
>                              Ok(chunk) => {
>                                  let size = info.size();
> -                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                                verify_state
> +                                    .read_bytes
> +                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                                  decoder_pool.send((chunk, info.digest, size))?;
> -                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                                verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                              }
>                              Err(err) => {
> -                                errors.fetch_add(1, Ordering::SeqCst);
> +                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                                  error!("can't verify chunk, load failed - {err}");
>                              }
>                          }
>                      }
>                      Ok(None) => Self::add_corrupt_chunk(
> -                        datastore,
> -                        corrupt_chunks,
> +                        verify_state,
>                          info.digest,
> -                        errors,
>                          &format!(
>                              "can't verify missing chunk with digest {}",
>                              hex::encode(info.digest)
>                          ),
>                      ),
>                      Err(err) => {
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                          error!("can't verify chunk, load failed - {err}");
>                      }
>                  }
> @@ -287,19 +297,13 @@ impl VerifyWorker {
>          Ok(())
>      }
>
> -    fn add_corrupt_chunk(
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        digest: [u8; 32],
> -        errors: Arc<AtomicUsize>,
> -        message: &str,
> -    ) {
> +    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
>          // Panic on poisoned mutex
> -        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
>          corrupt_chunks.insert(digest);
>          error!(message);
> -        errors.fetch_add(1, Ordering::SeqCst);
> -        match datastore.rename_corrupt_chunk(&digest) {
> +        verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +        match verify_state.datastore.rename_corrupt_chunk(&digest) {
>              Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>              Err(err) => info!("{err}"),
>              _ => (),
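For context, the pattern the patch is after boils down to something like the following minimal, standalone sketch (identifiers such as SharedStats and the worker count are invented for illustration, not taken from the patch):

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Shared counters bundled into one struct, so every worker clones a
// single Arc instead of one Arc per counter.
struct SharedStats {
    read_bytes: AtomicU64,
    errors: AtomicUsize,
}

fn main() {
    let stats = Arc::new(SharedStats {
        read_bytes: AtomicU64::new(0),
        errors: AtomicUsize::new(0),
    });

    let workers: Vec<_> = (0..4)
        .map(|_| {
            // one clone of one Arc per worker
            let stats = Arc::clone(&stats);
            thread::spawn(move || {
                stats.read_bytes.fetch_add(4096, Ordering::SeqCst);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    println!(
        "read {} bytes, {} errors",
        stats.read_bytes.load(Ordering::SeqCst),
        stats.errors.load(Ordering::SeqCst)
    );
}

The counters stay plain atomics inside the struct; only the struct itself is reference-counted.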