public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>, Nicolas Frey <n.frey@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler
Date: Tue, 11 Nov 2025 11:22:15 +0100	[thread overview]
Message-ID: <a1cac53e-3aaf-4870-b174-fd56bb7dd333@proxmox.com> (raw)
In-Reply-To: <20251110084417.173290-5-n.frey@proxmox.com>

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> This way, the chunks will be loaded in parallel in addition to being
> checked in parallel.
> 
> Depending on the underlying storage, this can speed up reading chunks
> from disk, especially when the underlying storage is IO depth
> dependent, and the CPU is faster than the storage.
> 
> Originally-by: Dominik Csapak <d.csapak@proxmox.com>
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---

I think this patch diff could be further reduced and split by:
- placing the follow-up patch that introduces the struct first
- then refactoring verify_chunk_by_backend() and add_corrupt_chunk() 
from member methods into associated functions
- and only then introducing the parallel chunk reading on top of that

Or was your intention to stay close to the original patch here?

One further comment inline.

>   src/backup/verify.rs | 120 +++++++++++++++++++++++++++----------------
>   1 file changed, 75 insertions(+), 45 deletions(-)
> 
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 31c03891..c0ff15d4 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -1,6 +1,6 @@
>   use pbs_config::BackupLockGuard;
>   use std::collections::HashSet;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>   use std::sync::{Arc, Mutex};
>   use std::time::Instant;
>   
> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
>   use pbs_datastore::manifest::{BackupManifest, FileInfo};
>   use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>   
> -use crate::tools::parallel_handler::ParallelHandler;
> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>   
>   use crate::backup::hierarchy::ListAccessibleBackupGroups;
>   
> @@ -85,23 +85,20 @@ impl VerifyWorker {
>   
>           let start_time = Instant::now();
>   
> -        let mut read_bytes = 0;
> -        let mut decoded_bytes = 0;
> +        let read_bytes = Arc::new(AtomicU64::new(0));
> +        let decoded_bytes = Arc::new(AtomicU64::new(0));
>   
> -        let datastore2 = Arc::clone(&self.datastore);
> -        let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
> -        let verified_chunks2 = Arc::clone(&self.verified_chunks);
> -        let errors2 = Arc::clone(&errors);
> -
> -        let decoder_pool = ParallelHandler::new(
> -            "verify chunk decoder",
> -            4,
> +        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> +            let datastore = Arc::clone(&self.datastore);
> +            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> +            let verified_chunks = Arc::clone(&self.verified_chunks);
> +            let errors = Arc::clone(&errors);
>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                   let chunk_crypt_mode = match chunk.crypt_mode() {
>                       Err(err) => {
> -                        corrupt_chunks2.lock().unwrap().insert(digest);
> +                        corrupt_chunks.lock().unwrap().insert(digest);
>                           info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors2.fetch_add(1, Ordering::SeqCst);
> +                        errors.fetch_add(1, Ordering::SeqCst);
>                           return Ok(());
>                       }
>                       Ok(mode) => mode,
> @@ -111,25 +108,25 @@ impl VerifyWorker {
>                       info!(
>                       "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                   );
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> +                    errors.fetch_add(1, Ordering::SeqCst);
>                   }
>   
>                   if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks2.lock().unwrap().insert(digest);
> +                    corrupt_chunks.lock().unwrap().insert(digest);
>                       info!("{err}");
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> -                    match datastore2.rename_corrupt_chunk(&digest) {
> +                    errors.fetch_add(1, Ordering::SeqCst);
> +                    match datastore.rename_corrupt_chunk(&digest) {
>                           Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                           Err(err) => info!("{err}"),
>                           _ => (),
>                       }
>                   } else {
> -                    verified_chunks2.lock().unwrap().insert(digest);
> +                    verified_chunks.lock().unwrap().insert(digest);
>                   }
>   
>                   Ok(())
> -            },
> -        );
> +            }
> +        });
>   
>           let skip_chunk = |digest: &[u8; 32]| -> bool {
>               if self.verified_chunks.lock().unwrap().contains(digest) {
> @@ -156,6 +153,29 @@ impl VerifyWorker {
>               .datastore
>               .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>   
> +        let reader_pool = ParallelHandler::new("read chunks", 4, {

nit: This sets the default for the reader threads to 4. The subsequent 
patches then change it to the default of 1 as defined in the api 
schemas, so while not an issue, IMO this should already be set to that 
default value here.

> +            let decoder_pool = decoder_pool.channel();
> +            let datastore = Arc::clone(&self.datastore);
> +            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> +            let read_bytes = Arc::clone(&read_bytes);
> +            let decoded_bytes = Arc::clone(&decoded_bytes);
> +            let errors = Arc::clone(&errors);
> +            let backend = self.backend.clone();
> +
> +            move |info: ChunkReadInfo| {
> +                Self::verify_chunk_by_backend(
> +                    &backend,
> +                    Arc::clone(&datastore),
> +                    Arc::clone(&corrupt_chunks),
> +                    Arc::clone(&read_bytes),
> +                    Arc::clone(&decoded_bytes),
> +                    Arc::clone(&errors),
> +                    &decoder_pool,
> +                    &info,
> +                )
> +            }
> +        });
> +
>           for (pos, _) in chunk_list {
>               self.worker.check_abort()?;
>               self.worker.fail_on_shutdown()?;
> @@ -167,19 +187,16 @@ impl VerifyWorker {
>                   continue; // already verified or marked corrupt
>               }
>   
> -            self.verify_chunk_by_backend(
> -                &info,
> -                &mut read_bytes,
> -                &mut decoded_bytes,
> -                Arc::clone(&errors),
> -                &decoder_pool,
> -            )?;
> +            reader_pool.send(info)?;
>           }
>   
> -        decoder_pool.complete()?;
> +        reader_pool.complete()?;
>   
>           let elapsed = start_time.elapsed().as_secs_f64();
>   
> +        let read_bytes = read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +
>           let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>           let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
>   
> @@ -199,26 +216,31 @@ impl VerifyWorker {
>           Ok(())
>       }
>   
> +    #[allow(clippy::too_many_arguments)]
>       fn verify_chunk_by_backend(
> -        &self,
> -        info: &ChunkReadInfo,
> -        read_bytes: &mut u64,
> -        decoded_bytes: &mut u64,
> +        backend: &DatastoreBackend,
> +        datastore: Arc<DataStore>,
> +        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +        read_bytes: Arc<AtomicU64>,
> +        decoded_bytes: Arc<AtomicU64>,
>           errors: Arc<AtomicUsize>,
> -        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
> +        decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
> +        info: &ChunkReadInfo,
>       ) -> Result<(), Error> {
> -        match &self.backend {
> -            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> -                Err(err) => self.add_corrupt_chunk(
> +        match backend {
> +            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +                Err(err) => Self::add_corrupt_chunk(
> +                    datastore,
> +                    corrupt_chunks,
>                       info.digest,
>                       errors,
>                       &format!("can't verify chunk, load failed - {err}"),
>                   ),
>                   Ok(chunk) => {
>                       let size = info.size();
> -                    *read_bytes += chunk.raw_size();
> +                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                       decoder_pool.send((chunk, info.digest, size))?;
> -                    *decoded_bytes += size;
> +                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                   }
>               },
>               DatastoreBackend::S3(s3_client) => {
> @@ -235,9 +257,9 @@ impl VerifyWorker {
>                           match chunk_result {
>                               Ok(chunk) => {
>                                   let size = info.size();
> -                                *read_bytes += chunk.raw_size();
> +                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                                   decoder_pool.send((chunk, info.digest, size))?;
> -                                *decoded_bytes += size;
> +                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                               }
>                               Err(err) => {
>                                   errors.fetch_add(1, Ordering::SeqCst);
> @@ -245,7 +267,9 @@ impl VerifyWorker {
>                               }
>                           }
>                       }
> -                    Ok(None) => self.add_corrupt_chunk(
> +                    Ok(None) => Self::add_corrupt_chunk(
> +                        datastore,
> +                        corrupt_chunks,
>                           info.digest,
>                           errors,
>                           &format!(
> @@ -263,13 +287,19 @@ impl VerifyWorker {
>           Ok(())
>       }
>   
> -    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> +    fn add_corrupt_chunk(
> +        datastore: Arc<DataStore>,
> +        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +        digest: [u8; 32],
> +        errors: Arc<AtomicUsize>,
> +        message: &str,
> +    ) {
>           // Panic on poisoned mutex
> -        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
>           corrupt_chunks.insert(digest);
>           error!(message);
>           errors.fetch_add(1, Ordering::SeqCst);
> -        match self.datastore.rename_corrupt_chunk(&digest) {
> +        match datastore.rename_corrupt_chunk(&digest) {
>               Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>               Err(err) => info!("{err}"),
>               _ => (),



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel



Thread overview: 22+ messages
2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-11 10:22   ` Christian Ebner [this message]
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
2025-11-11 10:35   ` Nicolas Frey
2025-11-13  9:33 ` [pbs-devel] superseded: " Nicolas Frey
