From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 89B771FF15E for ; Mon, 10 Nov 2025 09:43:44 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C2294F749; Mon, 10 Nov 2025 09:44:29 +0100 (CET) From: Nicolas Frey To: pbs-devel@lists.proxmox.com Date: Mon, 10 Nov 2025 09:44:13 +0100 Message-ID: <20251110084417.173290-6-n.frey@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20251110084417.173290-1-n.frey@proxmox.com> References: <20251110084417.173290-1-n.frey@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL -0.099 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_LAZY_DOMAIN_SECURITY 1 Sending domain does not have any anti-forgery methods RDNS_NONE 0.793 Delivered to internal network by a host with no rDNS SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_NONE 0.001 SPF: sender does not publish an SPF Record Subject: [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Introduces a new state struct `IndexVerifyState` so that we only need to pass around and clone one `Arc`. 
Suggested-by: Christian Ebner Signed-off-by: Nicolas Frey --- src/backup/verify.rs | 130 ++++++++++++++++++++++--------------------- 1 file changed, 67 insertions(+), 63 deletions(-) diff --git a/src/backup/verify.rs b/src/backup/verify.rs index c0ff15d4..9a20c8e1 100644 --- a/src/backup/verify.rs +++ b/src/backup/verify.rs @@ -34,6 +34,34 @@ pub struct VerifyWorker { backend: DatastoreBackend, } +struct IndexVerifyState { + read_bytes: AtomicU64, + decoded_bytes: AtomicU64, + errors: AtomicUsize, + datastore: Arc<DataStore>, + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, + verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, + start_time: Instant, +} + +impl IndexVerifyState { + fn new( + datastore: &Arc<DataStore>, + corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>, + verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>, + ) -> Self { + Self { + read_bytes: AtomicU64::new(0), + decoded_bytes: AtomicU64::new(0), + errors: AtomicUsize::new(0), + datastore: Arc::clone(datastore), + corrupt_chunks: Arc::clone(corrupt_chunks), + verified_chunks: Arc::clone(verified_chunks), + start_time: Instant::now(), + } + } +} + impl VerifyWorker { /// Creates a new VerifyWorker for a given task worker and datastore. 
pub fn new( @@ -81,24 +109,20 @@ impl VerifyWorker { index: Box, crypt_mode: CryptMode, ) -> Result<(), Error> { - let errors = Arc::new(AtomicUsize::new(0)); - - let start_time = Instant::now(); - - let read_bytes = Arc::new(AtomicU64::new(0)); - let decoded_bytes = Arc::new(AtomicU64::new(0)); + let verify_state = Arc::new(IndexVerifyState::new( + &self.datastore, + &self.corrupt_chunks, + &self.verified_chunks, + )); let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, { - let datastore = Arc::clone(&self.datastore); - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); - let verified_chunks = Arc::clone(&self.verified_chunks); - let errors = Arc::clone(&errors); + let verify_state = Arc::clone(&verify_state); move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { let chunk_crypt_mode = match chunk.crypt_mode() { Err(err) => { - corrupt_chunks.lock().unwrap().insert(digest); + verify_state.corrupt_chunks.lock().unwrap().insert(digest); info!("can't verify chunk, unknown CryptMode - {err}"); - errors.fetch_add(1, Ordering::SeqCst); + verify_state.errors.fetch_add(1, Ordering::SeqCst); return Ok(()); } Ok(mode) => mode, @@ -108,20 +132,20 @@ impl VerifyWorker { info!( "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}" ); - errors.fetch_add(1, Ordering::SeqCst); + verify_state.errors.fetch_add(1, Ordering::SeqCst); } if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { - corrupt_chunks.lock().unwrap().insert(digest); + verify_state.corrupt_chunks.lock().unwrap().insert(digest); info!("{err}"); - errors.fetch_add(1, Ordering::SeqCst); - match datastore.rename_corrupt_chunk(&digest) { + verify_state.errors.fetch_add(1, Ordering::SeqCst); + match verify_state.datastore.rename_corrupt_chunk(&digest) { Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"), Err(err) => info!("{err}"), _ => (), } } else { - verified_chunks.lock().unwrap().insert(digest); + 
verify_state.verified_chunks.lock().unwrap().insert(digest); } Ok(()) @@ -134,7 +158,7 @@ impl VerifyWorker { } else if self.corrupt_chunks.lock().unwrap().contains(digest) { let digest_str = hex::encode(digest); info!("chunk {digest_str} was marked as corrupt"); - errors.fetch_add(1, Ordering::SeqCst); + verify_state.errors.fetch_add(1, Ordering::SeqCst); true } else { false @@ -155,27 +179,18 @@ impl VerifyWorker { let reader_pool = ParallelHandler::new("read chunks", 4, { let decoder_pool = decoder_pool.channel(); - let datastore = Arc::clone(&self.datastore); - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); - let read_bytes = Arc::clone(&read_bytes); - let decoded_bytes = Arc::clone(&decoded_bytes); - let errors = Arc::clone(&errors); + let verify_state = Arc::clone(&verify_state); let backend = self.backend.clone(); move |info: ChunkReadInfo| { Self::verify_chunk_by_backend( &backend, - Arc::clone(&datastore), - Arc::clone(&corrupt_chunks), - Arc::clone(&read_bytes), - Arc::clone(&decoded_bytes), - Arc::clone(&errors), + Arc::clone(&verify_state), &decoder_pool, &info, ) } }); - for (pos, _) in chunk_list { self.worker.check_abort()?; self.worker.fail_on_shutdown()?; @@ -192,10 +207,10 @@ impl VerifyWorker { reader_pool.complete()?; - let elapsed = start_time.elapsed().as_secs_f64(); + let elapsed = verify_state.start_time.elapsed().as_secs_f64(); - let read_bytes = read_bytes.load(Ordering::SeqCst); - let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); + let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst); + let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst); let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0); @@ -203,44 +218,39 @@ impl VerifyWorker { let read_speed = read_bytes_mib / elapsed; let decode_speed = decoded_bytes_mib / elapsed; - let error_count = errors.load(Ordering::SeqCst); + let error_count = 
verify_state.errors.load(Ordering::SeqCst); info!( " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)" ); - if errors.load(Ordering::SeqCst) > 0 { + if verify_state.errors.load(Ordering::SeqCst) > 0 { bail!("chunks could not be verified"); } Ok(()) } - #[allow(clippy::too_many_arguments)] fn verify_chunk_by_backend( backend: &DatastoreBackend, - datastore: Arc<DataStore>, - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, - read_bytes: Arc<AtomicU64>, - decoded_bytes: Arc<AtomicU64>, - errors: Arc<AtomicUsize>, + verify_state: Arc<IndexVerifyState>, decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, info: &ChunkReadInfo, ) -> Result<(), Error> { match backend { - DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) { + DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) { Err(err) => Self::add_corrupt_chunk( - datastore, - corrupt_chunks, + verify_state, info.digest, - errors, &format!("can't verify chunk, load failed - {err}"), ), Ok(chunk) => { let size = info.size(); - read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); + verify_state + .read_bytes + .fetch_add(chunk.raw_size(), Ordering::SeqCst); decoder_pool.send((chunk, info.digest, size))?; - decoded_bytes.fetch_add(size, Ordering::SeqCst); + verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst); } }, DatastoreBackend::S3(s3_client) => { @@ -257,28 +267,28 @@ impl VerifyWorker { match chunk_result { Ok(chunk) => { let size = info.size(); - read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); + verify_state + .read_bytes + .fetch_add(chunk.raw_size(), Ordering::SeqCst); decoder_pool.send((chunk, info.digest, size))?; - decoded_bytes.fetch_add(size, Ordering::SeqCst); + verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst); } Err(err) => { - errors.fetch_add(1, Ordering::SeqCst); + verify_state.errors.fetch_add(1, Ordering::SeqCst); error!("can't verify chunk, load failed - {err}"); } } } Ok(None) => 
Self::add_corrupt_chunk( - datastore, - corrupt_chunks, + verify_state, info.digest, - errors, &format!( "can't verify missing chunk with digest {}", hex::encode(info.digest) ), ), Err(err) => { - errors.fetch_add(1, Ordering::SeqCst); + verify_state.errors.fetch_add(1, Ordering::SeqCst); error!("can't verify chunk, load failed - {err}"); } } @@ -287,19 +297,13 @@ impl VerifyWorker { Ok(()) } - fn add_corrupt_chunk( - datastore: Arc<DataStore>, - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, - digest: [u8; 32], - errors: Arc<AtomicUsize>, - message: &str, - ) { + fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) { // Panic on poisoned mutex - let mut corrupt_chunks = corrupt_chunks.lock().unwrap(); + let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap(); corrupt_chunks.insert(digest); error!(message); - errors.fetch_add(1, Ordering::SeqCst); - match datastore.rename_corrupt_chunk(&digest) { + verify_state.errors.fetch_add(1, Ordering::SeqCst); + match verify_state.datastore.rename_corrupt_chunk(&digest) { Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"), Err(err) => info!("{err}"), _ => (), -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel