public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Nicolas Frey <n.frey@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v4 3/9] verify: refactor into associated functions to use new state struct
Date: Thu, 13 Nov 2025 10:31:12 +0100	[thread overview]
Message-ID: <20251113093118.195229-7-n.frey@proxmox.com> (raw)
In-Reply-To: <20251113093118.195229-1-n.frey@proxmox.com>

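Refactor the chunk verification helpers into associated functions that
take the new IndexVerifyState struct instead of `&self`. The shared
state (datastore, corrupt/verified chunk sets, error and byte counters,
start time) bundles nicely into one struct to pass around, to prepare
for parallel chunk loading.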

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/backup/verify.rs | 114 +++++++++++++++++++++----------------------
 1 file changed, 57 insertions(+), 57 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index e8ea49b2..e1eda039 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
 
-use crate::tools::parallel_handler::ParallelHandler;
+use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
 
 use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
@@ -109,27 +109,20 @@ impl VerifyWorker {
         index: Box<dyn IndexFile + Send>,
         crypt_mode: CryptMode,
     ) -> Result<(), Error> {
-        let errors = Arc::new(AtomicUsize::new(0));
-
-        let start_time = Instant::now();
-
-        let mut read_bytes = 0;
-        let mut decoded_bytes = 0;
-
-        let datastore2 = Arc::clone(&self.datastore);
-        let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
-        let verified_chunks2 = Arc::clone(&self.verified_chunks);
-        let errors2 = Arc::clone(&errors);
-
-        let decoder_pool = ParallelHandler::new(
-            "verify chunk decoder",
-            4,
+        let verify_state = Arc::new(IndexVerifyState::new(
+            &self.datastore,
+            &self.corrupt_chunks,
+            &self.verified_chunks,
+        ));
+
+        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
+            let verify_state = Arc::clone(&verify_state);
             move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
                 let chunk_crypt_mode = match chunk.crypt_mode() {
                     Err(err) => {
-                        corrupt_chunks2.lock().unwrap().insert(digest);
+                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
                         info!("can't verify chunk, unknown CryptMode - {err}");
-                        errors2.fetch_add(1, Ordering::SeqCst);
+                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
                         return Ok(());
                     }
                     Ok(mode) => mode,
@@ -139,25 +132,25 @@ impl VerifyWorker {
                     info!(
                     "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
                 );
-                    errors2.fetch_add(1, Ordering::SeqCst);
+                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
                 }
 
                 if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
-                    corrupt_chunks2.lock().unwrap().insert(digest);
+                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
                     info!("{err}");
-                    errors2.fetch_add(1, Ordering::SeqCst);
-                    match datastore2.rename_corrupt_chunk(&digest) {
+                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
+                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
                         Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
                         Err(err) => info!("{err}"),
                         _ => (),
                     }
                 } else {
-                    verified_chunks2.lock().unwrap().insert(digest);
+                    verify_state.verified_chunks.lock().unwrap().insert(digest);
                 }
 
                 Ok(())
-            },
-        );
+            }
+        });
 
         let skip_chunk = |digest: &[u8; 32]| -> bool {
             if self.verified_chunks.lock().unwrap().contains(digest) {
@@ -165,7 +158,7 @@ impl VerifyWorker {
             } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
                 let digest_str = hex::encode(digest);
                 info!("chunk {digest_str} was marked as corrupt");
-                errors.fetch_add(1, Ordering::SeqCst);
+                verify_state.errors.fetch_add(1, Ordering::SeqCst);
                 true
             } else {
                 false
@@ -195,18 +188,20 @@ impl VerifyWorker {
                 continue; // already verified or marked corrupt
             }
 
-            self.verify_chunk_by_backend(
+            Self::verify_chunk_by_backend(
+                &self.backend,
+                Arc::clone(&verify_state),
+                &decoder_pool.channel(),
                 &info,
-                &mut read_bytes,
-                &mut decoded_bytes,
-                Arc::clone(&errors),
-                &decoder_pool,
             )?;
         }
 
         decoder_pool.complete()?;
 
-        let elapsed = start_time.elapsed().as_secs_f64();
+        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
+
+        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
+        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
 
         let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
         let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -214,13 +209,13 @@ impl VerifyWorker {
         let read_speed = read_bytes_mib / elapsed;
         let decode_speed = decoded_bytes_mib / elapsed;
 
-        let error_count = errors.load(Ordering::SeqCst);
+        let error_count = verify_state.errors.load(Ordering::SeqCst);
 
         info!(
             "  verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
         );
 
-        if errors.load(Ordering::SeqCst) > 0 {
+        if verify_state.errors.load(Ordering::SeqCst) > 0 {
             bail!("chunks could not be verified");
         }
 
@@ -228,25 +223,25 @@ impl VerifyWorker {
     }
 
     fn verify_chunk_by_backend(
-        &self,
+        backend: &DatastoreBackend,
+        verify_state: Arc<IndexVerifyState>,
+        decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
         info: &ChunkReadInfo,
-        read_bytes: &mut u64,
-        decoded_bytes: &mut u64,
-        errors: Arc<AtomicUsize>,
-        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
     ) -> Result<(), Error> {
-        match &self.backend {
-            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
-                Err(err) => self.add_corrupt_chunk(
+        match backend {
+            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
+                Err(err) => Self::add_corrupt_chunk(
+                    verify_state,
                     info.digest,
-                    errors,
                     &format!("can't verify chunk, load failed - {err}"),
                 ),
                 Ok(chunk) => {
                     let size = info.size();
-                    *read_bytes += chunk.raw_size();
+                    verify_state
+                        .read_bytes
+                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
                     decoder_pool.send((chunk, info.digest, size))?;
-                    *decoded_bytes += size;
+                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
                 }
             },
             DatastoreBackend::S3(s3_client) => {
@@ -258,13 +253,17 @@ impl VerifyWorker {
                                 match DataBlob::from_raw(raw_chunk.to_bytes().to_vec()) {
                                     Ok(chunk) => {
                                         let size = info.size();
-                                        *read_bytes += chunk.raw_size();
+                                        verify_state
+                                            .read_bytes
+                                            .fetch_add(chunk.raw_size(), Ordering::SeqCst);
                                         decoder_pool.send((chunk, info.digest, size))?;
-                                        *decoded_bytes += size;
+                                        verify_state
+                                            .decoded_bytes
+                                            .fetch_add(size, Ordering::SeqCst);
                                     }
-                                    Err(err) => self.add_corrupt_chunk(
+                                    Err(err) => Self::add_corrupt_chunk(
+                                        verify_state,
                                         info.digest,
-                                        errors,
                                         &format!(
                                             "can't verify chunk with digest {} - {err}",
                                             hex::encode(info.digest)
@@ -273,21 +272,21 @@ impl VerifyWorker {
                                 }
                             }
                             Err(err) => {
-                                errors.fetch_add(1, Ordering::SeqCst);
+                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
                                 error!("can't verify chunk, load failed - {err}");
                             }
                         }
                     }
-                    Ok(None) => self.add_corrupt_chunk(
+                    Ok(None) => Self::add_corrupt_chunk(
+                        verify_state,
                         info.digest,
-                        errors,
                         &format!(
                             "can't verify missing chunk with digest {}",
                             hex::encode(info.digest)
                         ),
                     ),
                     Err(err) => {
-                        errors.fetch_add(1, Ordering::SeqCst);
+                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
                         error!("can't verify chunk, load failed - {err}");
                     }
                 }
@@ -296,12 +295,13 @@ impl VerifyWorker {
         Ok(())
     }
 
-    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
         // Panic on poisoned mutex
-        self.corrupt_chunks.lock().unwrap().insert(digest);
+        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
+        corrupt_chunks.insert(digest);
         error!(message);
-        errors.fetch_add(1, Ordering::SeqCst);
-        match self.datastore.rename_corrupt_chunk(&digest) {
+        verify_state.errors.fetch_add(1, Ordering::SeqCst);
+        match verify_state.datastore.rename_corrupt_chunk(&digest) {
             Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
             Err(err) => info!("{err}"),
             _ => (),
-- 
2.47.3


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2025-11-13  9:31 UTC|newest]

Thread overview: 18+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-11-13  9:31 [pbs-devel] [PATCH proxmox{, -backup} v4 00/12] parallelize chunk reads in verification Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox v4 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox v4 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox v4 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 1/9] verify: correct typo in comment Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 2/9] verify: introduce new state struct Nicolas Frey
2025-11-13  9:31 ` Nicolas Frey [this message]
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 4/9] verify: move chunk loading into parallel handler Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 5/9] verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 6/9] verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 7/9] verify: add {read, verify}-threads to api schema in backup manager Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 8/9] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-13  9:31 ` [pbs-devel] [PATCH proxmox-backup v4 9/9] docs: verify: document {read, verify}-threads and update screenshot Nicolas Frey
2025-11-14 10:41   ` Christian Ebner
2025-11-13 12:53 ` [pbs-devel] [PATCH proxmox{, -backup} v4 00/12] parallelize chunk reads in verification Fabian Grünbichler
2025-11-14 10:46 ` Christian Ebner
2025-11-14 21:36 ` [pbs-devel] applied: " Thomas Lamprecht
2025-11-14 22:17 ` Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20251113093118.195229-7-n.frey@proxmox.com \
    --to=n.frey@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal