Date: Fri, 7 Nov 2025 09:39:11 +0100
From: Christian Ebner
To: Proxmox Backup Server development discussion, Nicolas Frey
Subject: Re: [pbs-devel] [PATCH proxmox-backup v2 1/4] api: verify: move chunk loading into parallel handler

Bundling the parameters makes this easier to digest, thanks.

Nevertheless, and I failed to mention this on the previous version of the
patch, this should better be split into a preparatory patch: the
introduction of the new `IndexVerifyState` and all the adaptations
required just for that go into one patch, the actual changes to the chunk
loading into a subsequent one. That makes the series easier to review.

some more comments inline

On 11/6/25 5:13 PM, Nicolas Frey wrote:
> This way, the chunks will be loaded in parallel in addition to being
> checked in parallel.
>
> Depending on the underlying storage, this can speed up reading chunks
> from disk, especially when the underlying storage is IO depth
> dependent, and the CPU is faster than the storage.
>
> Introduces a new state struct `IndexVerifyState` so that we only need
> to pass around and clone one `Arc`.
>
> Originally-by: Dominik Csapak
> Signed-off-by: Nicolas Frey
> ---
>  src/backup/verify.rs | 134 +++++++++++++++++++++++++------------------
>  1 file changed, 78 insertions(+), 56 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 31c03891..910a3ed5 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -1,6 +1,6 @@
>  use pbs_config::BackupLockGuard;
>  use std::collections::HashSet;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::Instant;
>
> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
>  use pbs_datastore::manifest::{BackupManifest, FileInfo};
>  use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>
> -use crate::tools::parallel_handler::ParallelHandler;
> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>
>  use crate::backup::hierarchy::ListAccessibleBackupGroups;
>
> @@ -34,6 +34,15 @@ pub struct VerifyWorker {
>      backend: DatastoreBackend,
>  }
>
> +struct IndexVerifyState {
> +    read_bytes: AtomicU64,
> +    decoded_bytes: AtomicU64,
> +    errors: AtomicUsize,
> +    datastore: Arc<DataStore>,
> +    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +}
> +
>  impl VerifyWorker {
>      /// Creates a new VerifyWorker for a given task worker and datastore.
>      pub fn new(
> @@ -81,27 +90,25 @@ impl VerifyWorker {
>          index: Box<dyn IndexFile + Send>,
>          crypt_mode: CryptMode,
>      ) -> Result<(), Error> {
> -        let errors = Arc::new(AtomicUsize::new(0));
> -
>          let start_time = Instant::now();

nit: The start_time could be moved to the `IndexVerifyState` as well

>
> -        let mut read_bytes = 0;
> -        let mut decoded_bytes = 0;
> -
> -        let datastore2 = Arc::clone(&self.datastore);
> -        let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
> -        let verified_chunks2 = Arc::clone(&self.verified_chunks);
> -        let errors2 = Arc::clone(&errors);
> -
> -        let decoder_pool = ParallelHandler::new(
> -            "verify chunk decoder",
> -            4,
> +        let verify_state = Arc::new(IndexVerifyState {
> +            read_bytes: AtomicU64::new(0),
> +            decoded_bytes: AtomicU64::new(0),
> +            errors: AtomicUsize::new(0),
> +            datastore: Arc::clone(&self.datastore),
> +            corrupt_chunks: Arc::clone(&self.corrupt_chunks),
> +            verified_chunks: Arc::clone(&self.verified_chunks),
> +        });

This could be moved to a constructor and instantiated via
`IndexVerifyState::new(&self.datastore, &self.corrupt_chunks, &self.verified_chunks)`,
which sets all the field values and gathers the start time as well.
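As a rough sketch of what I mean (assuming a `start_time: Instant` field
gets added to the struct as per the nit above):

    impl IndexVerifyState {
        /// Bundle the shared verify state and record the start time.
        fn new(
            datastore: &Arc<DataStore>,
            corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
            verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
        ) -> Self {
            Self {
                read_bytes: AtomicU64::new(0),
                decoded_bytes: AtomicU64::new(0),
                errors: AtomicUsize::new(0),
                datastore: Arc::clone(datastore),
                corrupt_chunks: Arc::clone(corrupt_chunks),
                verified_chunks: Arc::clone(verified_chunks),
                // hypothetical new field, only if start_time moves here
                start_time: Instant::now(),
            }
        }
    }

with the call site then just wrapping that in the `Arc` as before.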
> +
> +        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> +            let verify_state = Arc::clone(&verify_state);
>              move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                  let chunk_crypt_mode = match chunk.crypt_mode() {
>                      Err(err) => {
> -                        corrupt_chunks2.lock().unwrap().insert(digest);
> +                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                          info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors2.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                          return Ok(());
>                      }
>                      Ok(mode) => mode,
> @@ -111,25 +118,25 @@ impl VerifyWorker {
>                      info!(
>                          "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                      );
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  }
>
>                  if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks2.lock().unwrap().insert(digest);
> +                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                      info!("{err}");
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> -                    match datastore2.rename_corrupt_chunk(&digest) {
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
>                          Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                          Err(err) => info!("{err}"),
>                          _ => (),
>                      }
>                  } else {
> -                    verified_chunks2.lock().unwrap().insert(digest);
> +                    verify_state.verified_chunks.lock().unwrap().insert(digest);
>                  }
>
>                  Ok(())
> -            },
> -        );
> +            }
> +        });
>
>          let skip_chunk = |digest: &[u8; 32]| -> bool {
>              if self.verified_chunks.lock().unwrap().contains(digest) {
> @@ -137,7 +144,7 @@ impl VerifyWorker {
>              } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
>                  let digest_str = hex::encode(digest);
>                  info!("chunk {digest_str} was marked as corrupt");
> -                errors.fetch_add(1, Ordering::SeqCst);
> +                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  true
>              } else {
>                  false
> @@ -156,6 +163,21 @@ impl VerifyWorker {
>              .datastore
>              .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>
> +        let reader_pool = ParallelHandler::new("read chunks", 4, {
> +            let decoder_pool = decoder_pool.channel();
> +            let verify_state = Arc::clone(&verify_state);
> +            let backend = self.backend.clone();
> +
> +            move |info: ChunkReadInfo| {
> +                Self::verify_chunk_by_backend(
> +                    &backend,
> +                    Arc::clone(&verify_state),
> +                    &decoder_pool,
> +                    &info,
> +                )
> +            }
> +        });
> +
>          for (pos, _) in chunk_list {
>              self.worker.check_abort()?;
>              self.worker.fail_on_shutdown()?;
> @@ -167,58 +189,56 @@ impl VerifyWorker {
>                  continue; // already verified or marked corrupt
>              }
>
> -            self.verify_chunk_by_backend(
> -                &info,
> -                &mut read_bytes,
> -                &mut decoded_bytes,
> -                Arc::clone(&errors),
> -                &decoder_pool,
> -            )?;
> +            reader_pool.send(info)?;
>          }
>
> -        decoder_pool.complete()?;
> +        reader_pool.complete()?;
>
>          let elapsed = start_time.elapsed().as_secs_f64();

could get the time from the `verify_state` then
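e.g. something like (again assuming the sketched `start_time` field):

    let elapsed = verify_state.start_time.elapsed().as_secs_f64();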
>
> +        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
> +
>          let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>          let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
>
>          let read_speed = read_bytes_mib / elapsed;
>          let decode_speed = decoded_bytes_mib / elapsed;
>
> -        let error_count = errors.load(Ordering::SeqCst);
> +        let error_count = verify_state.errors.load(Ordering::SeqCst);
>
>          info!(
>              "  verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
>          );
>
> -        if errors.load(Ordering::SeqCst) > 0 {
> +        if verify_state.errors.load(Ordering::SeqCst) > 0 {
>              bail!("chunks could not be verified");
>          }
>
>          Ok(())
>      }
>
> +    #[allow(clippy::too_many_arguments)]

nit: above is no longer needed? (the function is down to four parameters,
well below clippy's default threshold)

>      fn verify_chunk_by_backend(
> -        &self,
> +        backend: &DatastoreBackend,
> +        verify_state: Arc<IndexVerifyState>,
> +        decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>          info: &ChunkReadInfo,
> -        read_bytes: &mut u64,
> -        decoded_bytes: &mut u64,
> -        errors: Arc<AtomicUsize>,
> -        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
>      ) -> Result<(), Error> {
> -        match &self.backend {
> -            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> -                Err(err) => self.add_corrupt_chunk(
> +        match backend {
> +            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
> +                Err(err) => Self::add_corrupt_chunk(
> +                    verify_state,
>                      info.digest,
> -                    errors,
>                      &format!("can't verify chunk, load failed - {err}"),
>                  ),
>                  Ok(chunk) => {
>                      let size = info.size();
> -                    *read_bytes += chunk.raw_size();
> +                    verify_state
> +                        .read_bytes
> +                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                      decoder_pool.send((chunk, info.digest, size))?;
> -                    *decoded_bytes += size;
> +                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                  }
>              },
>              DatastoreBackend::S3(s3_client) => {
> @@ -235,26 +255,28 @@ impl VerifyWorker {
>                  match chunk_result {
>                      Ok(chunk) => {
>                          let size = info.size();
> -                        *read_bytes += chunk.raw_size();
> +                        verify_state
> +                            .read_bytes
> +                            .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                          decoder_pool.send((chunk, info.digest, size))?;
> -                        *decoded_bytes += size;
> +                        verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                      }
>                      Err(err) => {
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                          error!("can't verify chunk, load failed - {err}");
>                      }
>                  }
>              }
> -            Ok(None) => self.add_corrupt_chunk(
> +            Ok(None) => Self::add_corrupt_chunk(
> +                verify_state,
>                  info.digest,
> -                errors,
>                  &format!(
>                      "can't verify missing chunk with digest {}",
>                      hex::encode(info.digest)
>                  ),
>              ),
>              Err(err) => {
> -                errors.fetch_add(1, Ordering::SeqCst);
> +                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  error!("can't verify chunk, load failed - {err}");
>              }
>          }
> @@ -263,13 +285,13 @@ impl VerifyWorker {
>          Ok(())
>      }
>
> -    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> +    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
>          // Panic on poisoned mutex
> -        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
>          corrupt_chunks.insert(digest);
>          error!(message);
> -        errors.fetch_add(1, Ordering::SeqCst);
> -        match self.datastore.rename_corrupt_chunk(&digest) {
> +        verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +        match verify_state.datastore.rename_corrupt_chunk(&digest) {
>              Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>              Err(err) => info!("{err}"),
>              _ => (),

_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel