From: Nicolas Frey <n.frey@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct
Date: Mon, 10 Nov 2025 09:44:13 +0100
Message-ID: <20251110084417.173290-6-n.frey@proxmox.com>
In-Reply-To: <20251110084417.173290-1-n.frey@proxmox.com>

Introduce a new state struct `IndexVerifyState` so that only a single
`Arc` needs to be passed around and cloned.

Suggested-by: Christian Ebner <c.ebner@proxmox.com>
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/backup/verify.rs | 130 ++++++++++++++++++++++---------------------
 1 file changed, 67 insertions(+), 63 deletions(-)
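
Note (not part of the patch): below is a minimal, standalone sketch of the
pattern this change applies, i.e. bundling shared counters and handles into
one struct behind a single `Arc` that each worker closure clones, instead of
cloning every field separately. The scaffolding names (`SharedState`,
`worker`, `main`) are hypothetical and only illustrate the shape of the
change.

use std::sync::{
    atomic::{AtomicU64, AtomicUsize, Ordering},
    Arc,
};

struct SharedState {
    read_bytes: AtomicU64,
    errors: AtomicUsize,
}

fn main() {
    let state = Arc::new(SharedState {
        read_bytes: AtomicU64::new(0),
        errors: AtomicUsize::new(0),
    });

    let worker = {
        // one Arc clone per closure instead of one clone per shared field
        let state = Arc::clone(&state);
        move |bytes: u64, failed: bool| {
            state.read_bytes.fetch_add(bytes, Ordering::SeqCst);
            if failed {
                state.errors.fetch_add(1, Ordering::SeqCst);
            }
        }
    };

    worker(4096, false);
    worker(0, true);
    println!(
        "read {} bytes, {} errors",
        state.read_bytes.load(Ordering::SeqCst),
        state.errors.load(Ordering::SeqCst)
    );
}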

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index c0ff15d4..9a20c8e1 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -34,6 +34,34 @@ pub struct VerifyWorker {
     backend: DatastoreBackend,
 }
 
+struct IndexVerifyState {
+    read_bytes: AtomicU64,
+    decoded_bytes: AtomicU64,
+    errors: AtomicUsize,
+    datastore: Arc<DataStore>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    start_time: Instant,
+}
+
+impl IndexVerifyState {
+    fn new(
+        datastore: &Arc<DataStore>,
+        corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+        verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+    ) -> Self {
+        Self {
+            read_bytes: AtomicU64::new(0),
+            decoded_bytes: AtomicU64::new(0),
+            errors: AtomicUsize::new(0),
+            datastore: Arc::clone(datastore),
+            corrupt_chunks: Arc::clone(corrupt_chunks),
+            verified_chunks: Arc::clone(verified_chunks),
+            start_time: Instant::now(),
+        }
+    }
+}
+
 impl VerifyWorker {
     /// Creates a new VerifyWorker for a given task worker and datastore.
     pub fn new(
@@ -81,24 +109,20 @@ impl VerifyWorker {
         index: Box<dyn IndexFile + Send>,
         crypt_mode: CryptMode,
     ) -> Result<(), Error> {
-        let errors = Arc::new(AtomicUsize::new(0));
-
-        let start_time = Instant::now();
-
-        let read_bytes = Arc::new(AtomicU64::new(0));
-        let decoded_bytes = Arc::new(AtomicU64::new(0));
+        let verify_state = Arc::new(IndexVerifyState::new(
+            &self.datastore,
+            &self.corrupt_chunks,
+            &self.verified_chunks,
+        ));
 
         let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
-            let datastore = Arc::clone(&self.datastore);
-            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
-            let verified_chunks = Arc::clone(&self.verified_chunks);
-            let errors = Arc::clone(&errors);
+            let verify_state = Arc::clone(&verify_state);
             move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
                 let chunk_crypt_mode = match chunk.crypt_mode() {
                     Err(err) => {
-                        corrupt_chunks.lock().unwrap().insert(digest);
+                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
                         info!("can't verify chunk, unknown CryptMode - {err}");
-                        errors.fetch_add(1, Ordering::SeqCst);
+                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
                         return Ok(());
                     }
                     Ok(mode) => mode,
@@ -108,20 +132,20 @@ impl VerifyWorker {
                     info!(
                     "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
                 );
-                    errors.fetch_add(1, Ordering::SeqCst);
+                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
                 }
 
                 if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
-                    corrupt_chunks.lock().unwrap().insert(digest);
+                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
                     info!("{err}");
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    match datastore.rename_corrupt_chunk(&digest) {
+                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
+                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
                         Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
                         Err(err) => info!("{err}"),
                         _ => (),
                     }
                 } else {
-                    verified_chunks.lock().unwrap().insert(digest);
+                    verify_state.verified_chunks.lock().unwrap().insert(digest);
                 }
 
                 Ok(())
@@ -134,7 +158,7 @@ impl VerifyWorker {
             } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
                 let digest_str = hex::encode(digest);
                 info!("chunk {digest_str} was marked as corrupt");
-                errors.fetch_add(1, Ordering::SeqCst);
+                verify_state.errors.fetch_add(1, Ordering::SeqCst);
                 true
             } else {
                 false
@@ -155,27 +179,18 @@ impl VerifyWorker {
 
         let reader_pool = ParallelHandler::new("read chunks", 4, {
             let decoder_pool = decoder_pool.channel();
-            let datastore = Arc::clone(&self.datastore);
-            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
-            let read_bytes = Arc::clone(&read_bytes);
-            let decoded_bytes = Arc::clone(&decoded_bytes);
-            let errors = Arc::clone(&errors);
+            let verify_state = Arc::clone(&verify_state);
             let backend = self.backend.clone();
 
             move |info: ChunkReadInfo| {
                 Self::verify_chunk_by_backend(
                     &backend,
-                    Arc::clone(&datastore),
-                    Arc::clone(&corrupt_chunks),
-                    Arc::clone(&read_bytes),
-                    Arc::clone(&decoded_bytes),
-                    Arc::clone(&errors),
+                    Arc::clone(&verify_state),
                     &decoder_pool,
                     &info,
                 )
             }
         });
-
         for (pos, _) in chunk_list {
             self.worker.check_abort()?;
             self.worker.fail_on_shutdown()?;
@@ -192,10 +207,10 @@ impl VerifyWorker {
 
         reader_pool.complete()?;
 
-        let elapsed = start_time.elapsed().as_secs_f64();
+        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
 
-        let read_bytes = read_bytes.load(Ordering::SeqCst);
-        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
+        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
+        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
 
         let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
         let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -203,44 +218,39 @@ impl VerifyWorker {
         let read_speed = read_bytes_mib / elapsed;
         let decode_speed = decoded_bytes_mib / elapsed;
 
-        let error_count = errors.load(Ordering::SeqCst);
+        let error_count = verify_state.errors.load(Ordering::SeqCst);
 
         info!(
             "  verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
         );
 
-        if errors.load(Ordering::SeqCst) > 0 {
+        if verify_state.errors.load(Ordering::SeqCst) > 0 {
             bail!("chunks could not be verified");
         }
 
         Ok(())
     }
 
-    #[allow(clippy::too_many_arguments)]
     fn verify_chunk_by_backend(
         backend: &DatastoreBackend,
-        datastore: Arc<DataStore>,
-        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-        read_bytes: Arc<AtomicU64>,
-        decoded_bytes: Arc<AtomicU64>,
-        errors: Arc<AtomicUsize>,
+        verify_state: Arc<IndexVerifyState>,
         decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
         info: &ChunkReadInfo,
     ) -> Result<(), Error> {
         match backend {
-            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
+            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
                 Err(err) => Self::add_corrupt_chunk(
-                    datastore,
-                    corrupt_chunks,
+                    verify_state,
                     info.digest,
-                    errors,
                     &format!("can't verify chunk, load failed - {err}"),
                 ),
                 Ok(chunk) => {
                     let size = info.size();
-                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                    verify_state
+                        .read_bytes
+                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
                     decoder_pool.send((chunk, info.digest, size))?;
-                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
+                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
                 }
             },
             DatastoreBackend::S3(s3_client) => {
@@ -257,28 +267,28 @@ impl VerifyWorker {
                         match chunk_result {
                             Ok(chunk) => {
                                 let size = info.size();
-                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                                verify_state
+                                    .read_bytes
+                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
                                 decoder_pool.send((chunk, info.digest, size))?;
-                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
+                                verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
                             }
                             Err(err) => {
-                                errors.fetch_add(1, Ordering::SeqCst);
+                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
                                 error!("can't verify chunk, load failed - {err}");
                             }
                         }
                     }
                     Ok(None) => Self::add_corrupt_chunk(
-                        datastore,
-                        corrupt_chunks,
+                        verify_state,
                         info.digest,
-                        errors,
                         &format!(
                             "can't verify missing chunk with digest {}",
                             hex::encode(info.digest)
                         ),
                     ),
                     Err(err) => {
-                        errors.fetch_add(1, Ordering::SeqCst);
+                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
                         error!("can't verify chunk, load failed - {err}");
                     }
                 }
@@ -287,19 +297,13 @@ impl VerifyWorker {
         Ok(())
     }
 
-    fn add_corrupt_chunk(
-        datastore: Arc<DataStore>,
-        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-        digest: [u8; 32],
-        errors: Arc<AtomicUsize>,
-        message: &str,
-    ) {
+    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
         // Panic on poisoned mutex
-        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
+        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
         corrupt_chunks.insert(digest);
         error!(message);
-        errors.fetch_add(1, Ordering::SeqCst);
-        match datastore.rename_corrupt_chunk(&digest) {
+        verify_state.errors.fetch_add(1, Ordering::SeqCst);
+        match verify_state.datastore.rename_corrupt_chunk(&digest) {
             Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
             Err(err) => info!("{err}"),
             _ => (),
-- 
2.47.3




Thread overview: 22+ messages
2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` Nicolas Frey [this message]
2025-11-11 10:22   ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
2025-11-11 10:35   ` Nicolas Frey
2025-11-13  9:33 ` [pbs-devel] superseded: " Nicolas Frey
