Date: Tue, 11 Nov 2025 11:22:15 +0100
From: Christian Ebner
To: Proxmox Backup Server development discussion, Nicolas Frey
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler
References: <20251110084417.173290-1-n.frey@proxmox.com> <20251110084417.173290-5-n.frey@proxmox.com>
In-Reply-To: <20251110084417.173290-5-n.frey@proxmox.com>

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> This way, the chunks will be loaded in parallel in addition to being
> checked in parallel.
>
> Depending on the underlying storage, this can speed up reading chunks
> from disk, especially when the underlying storage is IO depth
> dependent, and the CPU is faster than the storage.
>
> Originally-by: Dominik Csapak
> Signed-off-by: Nicolas Frey
> ---

I think this patch diff could be further reduced and split by:
- placing the following patch, which introduces the struct, first
- then doing the refactoring from member method to associated function
  for verify_chunk_by_backend() and add_corrupt_chunk()
- only then introducing the parallel chunk reading on top of that

Or was your intention to stay close to the original patch here?

One further comment inline.
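As a side note on what the patch builds: the resulting two-stage pipeline
roughly has the following shape. This is a minimal, self-contained sketch
using plain std threads and channels instead of the ParallelHandler helper;
all names in it are made up for illustration, not PBS code:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

type Digest = [u8; 32];

fn main() {
    let read_bytes = Arc::new(AtomicU64::new(0));

    // Stage 2: decoder worker, verifies chunks handed over by the readers.
    let (decode_tx, decode_rx) = mpsc::channel::<(Vec<u8>, Digest)>();
    let decoder = thread::spawn(move || {
        for (chunk, _digest) in decode_rx {
            // verify_unencrypted() would run here in the real code
            assert!(!chunk.is_empty());
        }
    });

    // Stage 1: reader worker, loads chunks and forwards them to the decoder.
    // ParallelHandler would spawn several of these sharing one input queue.
    let (read_tx, read_rx) = mpsc::channel::<Digest>();
    let reader = thread::spawn({
        let read_bytes = Arc::clone(&read_bytes);
        move || {
            for digest in read_rx {
                let chunk = vec![0u8; 4096]; // stands in for datastore.load_chunk()
                read_bytes.fetch_add(chunk.len() as u64, Ordering::SeqCst);
                decode_tx.send((chunk, digest)).unwrap();
            }
            // decode_tx drops here, which lets the decoder loop terminate
        }
    });

    // Main loop: only schedules digests; all IO happens on the worker threads.
    for i in 0..8u8 {
        read_tx.send([i; 32]).unwrap();
    }
    drop(read_tx);
    reader.join().unwrap();
    decoder.join().unwrap();
    println!("read {} bytes", read_bytes.load(Ordering::SeqCst));
}

The point being: the main loop only pushes work items, while the IO-bound
reading and the CPU-bound verification both happen concurrently on the
worker pools.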
> src/backup/verify.rs | 120 +++++++++++++++++++++++++++----------------
> 1 file changed, 75 insertions(+), 45 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 31c03891..c0ff15d4 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -1,6 +1,6 @@
>  use pbs_config::BackupLockGuard;
>  use std::collections::HashSet;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::Instant;
>
> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
>  use pbs_datastore::manifest::{BackupManifest, FileInfo};
>  use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>
> -use crate::tools::parallel_handler::ParallelHandler;
> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>
>  use crate::backup::hierarchy::ListAccessibleBackupGroups;
>
> @@ -85,23 +85,20 @@ impl VerifyWorker {
>
>          let start_time = Instant::now();
>
> -        let mut read_bytes = 0;
> -        let mut decoded_bytes = 0;
> +        let read_bytes = Arc::new(AtomicU64::new(0));
> +        let decoded_bytes = Arc::new(AtomicU64::new(0));
>
> -        let datastore2 = Arc::clone(&self.datastore);
> -        let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
> -        let verified_chunks2 = Arc::clone(&self.verified_chunks);
> -        let errors2 = Arc::clone(&errors);
> -
> -        let decoder_pool = ParallelHandler::new(
> -            "verify chunk decoder",
> -            4,
> +        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> +            let datastore = Arc::clone(&self.datastore);
> +            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> +            let verified_chunks = Arc::clone(&self.verified_chunks);
> +            let errors = Arc::clone(&errors);
>              move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                  let chunk_crypt_mode = match chunk.crypt_mode() {
>                      Err(err) => {
> -                        corrupt_chunks2.lock().unwrap().insert(digest);
> +                        corrupt_chunks.lock().unwrap().insert(digest);
>                          info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors2.fetch_add(1, Ordering::SeqCst);
> +                        errors.fetch_add(1, Ordering::SeqCst);
>                          return Ok(());
>                      }
>                      Ok(mode) => mode,
> @@ -111,25 +108,25 @@ impl VerifyWorker {
>                      info!(
>                          "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                      );
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> +                    errors.fetch_add(1, Ordering::SeqCst);
>                  }
>
>                  if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks2.lock().unwrap().insert(digest);
> +                    corrupt_chunks.lock().unwrap().insert(digest);
>                      info!("{err}");
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> -                    match datastore2.rename_corrupt_chunk(&digest) {
> +                    errors.fetch_add(1, Ordering::SeqCst);
> +                    match datastore.rename_corrupt_chunk(&digest) {
>                          Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                          Err(err) => info!("{err}"),
>                          _ => (),
>                      }
>                  } else {
> -                    verified_chunks2.lock().unwrap().insert(digest);
> +                    verified_chunks.lock().unwrap().insert(digest);
>                  }
>
>                  Ok(())
> -            },
> -        );
> +            }
> +        });
>
>          let skip_chunk = |digest: &[u8; 32]| -> bool {
>              if self.verified_chunks.lock().unwrap().contains(digest) {
> @@ -156,6 +153,29 @@ impl VerifyWorker {
>              .datastore
>              .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>
> +        let reader_pool = ParallelHandler::new("read chunks", 4, {

nit: This sets the number of reader threads to 4 here; the subsequent
patches then change it to the default of 1 as defined in the API schemas.
While not an issue, IMO this should already be set to that default value.
> +            let decoder_pool = decoder_pool.channel();
> +            let datastore = Arc::clone(&self.datastore);
> +            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> +            let read_bytes = Arc::clone(&read_bytes);
> +            let decoded_bytes = Arc::clone(&decoded_bytes);
> +            let errors = Arc::clone(&errors);
> +            let backend = self.backend.clone();
> +
> +            move |info: ChunkReadInfo| {
> +                Self::verify_chunk_by_backend(
> +                    &backend,
> +                    Arc::clone(&datastore),
> +                    Arc::clone(&corrupt_chunks),
> +                    Arc::clone(&read_bytes),
> +                    Arc::clone(&decoded_bytes),
> +                    Arc::clone(&errors),
> +                    &decoder_pool,
> +                    &info,
> +                )
> +            }
> +        });
> +
>          for (pos, _) in chunk_list {
>              self.worker.check_abort()?;
>              self.worker.fail_on_shutdown()?;
> @@ -167,19 +187,16 @@ impl VerifyWorker {
>                  continue; // already verified or marked corrupt
>              }
>
> -            self.verify_chunk_by_backend(
> -                &info,
> -                &mut read_bytes,
> -                &mut decoded_bytes,
> -                Arc::clone(&errors),
> -                &decoder_pool,
> -            )?;
> +            reader_pool.send(info)?;
>          }
>
> -        decoder_pool.complete()?;
> +        reader_pool.complete()?;
>
>          let elapsed = start_time.elapsed().as_secs_f64();
>
> +        let read_bytes = read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +
>          let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>          let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
>
> @@ -199,26 +216,31 @@ impl VerifyWorker {
>          Ok(())
>      }
>
> +    #[allow(clippy::too_many_arguments)]
>      fn verify_chunk_by_backend(
> -        &self,
> -        info: &ChunkReadInfo,
> -        read_bytes: &mut u64,
> -        decoded_bytes: &mut u64,
> +        backend: &DatastoreBackend,
> +        datastore: Arc<DataStore>,
> +        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +        read_bytes: Arc<AtomicU64>,
> +        decoded_bytes: Arc<AtomicU64>,
>          errors: Arc<AtomicUsize>,
> -        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
> +        decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
> +        info: &ChunkReadInfo,
>      ) -> Result<(), Error> {
> -        match &self.backend {
> -            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> -                Err(err) => self.add_corrupt_chunk(
> +        match backend {
> +            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +                Err(err) => Self::add_corrupt_chunk(
> +                    datastore,
> +                    corrupt_chunks,
>                      info.digest,
>                      errors,
>                      &format!("can't verify chunk, load failed - {err}"),
>                  ),
>                  Ok(chunk) => {
>                      let size = info.size();
> -                    *read_bytes += chunk.raw_size();
> +                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                      decoder_pool.send((chunk, info.digest, size))?;
> -                    *decoded_bytes += size;
> +                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                  }
>              },
>              DatastoreBackend::S3(s3_client) => {
> @@ -235,9 +257,9 @@ impl VerifyWorker {
>                  match chunk_result {
>                      Ok(chunk) => {
>                          let size = info.size();
> -                        *read_bytes += chunk.raw_size();
> +                        read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                          decoder_pool.send((chunk, info.digest, size))?;
> -                        *decoded_bytes += size;
> +                        decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                      }
>                      Err(err) => {
>                          errors.fetch_add(1, Ordering::SeqCst);
> @@ -245,7 +267,9 @@ impl VerifyWorker {
>                          }
>                      }
>                  }
> -            Ok(None) => self.add_corrupt_chunk(
> +            Ok(None) => Self::add_corrupt_chunk(
> +                datastore,
> +                corrupt_chunks,
>                  info.digest,
>                  errors,
>                  &format!(
> @@ -263,13 +287,19 @@ impl VerifyWorker {
>          Ok(())
>      }
>
> -    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> +    fn add_corrupt_chunk(
> +        datastore: Arc<DataStore>,
> +        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +        digest: [u8; 32],
> +        errors: Arc<AtomicUsize>,
> +        message: &str,
> +    ) {
>          // Panic on poisoned mutex
> -        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
>          corrupt_chunks.insert(digest);
>          error!(message);
>          errors.fetch_add(1, Ordering::SeqCst);
> -        match self.datastore.rename_corrupt_chunk(&digest) {
> +        match datastore.rename_corrupt_chunk(&digest) {
>              Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>              Err(err) => info!("{err}"),
>              _ => (),
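As an aside on the refactoring suggested above: the reason the member
methods need to become associated functions is that a closure handed to a
worker thread must generally be Send + 'static, so it cannot capture
&self; passing only the Arc'd state it actually needs avoids that. A
minimal sketch of the pattern (Worker, record() and run() are hypothetical
names, not PBS code):

use std::sync::{Arc, Mutex};
use std::thread;

struct Worker {
    state: Arc<Mutex<Vec<u32>>>,
}

impl Worker {
    // A &self method cannot be moved into a spawned thread without
    // moving or borrowing the whole Worker; an associated function
    // taking only the Arc'd state it needs can.
    fn record(state: Arc<Mutex<Vec<u32>>>, value: u32) {
        state.lock().unwrap().push(value);
    }

    fn run(&self) {
        let handle = thread::spawn({
            let state = Arc::clone(&self.state);
            move || Self::record(state, 42)
        });
        handle.join().unwrap();
    }
}

fn main() {
    let worker = Worker {
        state: Arc::new(Mutex::new(Vec::new())),
    };
    worker.run();
    assert_eq!(worker.state.lock().unwrap().len(), 1);
}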