* [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification
@ 2025-11-10  8:44 Nicolas Frey
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
                   ` (10 more replies)
  0 siblings, 11 replies; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

This patch series aims to expand on Dominik's series [0] written for
PBS 3, parallelizing chunk reads in `VerifyWorker` using a separate
thread pool from the verification.
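
Conceptually, a reader pool feeds the existing decoder (verify) pool.
A minimal sketch of the pattern, using the `ParallelHandler` API as it
appears in the patches below (`read_threads`, `verify_threads`,
`datastore` and `chunk_list` stand in for the values wired up there;
error handling and stats elided):

    let decoder_pool = ParallelHandler::new("verify chunk decoder", verify_threads, {
        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
            // CPU-bound part: decode the blob and verify its digest
            chunk.verify_unencrypted(size as usize, &digest)
        }
    });

    let reader_pool = ParallelHandler::new("read chunks", read_threads, {
        let decoder = decoder_pool.channel(); // SendHandle into the decoder pool
        let datastore = Arc::clone(&datastore);
        move |info: ChunkReadInfo| {
            // IO-bound part: load the chunk, then hand it off for verification
            let chunk = datastore.load_chunk(&info.digest)?;
            decoder.send((chunk, info.digest, info.size()))
        }
    });

    for info in chunk_list {
        reader_pool.send(info)?; // blocks while all reader threads are busy
    }
    reader_pool.complete()?; // wait for readers; the decoder pool drains afterwards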

The number of threads was previously hard-coded, but is now
configurable via the API and GUI with new properties called
`{read,verify}-threads`, similar to tape backups.
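
With the series applied, the properties should then also be settable
through the existing verify-job CLI, e.g. (hypothetical invocation,
assuming the generated CLI picks up the new config properties):

    proxmox-backup-manager verify-job update <id> --read-threads 2 --verify-threads 4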

The number of threads should also be configurable through tuning
options or the datastore config (as discussed by Chris & Thomas on
list), which can be added in a follow-up patch series.

In my local tests I measured the following speed difference when
verifying a single ~32 GiB snapshot (4x the RAM size) on 4 cores,
changing only the number of read threads:

1 thread:    ~440MiB/s
2 threads:   ~780MiB/s
4 threads:   ~1140MiB/s

[0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t

Changes since v2:
* split move to parallel handler into 2 separate commits
* add constructor to `IndexVerifyState` and clean up nits

Changes since v1, thanks to Chris:
* define dedicated schema for {worker,read,verify}-threads
* rebase proxmox-backup
* introduce new state struct `IndexVerifyState` to reduce the number
  of Arc clones and improve the overall bundling
* adjust update endpoint and UI to the new properties

proxmox:

Nicolas Frey (3):
  pbs-api-types: add schema for {worker,read,verify}-threads
  pbs-api-types: jobs: add {read,verify}-threads to
    VerificationJobConfig
  pbs-api-types: use worker-threads schema for TapeBackupJobSetup

 pbs-api-types/src/datastore.rs | 14 ++++++++++++++
 pbs-api-types/src/jobs.rs      | 20 +++++++++++++++-----
 2 files changed, 29 insertions(+), 5 deletions(-)


proxmox-backup:

Nicolas Frey (6):
  api: verify: move chunk loading into parallel handler
  api: verify: bundle parameters into new struct
  api: verify: determine the number of threads to use with
    {read,verify}-threads
  api: verify: correct typo in comment
  api: verify: add {read,verify}-threads to update endpoint
  ui: verify: add option to set number of threads for job

 src/api2/admin/datastore.rs    |  18 +++-
 src/api2/backup/environment.rs |   2 +-
 src/api2/config/verify.rs      |  16 ++++
 src/backup/verify.rs           | 164 +++++++++++++++++++++------------
 src/server/verify_job.rs       |   7 +-
 www/window/VerifyAll.js        |  18 ++++
 www/window/VerifyJobEdit.js    |  24 +++++
 7 files changed, 185 insertions(+), 64 deletions(-)


Summary over all repositories:
  9 files changed, 214 insertions(+), 69 deletions(-)

-- 
Generated by git-murpp 0.8.1


* [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
                   ` (9 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Add separate schemas for the read and verify thread counts used in
chunk verification. Also add a worker-threads schema for use in
`TapeBackupJobSetup`.

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 pbs-api-types/src/datastore.rs | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
index a214ac25..3dd9ce75 100644
--- a/pbs-api-types/src/datastore.rs
+++ b/pbs-api-types/src/datastore.rs
@@ -164,6 +164,20 @@ pub const PRUNE_SCHEMA_KEEP_YEARLY: Schema =
         .minimum(1)
         .schema();
 
+const fn threads_schema(default: i64) -> Schema {
+    IntegerSchema::new("The number of threads to use for the job.")
+        .minimum(1)
+        .maximum(32)
+        .default(default)
+        .schema()
+}
+
+pub const WORKER_THREADS_SCHEMA: Schema = threads_schema(1);
+
+pub const READ_THREADS_SCHEMA: Schema = threads_schema(1);
+
+pub const VERIFY_THREADS_SCHEMA: Schema = threads_schema(4);
+
 /// Base directory where datastores are mounted
 pub const DATASTORE_MOUNT_DIR: &str = "/mnt/datastore";
 
-- 
2.47.3



* [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
                   ` (8 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Control the number of parallel threads used to read/verify chunks,
similar to the tape backup job's `worker-threads`.

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 pbs-api-types/src/jobs.rs | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 4dbbef2b..2b9cef38 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -203,6 +203,14 @@ pub const VERIFICATION_OUTDATED_AFTER_SCHEMA: Schema =
             optional: true,
             schema: crate::NS_MAX_DEPTH_SCHEMA,
         },
+        "read-threads": {
+            schema: crate::READ_THREADS_SCHEMA,
+            optional: true,
+        },
+        "verify-threads": {
+            schema: crate::VERIFY_THREADS_SCHEMA,
+            optional: true,
+        },
     }
 )]
 #[derive(Serialize, Deserialize, Updater, Clone, PartialEq)]
@@ -233,6 +241,12 @@ pub struct VerificationJobConfig {
     /// how deep the verify should go from the `ns` level downwards. Passing 0 verifies only the
     /// snapshots on the same level as the passed `ns`, or the datastore root if none.
     pub max_depth: Option<usize>,
+    /// The number of read threads to use for the job.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub read_threads: Option<usize>,
+    /// The number of verification threads to use for the job.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub verify_threads: Option<usize>,
 }
 
 impl VerificationJobConfig {
-- 
2.47.3



* [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
                   ` (7 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Use the new worker-threads schema instead of the inline integer
schema definition.

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 pbs-api-types/src/jobs.rs | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 2b9cef38..ba2af6d7 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -321,11 +321,8 @@ pub struct VerificationJobStatus {
             optional: true,
         },
         "worker-threads": {
-            type: Integer,
+            schema: crate::WORKER_THREADS_SCHEMA,
             optional: true,
-            minimum: 1,
-            maximum: 32,
-            default: 1,
         },
     }
 )]
@@ -353,7 +350,6 @@ pub struct TapeBackupJobSetup {
     pub ns: Option<BackupNamespace>,
     #[serde(skip_serializing_if = "Option::is_none", default)]
     pub max_depth: Option<usize>,
-    /// Set the number of worker threads to use for the job
     #[serde(skip_serializing_if = "Option::is_none")]
     pub worker_threads: Option<u64>,
 }
-- 
2.47.3



* [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (2 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Nicolas Frey
                   ` (6 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

This way, the chunks will be loaded in parallel in addition to being
checked in parallel.

Depending on the underlying storage, this can speed up reading chunks
from disk, especially when the storage benefits from a higher IO
depth and the CPU is faster than the storage.

Originally-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/backup/verify.rs | 120 +++++++++++++++++++++++++++----------------
 1 file changed, 75 insertions(+), 45 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 31c03891..c0ff15d4 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -1,6 +1,6 @@
 use pbs_config::BackupLockGuard;
 use std::collections::HashSet;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Instant;
 
@@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
 
-use crate::tools::parallel_handler::ParallelHandler;
+use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
 
 use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
@@ -85,23 +85,20 @@ impl VerifyWorker {
 
         let start_time = Instant::now();
 
-        let mut read_bytes = 0;
-        let mut decoded_bytes = 0;
+        let read_bytes = Arc::new(AtomicU64::new(0));
+        let decoded_bytes = Arc::new(AtomicU64::new(0));
 
-        let datastore2 = Arc::clone(&self.datastore);
-        let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
-        let verified_chunks2 = Arc::clone(&self.verified_chunks);
-        let errors2 = Arc::clone(&errors);
-
-        let decoder_pool = ParallelHandler::new(
-            "verify chunk decoder",
-            4,
+        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
+            let datastore = Arc::clone(&self.datastore);
+            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
+            let verified_chunks = Arc::clone(&self.verified_chunks);
+            let errors = Arc::clone(&errors);
             move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
                 let chunk_crypt_mode = match chunk.crypt_mode() {
                     Err(err) => {
-                        corrupt_chunks2.lock().unwrap().insert(digest);
+                        corrupt_chunks.lock().unwrap().insert(digest);
                         info!("can't verify chunk, unknown CryptMode - {err}");
-                        errors2.fetch_add(1, Ordering::SeqCst);
+                        errors.fetch_add(1, Ordering::SeqCst);
                         return Ok(());
                     }
                     Ok(mode) => mode,
@@ -111,25 +108,25 @@ impl VerifyWorker {
                     info!(
                     "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
                 );
-                    errors2.fetch_add(1, Ordering::SeqCst);
+                    errors.fetch_add(1, Ordering::SeqCst);
                 }
 
                 if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
-                    corrupt_chunks2.lock().unwrap().insert(digest);
+                    corrupt_chunks.lock().unwrap().insert(digest);
                     info!("{err}");
-                    errors2.fetch_add(1, Ordering::SeqCst);
-                    match datastore2.rename_corrupt_chunk(&digest) {
+                    errors.fetch_add(1, Ordering::SeqCst);
+                    match datastore.rename_corrupt_chunk(&digest) {
                         Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
                         Err(err) => info!("{err}"),
                         _ => (),
                     }
                 } else {
-                    verified_chunks2.lock().unwrap().insert(digest);
+                    verified_chunks.lock().unwrap().insert(digest);
                 }
 
                 Ok(())
-            },
-        );
+            }
+        });
 
         let skip_chunk = |digest: &[u8; 32]| -> bool {
             if self.verified_chunks.lock().unwrap().contains(digest) {
@@ -156,6 +153,29 @@ impl VerifyWorker {
             .datastore
             .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
 
+        let reader_pool = ParallelHandler::new("read chunks", 4, {
+            let decoder_pool = decoder_pool.channel();
+            let datastore = Arc::clone(&self.datastore);
+            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
+            let read_bytes = Arc::clone(&read_bytes);
+            let decoded_bytes = Arc::clone(&decoded_bytes);
+            let errors = Arc::clone(&errors);
+            let backend = self.backend.clone();
+
+            move |info: ChunkReadInfo| {
+                Self::verify_chunk_by_backend(
+                    &backend,
+                    Arc::clone(&datastore),
+                    Arc::clone(&corrupt_chunks),
+                    Arc::clone(&read_bytes),
+                    Arc::clone(&decoded_bytes),
+                    Arc::clone(&errors),
+                    &decoder_pool,
+                    &info,
+                )
+            }
+        });
+
         for (pos, _) in chunk_list {
             self.worker.check_abort()?;
             self.worker.fail_on_shutdown()?;
@@ -167,19 +187,16 @@ impl VerifyWorker {
                 continue; // already verified or marked corrupt
             }
 
-            self.verify_chunk_by_backend(
-                &info,
-                &mut read_bytes,
-                &mut decoded_bytes,
-                Arc::clone(&errors),
-                &decoder_pool,
-            )?;
+            reader_pool.send(info)?;
         }
 
-        decoder_pool.complete()?;
+        reader_pool.complete()?;
 
         let elapsed = start_time.elapsed().as_secs_f64();
 
+        let read_bytes = read_bytes.load(Ordering::SeqCst);
+        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
+
         let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
         let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
 
@@ -199,26 +216,31 @@ impl VerifyWorker {
         Ok(())
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn verify_chunk_by_backend(
-        &self,
-        info: &ChunkReadInfo,
-        read_bytes: &mut u64,
-        decoded_bytes: &mut u64,
+        backend: &DatastoreBackend,
+        datastore: Arc<DataStore>,
+        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+        read_bytes: Arc<AtomicU64>,
+        decoded_bytes: Arc<AtomicU64>,
         errors: Arc<AtomicUsize>,
-        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
+        decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
+        info: &ChunkReadInfo,
     ) -> Result<(), Error> {
-        match &self.backend {
-            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
-                Err(err) => self.add_corrupt_chunk(
+        match backend {
+            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
+                Err(err) => Self::add_corrupt_chunk(
+                    datastore,
+                    corrupt_chunks,
                     info.digest,
                     errors,
                     &format!("can't verify chunk, load failed - {err}"),
                 ),
                 Ok(chunk) => {
                     let size = info.size();
-                    *read_bytes += chunk.raw_size();
+                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                     decoder_pool.send((chunk, info.digest, size))?;
-                    *decoded_bytes += size;
+                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
                 }
             },
             DatastoreBackend::S3(s3_client) => {
@@ -235,9 +257,9 @@ impl VerifyWorker {
                         match chunk_result {
                             Ok(chunk) => {
                                 let size = info.size();
-                                *read_bytes += chunk.raw_size();
+                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
                                 decoder_pool.send((chunk, info.digest, size))?;
-                                *decoded_bytes += size;
+                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
                             }
                             Err(err) => {
                                 errors.fetch_add(1, Ordering::SeqCst);
@@ -245,7 +267,9 @@ impl VerifyWorker {
                             }
                         }
                     }
-                    Ok(None) => self.add_corrupt_chunk(
+                    Ok(None) => Self::add_corrupt_chunk(
+                        datastore,
+                        corrupt_chunks,
                         info.digest,
                         errors,
                         &format!(
@@ -263,13 +287,19 @@ impl VerifyWorker {
         Ok(())
     }
 
-    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+    fn add_corrupt_chunk(
+        datastore: Arc<DataStore>,
+        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+        digest: [u8; 32],
+        errors: Arc<AtomicUsize>,
+        message: &str,
+    ) {
         // Panic on poisoned mutex
-        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
+        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
         corrupt_chunks.insert(digest);
         error!(message);
         errors.fetch_add(1, Ordering::SeqCst);
-        match self.datastore.rename_corrupt_chunk(&digest) {
+        match datastore.rename_corrupt_chunk(&digest) {
             Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
             Err(err) => info!("{err}"),
             _ => (),
-- 
2.47.3



* [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (3 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
                   ` (5 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Introduce a new state struct `IndexVerifyState` so that only one
`Arc` needs to be passed around and cloned.

Suggested-by: Christian Ebner <c.ebner@proxmox.com>
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/backup/verify.rs | 130 ++++++++++++++++++++++---------------------
 1 file changed, 67 insertions(+), 63 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index c0ff15d4..9a20c8e1 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -34,6 +34,34 @@ pub struct VerifyWorker {
     backend: DatastoreBackend,
 }
 
+struct IndexVerifyState {
+    read_bytes: AtomicU64,
+    decoded_bytes: AtomicU64,
+    errors: AtomicUsize,
+    datastore: Arc<DataStore>,
+    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    start_time: Instant,
+}
+
+impl IndexVerifyState {
+    fn new(
+        datastore: &Arc<DataStore>,
+        corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+        verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+    ) -> Self {
+        Self {
+            read_bytes: AtomicU64::new(0),
+            decoded_bytes: AtomicU64::new(0),
+            errors: AtomicUsize::new(0),
+            datastore: Arc::clone(datastore),
+            corrupt_chunks: Arc::clone(corrupt_chunks),
+            verified_chunks: Arc::clone(verified_chunks),
+            start_time: Instant::now(),
+        }
+    }
+}
+
 impl VerifyWorker {
     /// Creates a new VerifyWorker for a given task worker and datastore.
     pub fn new(
@@ -81,24 +109,20 @@ impl VerifyWorker {
         index: Box<dyn IndexFile + Send>,
         crypt_mode: CryptMode,
     ) -> Result<(), Error> {
-        let errors = Arc::new(AtomicUsize::new(0));
-
-        let start_time = Instant::now();
-
-        let read_bytes = Arc::new(AtomicU64::new(0));
-        let decoded_bytes = Arc::new(AtomicU64::new(0));
+        let verify_state = Arc::new(IndexVerifyState::new(
+            &self.datastore,
+            &self.corrupt_chunks,
+            &self.verified_chunks,
+        ));
 
         let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
-            let datastore = Arc::clone(&self.datastore);
-            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
-            let verified_chunks = Arc::clone(&self.verified_chunks);
-            let errors = Arc::clone(&errors);
+            let verify_state = Arc::clone(&verify_state);
             move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
                 let chunk_crypt_mode = match chunk.crypt_mode() {
                     Err(err) => {
-                        corrupt_chunks.lock().unwrap().insert(digest);
+                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
                         info!("can't verify chunk, unknown CryptMode - {err}");
-                        errors.fetch_add(1, Ordering::SeqCst);
+                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
                         return Ok(());
                     }
                     Ok(mode) => mode,
@@ -108,20 +132,20 @@ impl VerifyWorker {
                     info!(
                     "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
                 );
-                    errors.fetch_add(1, Ordering::SeqCst);
+                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
                 }
 
                 if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
-                    corrupt_chunks.lock().unwrap().insert(digest);
+                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
                     info!("{err}");
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    match datastore.rename_corrupt_chunk(&digest) {
+                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
+                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
                         Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
                         Err(err) => info!("{err}"),
                         _ => (),
                     }
                 } else {
-                    verified_chunks.lock().unwrap().insert(digest);
+                    verify_state.verified_chunks.lock().unwrap().insert(digest);
                 }
 
                 Ok(())
@@ -134,7 +158,7 @@ impl VerifyWorker {
             } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
                 let digest_str = hex::encode(digest);
                 info!("chunk {digest_str} was marked as corrupt");
-                errors.fetch_add(1, Ordering::SeqCst);
+                verify_state.errors.fetch_add(1, Ordering::SeqCst);
                 true
             } else {
                 false
@@ -155,27 +179,18 @@ impl VerifyWorker {
 
         let reader_pool = ParallelHandler::new("read chunks", 4, {
             let decoder_pool = decoder_pool.channel();
-            let datastore = Arc::clone(&self.datastore);
-            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
-            let read_bytes = Arc::clone(&read_bytes);
-            let decoded_bytes = Arc::clone(&decoded_bytes);
-            let errors = Arc::clone(&errors);
+            let verify_state = Arc::clone(&verify_state);
             let backend = self.backend.clone();
 
             move |info: ChunkReadInfo| {
                 Self::verify_chunk_by_backend(
                     &backend,
-                    Arc::clone(&datastore),
-                    Arc::clone(&corrupt_chunks),
-                    Arc::clone(&read_bytes),
-                    Arc::clone(&decoded_bytes),
-                    Arc::clone(&errors),
+                    Arc::clone(&verify_state),
                     &decoder_pool,
                     &info,
                 )
             }
         });
-
         for (pos, _) in chunk_list {
             self.worker.check_abort()?;
             self.worker.fail_on_shutdown()?;
@@ -192,10 +207,10 @@ impl VerifyWorker {
 
         reader_pool.complete()?;
 
-        let elapsed = start_time.elapsed().as_secs_f64();
+        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
 
-        let read_bytes = read_bytes.load(Ordering::SeqCst);
-        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
+        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
+        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
 
         let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
         let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -203,44 +218,39 @@ impl VerifyWorker {
         let read_speed = read_bytes_mib / elapsed;
         let decode_speed = decoded_bytes_mib / elapsed;
 
-        let error_count = errors.load(Ordering::SeqCst);
+        let error_count = verify_state.errors.load(Ordering::SeqCst);
 
         info!(
             "  verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
         );
 
-        if errors.load(Ordering::SeqCst) > 0 {
+        if verify_state.errors.load(Ordering::SeqCst) > 0 {
             bail!("chunks could not be verified");
         }
 
         Ok(())
     }
 
-    #[allow(clippy::too_many_arguments)]
     fn verify_chunk_by_backend(
         backend: &DatastoreBackend,
-        datastore: Arc<DataStore>,
-        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-        read_bytes: Arc<AtomicU64>,
-        decoded_bytes: Arc<AtomicU64>,
-        errors: Arc<AtomicUsize>,
+        verify_state: Arc<IndexVerifyState>,
         decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
         info: &ChunkReadInfo,
     ) -> Result<(), Error> {
         match backend {
-            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
+            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
                 Err(err) => Self::add_corrupt_chunk(
-                    datastore,
-                    corrupt_chunks,
+                    verify_state,
                     info.digest,
-                    errors,
                     &format!("can't verify chunk, load failed - {err}"),
                 ),
                 Ok(chunk) => {
                     let size = info.size();
-                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                    verify_state
+                        .read_bytes
+                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
                     decoder_pool.send((chunk, info.digest, size))?;
-                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
+                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
                 }
             },
             DatastoreBackend::S3(s3_client) => {
@@ -257,28 +267,28 @@ impl VerifyWorker {
                         match chunk_result {
                             Ok(chunk) => {
                                 let size = info.size();
-                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                                verify_state
+                                    .read_bytes
+                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
                                 decoder_pool.send((chunk, info.digest, size))?;
-                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
+                                verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
                             }
                             Err(err) => {
-                                errors.fetch_add(1, Ordering::SeqCst);
+                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
                                 error!("can't verify chunk, load failed - {err}");
                             }
                         }
                     }
                     Ok(None) => Self::add_corrupt_chunk(
-                        datastore,
-                        corrupt_chunks,
+                        verify_state,
                         info.digest,
-                        errors,
                         &format!(
                             "can't verify missing chunk with digest {}",
                             hex::encode(info.digest)
                         ),
                     ),
                     Err(err) => {
-                        errors.fetch_add(1, Ordering::SeqCst);
+                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
                         error!("can't verify chunk, load failed - {err}");
                     }
                 }
@@ -287,19 +297,13 @@ impl VerifyWorker {
         Ok(())
     }
 
-    fn add_corrupt_chunk(
-        datastore: Arc<DataStore>,
-        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-        digest: [u8; 32],
-        errors: Arc<AtomicUsize>,
-        message: &str,
-    ) {
+    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
         // Panic on poisoned mutex
-        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
+        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
         corrupt_chunks.insert(digest);
         error!(message);
-        errors.fetch_add(1, Ordering::SeqCst);
-        match datastore.rename_corrupt_chunk(&digest) {
+        verify_state.errors.fetch_add(1, Ordering::SeqCst);
+        match verify_state.datastore.rename_corrupt_chunk(&digest) {
             Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
             Err(err) => info!("{err}"),
             _ => (),
-- 
2.47.3



* [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (4 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
                   ` (4 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Use the previously introduced {read,verify}-threads in the API; the
default values match those of the schema definitions.

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/api2/admin/datastore.rs    | 16 ++++++++++++++--
 src/api2/backup/environment.rs |  2 +-
 src/backup/verify.rs           | 16 ++++++++++++++--
 src/server/verify_job.rs       |  7 ++++++-
 4 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 6e269ef9..5e4dd3fc 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -45,7 +45,8 @@ use pbs_api_types::{
     BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
     IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
-    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
+    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, READ_THREADS_SCHEMA, UPID, UPID_SCHEMA,
+    VERIFICATION_OUTDATED_AFTER_SCHEMA, VERIFY_THREADS_SCHEMA,
 };
 use pbs_client::pxar::{create_tar, create_zip};
 use pbs_config::CachedUserInfo;
@@ -675,6 +676,14 @@ pub async fn status(
                 schema: NS_MAX_DEPTH_SCHEMA,
                 optional: true,
             },
+            "read-threads": {
+                schema: READ_THREADS_SCHEMA,
+                optional: true,
+            },
+            "verify-threads": {
+                schema: VERIFY_THREADS_SCHEMA,
+                optional: true,
+            },
         },
     },
     returns: {
@@ -700,6 +709,8 @@ pub fn verify(
     ignore_verified: Option<bool>,
     outdated_after: Option<i64>,
     max_depth: Option<usize>,
+    read_threads: Option<usize>,
+    verify_threads: Option<usize>,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -779,7 +790,8 @@ pub fn verify(
         auth_id.to_string(),
         to_stdout,
         move |worker| {
-            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+            let verify_worker =
+                VerifyWorker::new(worker.clone(), datastore, read_threads, verify_threads)?;
             let failed_dirs = if let Some(backup_dir) = backup_dir {
                 let mut res = Vec::new();
                 if !verify_worker.verify_backup_dir(
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 0faf6c8e..06696c78 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -795,7 +795,7 @@ impl BackupEnvironment {
             move |worker| {
                 worker.log_message("Automatically verifying newly added snapshot");
 
-                let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+                let verify_worker = VerifyWorker::new(worker.clone(), datastore, None, None)?;
                 if !verify_worker.verify_backup_dir_with_lock(
                     &backup_dir,
                     worker.upid().clone(),
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 9a20c8e1..8a530159 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -32,6 +32,8 @@ pub struct VerifyWorker {
     verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     backend: DatastoreBackend,
+    read_threads: usize,
+    verify_threads: usize,
 }
 
 struct IndexVerifyState {
@@ -67,6 +69,8 @@ impl VerifyWorker {
     pub fn new(
         worker: Arc<dyn WorkerTaskContext>,
         datastore: Arc<DataStore>,
+        read_threads: Option<usize>,
+        verify_threads: Option<usize>,
     ) -> Result<Self, Error> {
         let backend = datastore.backend()?;
         Ok(Self {
@@ -77,6 +81,8 @@ impl VerifyWorker {
             // start with 64 chunks since we assume there are few corrupt ones
             corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
             backend,
+            read_threads: read_threads.unwrap_or(1),
+            verify_threads: verify_threads.unwrap_or(4),
         })
     }
 
@@ -115,7 +121,7 @@ impl VerifyWorker {
             &self.verified_chunks,
         ));
 
-        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
+        let decoder_pool = ParallelHandler::new("verify chunk decoder", self.verify_threads, {
             let verify_state = Arc::clone(&verify_state);
             move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
                 let chunk_crypt_mode = match chunk.crypt_mode() {
@@ -177,7 +183,7 @@ impl VerifyWorker {
             .datastore
             .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
 
-        let reader_pool = ParallelHandler::new("read chunks", 4, {
+        let reader_pool = ParallelHandler::new("read chunks", self.read_threads, {
             let decoder_pool = decoder_pool.channel();
             let verify_state = Arc::clone(&verify_state);
             let backend = self.backend.clone();
@@ -578,6 +584,12 @@ impl VerifyWorker {
         let group_count = list.len();
         info!("found {group_count} groups");
 
+        log::info!(
+            "using {} read and {} verify thread(s)",
+            self.read_threads,
+            self.verify_threads,
+        );
+
         let mut progress = StoreProgress::new(group_count as u64);
 
         for (pos, group) in list.into_iter().enumerate() {
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index c8792174..e0b03155 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -41,7 +41,12 @@ pub fn do_verification_job(
                 None => Default::default(),
             };
 
-            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+            let verify_worker = VerifyWorker::new(
+                worker.clone(),
+                datastore,
+                verification_job.read_threads,
+                verification_job.verify_threads,
+            )?;
             let result = verify_worker.verify_all_backups(
                 worker.upid(),
                 ns,
-- 
2.47.3



* [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (5 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
                   ` (3 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/api2/admin/datastore.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 5e4dd3fc..fde4c247 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -697,7 +697,7 @@ pub async fn status(
 )]
 /// Verify backups.
 ///
-/// This function can verify a single backup snapshot, all backup from a backup group,
+/// This function can verify a single backup snapshot, all backups from a backup group,
 /// or all backups in the datastore.
 #[allow(clippy::too_many_arguments)]
 pub fn verify(
-- 
2.47.3



* [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (6 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
                   ` (2 subsequent siblings)
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Allow adding, updating or deleting the {read,verify}-threads
properties in the verify job config via the API.

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 src/api2/config/verify.rs | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs
index e71e0c2e..a88a3c32 100644
--- a/src/api2/config/verify.rs
+++ b/src/api2/config/verify.rs
@@ -149,6 +149,10 @@ pub enum DeletableProperty {
     Ns,
     /// Delete max-depth property, defaulting to full recursion again
     MaxDepth,
+    /// Delete read-threads property
+    ReadThreads,
+    /// Delete verify-threads property
+    VerifyThreads,
 }
 
 #[api(
@@ -229,6 +233,12 @@ pub fn update_verification_job(
                 DeletableProperty::MaxDepth => {
                     data.max_depth = None;
                 }
+                DeletableProperty::ReadThreads => {
+                    data.read_threads = None;
+                }
+                DeletableProperty::VerifyThreads => {
+                    data.verify_threads = None;
+                }
             }
         }
     }
@@ -266,6 +276,12 @@ pub fn update_verification_job(
             data.max_depth = Some(max_depth);
         }
     }
+    if update.read_threads.is_some() {
+        data.read_threads = update.read_threads;
+    }
+    if update.verify_threads.is_some() {
+        data.verify_threads = update.verify_threads;
+    }
 
     // check new store and NS
     user_info.check_privs(&auth_id, &data.acl_path(), PRIV_DATASTORE_VERIFY, true)?;
-- 
2.47.3



* [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (7 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
@ 2025-11-10  8:44 ` Nicolas Frey
  2025-11-11 10:22   ` Christian Ebner
  2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
  2025-11-13  9:33 ` [pbs-devel] superseded: " Nicolas Frey
  10 siblings, 1 reply; 22+ messages in thread
From: Nicolas Frey @ 2025-11-10  8:44 UTC (permalink / raw)
  To: pbs-devel

Expose the {read,verify}-threads properties in the `VerifyJobEdit`
and `VerifyAll` windows so the user can update the values
accordingly.

Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
 www/window/VerifyAll.js     | 18 ++++++++++++++++++
 www/window/VerifyJobEdit.js | 24 ++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js
index 01bcd63d..4239c215 100644
--- a/www/window/VerifyAll.js
+++ b/www/window/VerifyAll.js
@@ -80,6 +80,24 @@ Ext.define('PBS.window.VerifyAll', {
                         },
                     ],
                 },
+                {
+                    xtype: 'proxmoxintegerfield',
+                    name: 'read-threads',
+                    fieldLabel: gettext('# of Read Threads'),
+                    emptyText: '1',
+                    skipEmptyText: true,
+                    minValue: 1,
+                    maxValue: 32,
+                },
+                {
+                    xtype: 'proxmoxintegerfield',
+                    name: 'verify-threads',
+                    fieldLabel: gettext('# of Verify Threads'),
+                    emptyText: '4',
+                    skipEmptyText: true,
+                    minValue: 1,
+                    maxValue: 32,
+                },
             ],
         },
     ],
diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js
index e87ca069..5650ed5c 100644
--- a/www/window/VerifyJobEdit.js
+++ b/www/window/VerifyJobEdit.js
@@ -154,6 +154,30 @@ Ext.define('PBS.window.VerifyJobEdit', {
             },
         ],
         advancedColumn1: [
+            {
+                xtype: 'proxmoxintegerfield',
+                name: 'read-threads',
+                fieldLabel: gettext('# of Read Threads'),
+                emptyText: '1',
+                minValue: 1,
+                maxValue: 32,
+                cbind: {
+                    deleteEmpty: '{!isCreate}',
+                },
+            },
+            {
+                xtype: 'proxmoxintegerfield',
+                name: 'verify-threads',
+                fieldLabel: gettext('# of Verify Threads'),
+                emptyText: '4',
+                minValue: 1,
+                maxValue: 32,
+                cbind: {
+                    deleteEmpty: '{!isCreate}',
+                },
+            },
+        ],
+        advancedColumn2: [
             {
                 xtype: 'pmxDisplayEditField',
                 fieldLabel: gettext('Job ID'),
-- 
2.47.3



* Re: [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (8 preceding siblings ...)
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
@ 2025-11-11 10:21 ` Christian Ebner
  2025-11-11 10:35   ` Nicolas Frey
  2025-11-13  9:33 ` [pbs-devel] superseded: " Nicolas Frey
  10 siblings, 1 reply; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:21 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> This patch series aims to expand on Dominik's series [0] written for
> PBS 3, parallelizing chunk reads in `VerifyWorker` using a separate
> thread pool from the verification.
> 
> The number of threads was previously hard-coded, but is now
> configurable via the API and GUI with new properties called
> `{read,verify}-threads`, similar to tape backups.
> 
> The number of threads should also be configurable through tuning
> options or the datastore config (as discussed by Chris & Thomas on
> list), which can be added in a follow-up patch series.
> 
> In my local tests I measured the following speed difference when
> verifying a single ~32 GiB snapshot (4x the RAM size) on 4 cores,
> changing only the number of read threads:
> 
> 1 thread:    ~440MiB/s
> 2 threads:   ~780MiB/s
> 4 threads:   ~1140MiB/s
> 
> [0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t
> 
> Changes since v2:
> * split move to parallel handler into 2 separate commits
> * add constructor to `IndexVerifyState` and clean up nits
> 
> Changes since v1, thanks to Chris:
> * define dedicated schema for {worker,read,verify}-threads
> * rebase proxmox-backup
> * introduce new state struct `IndexVerifyState` to reduce the number
>    of Arc clones and improve the overall bundling
> * adjust update endpoint and UI to the new properties
The series is looking good already, only a few smaller things need
some adaptation (see the individual patches).

One major thing that should be included, however, is short
documentation of these parameters and what they do.

Tested that the configs are set as expected via CLI/UI, default
values are cleared, the logs expose the number of reader/verify
threads used, and the parameters are actually used for the
verification job.

For the whole series, consider:

Tested-by: Christian Ebner <c.ebner@proxmox.com>



* Re: [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey

comments inline

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Add separate schemas for the read and verify thread counts used in
> chunk verification. Also add a worker-threads schema for use in
> `TapeBackupJobSetup`.
> 
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   pbs-api-types/src/datastore.rs | 14 ++++++++++++++
>   1 file changed, 14 insertions(+)
> 
> diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
> index a214ac25..3dd9ce75 100644
> --- a/pbs-api-types/src/datastore.rs
> +++ b/pbs-api-types/src/datastore.rs
> @@ -164,6 +164,20 @@ pub const PRUNE_SCHEMA_KEEP_YEARLY: Schema =
>           .minimum(1)
>           .schema();
> 
> +const fn threads_schema(default: i64) -> Schema {
> +    IntegerSchema::new("The number of threads to use for the job.")
> +        .minimum(1)
> +        .maximum(32)
> +        .default(default)
> +        .schema()

Failed to mention this in v2 of the patches, but these are all
schemas related to the job config, not the datastore, so they should
be defined in pbs-api-types/src/jobs.rs instead.

Further, in order to have a less generic description for the
resulting schemas, threads_schema() might take a
`description: &'static str` as a second parameter, so this can be set
accordingly.
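
E.g., a minimal sketch of what that could look like:

  const fn threads_schema(description: &'static str, default: i64) -> Schema {
      IntegerSchema::new(description)
          .minimum(1)
          .maximum(32)
          .default(default)
          .schema()
  }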

> +pub const WORKER_THREADS_SCHEMA: Schema = threads_schema(1);


This is named a bit generically; it might be more telling if
prefixed with `TAPE_JOB_`.

> +
> +pub const READ_THREADS_SCHEMA: Schema = threads_schema(1);

Same as above, but maybe `VERIFICATION_` or `VERIFY_JOB_` as prefix ...

> +
> +pub const VERIFY_THREADS_SCHEMA: Schema = threads_schema(4);

... and above as well.
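
Together with the description parameter suggested above, that could
then look something like this (again just a sketch, exact wording of
the descriptions up to you):

  pub const TAPE_JOB_WORKER_THREADS_SCHEMA: Schema =
      threads_schema("The number of worker threads to use for the tape backup job.", 1);

  pub const VERIFY_JOB_READ_THREADS_SCHEMA: Schema =
      threads_schema("The number of threads to use for reading chunks.", 1);

  pub const VERIFY_JOB_VERIFY_THREADS_SCHEMA: Schema =
      threads_schema("The number of threads to use for verifying chunks.", 4);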

> +
>   /// Base directory where datastores are mounted
>   pub const DATASTORE_MOUNT_DIR: &str = "/mnt/datastore";
>   



* Re: [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey


Reviewed-by: Christian Ebner <c.ebner@proxmox.com>

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Control the number of parallel threads used to read/verify chunks,
> similar to the tape backup job's `worker-threads`.
> 
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   pbs-api-types/src/jobs.rs | 14 ++++++++++++++
>   1 file changed, 14 insertions(+)
> 
> diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
> index 4dbbef2b..2b9cef38 100644
> --- a/pbs-api-types/src/jobs.rs
> +++ b/pbs-api-types/src/jobs.rs
> @@ -203,6 +203,14 @@ pub const VERIFICATION_OUTDATED_AFTER_SCHEMA: Schema =
>               optional: true,
>               schema: crate::NS_MAX_DEPTH_SCHEMA,
>           },
> +        "read-threads": {
> +            schema: crate::READ_THREADS_SCHEMA,
> +            optional: true,
> +        },
> +        "verify-threads": {
> +            schema: crate::VERIFY_THREADS_SCHEMA,
> +            optional: true,
> +        },
>       }
>   )]
>   #[derive(Serialize, Deserialize, Updater, Clone, PartialEq)]
> @@ -233,6 +241,12 @@ pub struct VerificationJobConfig {
>       /// how deep the verify should go from the `ns` level downwards. Passing 0 verifies only the
>       /// snapshots on the same level as the passed `ns`, or the datastore root if none.
>       pub max_depth: Option<usize>,
> +    /// The number of read threads to use for the job.
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    pub read_threads: Option<usize>,
> +    /// The number of verification threads to use for the job.
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    pub verify_threads: Option<usize>,
>   }
>   
>   impl VerificationJobConfig {




* Re: [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey


Reviewed-by: Christian Ebner <c.ebner@proxmox.com>

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> use the new `worker-threads` schema instead of the previous inline
> definition
> 
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   pbs-api-types/src/jobs.rs | 6 +-----
>   1 file changed, 1 insertion(+), 5 deletions(-)
> 
> diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
> index 2b9cef38..ba2af6d7 100644
> --- a/pbs-api-types/src/jobs.rs
> +++ b/pbs-api-types/src/jobs.rs
> @@ -321,11 +321,8 @@ pub struct VerificationJobStatus {
>               optional: true,
>           },
>           "worker-threads": {
> -            type: Integer,
> +            schema: crate::WORKER_THREADS_SCHEMA,
>               optional: true,
> -            minimum: 1,
> -            maximum: 32,
> -            default: 1,
>           },
>       }
>   )]
> @@ -353,7 +350,6 @@ pub struct TapeBackupJobSetup {
>       pub ns: Option<BackupNamespace>,
>       #[serde(skip_serializing_if = "Option::is_none", default)]
>       pub max_depth: Option<usize>,
> -    /// Set the number of worker threads to use for the job
>       #[serde(skip_serializing_if = "Option::is_none")]
>       pub worker_threads: Option<u64>,
>   }



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> This way, the chunks will be loaded in parallel in addition to being
> checked in parallel.
> 
> Depending on the underlying storage, this can speed up reading chunks
> from disk, especially when the underlying storage is IO-depth
> dependent and the CPU is faster than the storage.
> 
> Originally-by: Dominik Csapak <d.csapak@proxmox.com>
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---

I think this patch diff could be further reduced and split by:
- placing the following patch, which introduces the struct, first
- doing the refactoring from member method to associated function for
  verify_chunk_by_backend() and add_corrupt_chunk()
- only then introducing the parallel chunk reading on top of that

Or was your intention to stay close to the original patch here?

One further comment inline.

>   src/backup/verify.rs | 120 +++++++++++++++++++++++++++----------------
>   1 file changed, 75 insertions(+), 45 deletions(-)
> 
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 31c03891..c0ff15d4 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -1,6 +1,6 @@
>   use pbs_config::BackupLockGuard;
>   use std::collections::HashSet;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>   use std::sync::{Arc, Mutex};
>   use std::time::Instant;
>   
> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
>   use pbs_datastore::manifest::{BackupManifest, FileInfo};
>   use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>   
> -use crate::tools::parallel_handler::ParallelHandler;
> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>   
>   use crate::backup::hierarchy::ListAccessibleBackupGroups;
>   
> @@ -85,23 +85,20 @@ impl VerifyWorker {
>   
>           let start_time = Instant::now();
>   
> -        let mut read_bytes = 0;
> -        let mut decoded_bytes = 0;
> +        let read_bytes = Arc::new(AtomicU64::new(0));
> +        let decoded_bytes = Arc::new(AtomicU64::new(0));
>   
> -        let datastore2 = Arc::clone(&self.datastore);
> -        let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
> -        let verified_chunks2 = Arc::clone(&self.verified_chunks);
> -        let errors2 = Arc::clone(&errors);
> -
> -        let decoder_pool = ParallelHandler::new(
> -            "verify chunk decoder",
> -            4,
> +        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> +            let datastore = Arc::clone(&self.datastore);
> +            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> +            let verified_chunks = Arc::clone(&self.verified_chunks);
> +            let errors = Arc::clone(&errors);
>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                   let chunk_crypt_mode = match chunk.crypt_mode() {
>                       Err(err) => {
> -                        corrupt_chunks2.lock().unwrap().insert(digest);
> +                        corrupt_chunks.lock().unwrap().insert(digest);
>                           info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors2.fetch_add(1, Ordering::SeqCst);
> +                        errors.fetch_add(1, Ordering::SeqCst);
>                           return Ok(());
>                       }
>                       Ok(mode) => mode,
> @@ -111,25 +108,25 @@ impl VerifyWorker {
>                       info!(
>                       "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                   );
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> +                    errors.fetch_add(1, Ordering::SeqCst);
>                   }
>   
>                   if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks2.lock().unwrap().insert(digest);
> +                    corrupt_chunks.lock().unwrap().insert(digest);
>                       info!("{err}");
> -                    errors2.fetch_add(1, Ordering::SeqCst);
> -                    match datastore2.rename_corrupt_chunk(&digest) {
> +                    errors.fetch_add(1, Ordering::SeqCst);
> +                    match datastore.rename_corrupt_chunk(&digest) {
>                           Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                           Err(err) => info!("{err}"),
>                           _ => (),
>                       }
>                   } else {
> -                    verified_chunks2.lock().unwrap().insert(digest);
> +                    verified_chunks.lock().unwrap().insert(digest);
>                   }
>   
>                   Ok(())
> -            },
> -        );
> +            }
> +        });
>   
>           let skip_chunk = |digest: &[u8; 32]| -> bool {
>               if self.verified_chunks.lock().unwrap().contains(digest) {
> @@ -156,6 +153,29 @@ impl VerifyWorker {
>               .datastore
>               .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>   
> +        let reader_pool = ParallelHandler::new("read chunks", 4, {

nit: This sets the default for the reader threads to 4. In the
subsequent patches that is then changed to the default of 1 as defined
in the API schemas, so while not an issue, this should IMO already be
set to that default value here.

> +            let decoder_pool = decoder_pool.channel();
> +            let datastore = Arc::clone(&self.datastore);
> +            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> +            let read_bytes = Arc::clone(&read_bytes);
> +            let decoded_bytes = Arc::clone(&decoded_bytes);
> +            let errors = Arc::clone(&errors);
> +            let backend = self.backend.clone();
> +
> +            move |info: ChunkReadInfo| {
> +                Self::verify_chunk_by_backend(
> +                    &backend,
> +                    Arc::clone(&datastore),
> +                    Arc::clone(&corrupt_chunks),
> +                    Arc::clone(&read_bytes),
> +                    Arc::clone(&decoded_bytes),
> +                    Arc::clone(&errors),
> +                    &decoder_pool,
> +                    &info,
> +                )
> +            }
> +        });
> +
>           for (pos, _) in chunk_list {
>               self.worker.check_abort()?;
>               self.worker.fail_on_shutdown()?;
> @@ -167,19 +187,16 @@ impl VerifyWorker {
>                   continue; // already verified or marked corrupt
>               }
>   
> -            self.verify_chunk_by_backend(
> -                &info,
> -                &mut read_bytes,
> -                &mut decoded_bytes,
> -                Arc::clone(&errors),
> -                &decoder_pool,
> -            )?;
> +            reader_pool.send(info)?;
>           }
>   
> -        decoder_pool.complete()?;
> +        reader_pool.complete()?;
>   
>           let elapsed = start_time.elapsed().as_secs_f64();
>   
> +        let read_bytes = read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +
>           let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>           let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
>   
> @@ -199,26 +216,31 @@ impl VerifyWorker {
>           Ok(())
>       }
>   
> +    #[allow(clippy::too_many_arguments)]
>       fn verify_chunk_by_backend(
> -        &self,
> -        info: &ChunkReadInfo,
> -        read_bytes: &mut u64,
> -        decoded_bytes: &mut u64,
> +        backend: &DatastoreBackend,
> +        datastore: Arc<DataStore>,
> +        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +        read_bytes: Arc<AtomicU64>,
> +        decoded_bytes: Arc<AtomicU64>,
>           errors: Arc<AtomicUsize>,
> -        decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
> +        decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
> +        info: &ChunkReadInfo,
>       ) -> Result<(), Error> {
> -        match &self.backend {
> -            DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> -                Err(err) => self.add_corrupt_chunk(
> +        match backend {
> +            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +                Err(err) => Self::add_corrupt_chunk(
> +                    datastore,
> +                    corrupt_chunks,
>                       info.digest,
>                       errors,
>                       &format!("can't verify chunk, load failed - {err}"),
>                   ),
>                   Ok(chunk) => {
>                       let size = info.size();
> -                    *read_bytes += chunk.raw_size();
> +                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                       decoder_pool.send((chunk, info.digest, size))?;
> -                    *decoded_bytes += size;
> +                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                   }
>               },
>               DatastoreBackend::S3(s3_client) => {
> @@ -235,9 +257,9 @@ impl VerifyWorker {
>                           match chunk_result {
>                               Ok(chunk) => {
>                                   let size = info.size();
> -                                *read_bytes += chunk.raw_size();
> +                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                                   decoder_pool.send((chunk, info.digest, size))?;
> -                                *decoded_bytes += size;
> +                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                               }
>                               Err(err) => {
>                                   errors.fetch_add(1, Ordering::SeqCst);
> @@ -245,7 +267,9 @@ impl VerifyWorker {
>                               }
>                           }
>                       }
> -                    Ok(None) => self.add_corrupt_chunk(
> +                    Ok(None) => Self::add_corrupt_chunk(
> +                        datastore,
> +                        corrupt_chunks,
>                           info.digest,
>                           errors,
>                           &format!(
> @@ -263,13 +287,19 @@ impl VerifyWorker {
>           Ok(())
>       }
>   
> -    fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> +    fn add_corrupt_chunk(
> +        datastore: Arc<DataStore>,
> +        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +        digest: [u8; 32],
> +        errors: Arc<AtomicUsize>,
> +        message: &str,
> +    ) {
>           // Panic on poisoned mutex
> -        let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
>           corrupt_chunks.insert(digest);
>           error!(message);
>           errors.fetch_add(1, Ordering::SeqCst);
> -        match self.datastore.rename_corrupt_chunk(&digest) {
> +        match datastore.rename_corrupt_chunk(&digest) {
>               Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>               Err(err) => info!("{err}"),
>               _ => (),



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey

As mentioned on the previous patch, IMO this one should be introduced
before it, adding the parallel handler only on top of that.

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Introduces a new state struct `IndexVerifyState` so that we only need
> to pass around and clone one `Arc`.
> 
> Suggested-by: Christian Ebner <c.ebner@proxmox.com>
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   src/backup/verify.rs | 130 ++++++++++++++++++++++---------------------
>   1 file changed, 67 insertions(+), 63 deletions(-)
> 
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index c0ff15d4..9a20c8e1 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -34,6 +34,34 @@ pub struct VerifyWorker {
>       backend: DatastoreBackend,
>   }
>   
> +struct IndexVerifyState {
> +    read_bytes: AtomicU64,
> +    decoded_bytes: AtomicU64,
> +    errors: AtomicUsize,
> +    datastore: Arc<DataStore>,
> +    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    start_time: Instant,
> +}
> +
> +impl IndexVerifyState {
> +    fn new(
> +        datastore: &Arc<DataStore>,
> +        corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +        verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +    ) -> Self {
> +        Self {
> +            read_bytes: AtomicU64::new(0),
> +            decoded_bytes: AtomicU64::new(0),
> +            errors: AtomicUsize::new(0),
> +            datastore: Arc::clone(datastore),
> +            corrupt_chunks: Arc::clone(corrupt_chunks),
> +            verified_chunks: Arc::clone(verified_chunks),
> +            start_time: Instant::now(),
> +        }
> +    }
> +}
> +
>   impl VerifyWorker {
>       /// Creates a new VerifyWorker for a given task worker and datastore.
>       pub fn new(
> @@ -81,24 +109,20 @@ impl VerifyWorker {
>           index: Box<dyn IndexFile + Send>,
>           crypt_mode: CryptMode,
>       ) -> Result<(), Error> {
> -        let errors = Arc::new(AtomicUsize::new(0));
> -
> -        let start_time = Instant::now();
> -
> -        let read_bytes = Arc::new(AtomicU64::new(0));
> -        let decoded_bytes = Arc::new(AtomicU64::new(0));
> +        let verify_state = Arc::new(IndexVerifyState::new(
> +            &self.datastore,
> +            &self.corrupt_chunks,
> +            &self.verified_chunks,
> +        ));
>   
>           let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let verified_chunks = Arc::clone(&self.verified_chunks);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                   let chunk_crypt_mode = match chunk.crypt_mode() {
>                       Err(err) => {
> -                        corrupt_chunks.lock().unwrap().insert(digest);
> +                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                           info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                           return Ok(());
>                       }
>                       Ok(mode) => mode,
> @@ -108,20 +132,20 @@ impl VerifyWorker {
>                       info!(
>                       "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                   );
> -                    errors.fetch_add(1, Ordering::SeqCst);
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                   }
>   
>                   if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks.lock().unwrap().insert(digest);
> +                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                       info!("{err}");
> -                    errors.fetch_add(1, Ordering::SeqCst);
> -                    match datastore.rename_corrupt_chunk(&digest) {
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
>                           Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                           Err(err) => info!("{err}"),
>                           _ => (),
>                       }
>                   } else {
> -                    verified_chunks.lock().unwrap().insert(digest);
> +                    verify_state.verified_chunks.lock().unwrap().insert(digest);
>                   }
>   
>                   Ok(())
> @@ -134,7 +158,7 @@ impl VerifyWorker {
>               } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
>                   let digest_str = hex::encode(digest);
>                   info!("chunk {digest_str} was marked as corrupt");
> -                errors.fetch_add(1, Ordering::SeqCst);
> +                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                   true
>               } else {
>                   false
> @@ -155,27 +179,18 @@ impl VerifyWorker {
>   
>           let reader_pool = ParallelHandler::new("read chunks", 4, {
>               let decoder_pool = decoder_pool.channel();
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let read_bytes = Arc::clone(&read_bytes);
> -            let decoded_bytes = Arc::clone(&decoded_bytes);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>               let backend = self.backend.clone();
>   
>               move |info: ChunkReadInfo| {
>                   Self::verify_chunk_by_backend(
>                       &backend,
> -                    Arc::clone(&datastore),
> -                    Arc::clone(&corrupt_chunks),
> -                    Arc::clone(&read_bytes),
> -                    Arc::clone(&decoded_bytes),
> -                    Arc::clone(&errors),
> +                    Arc::clone(&verify_state),
>                       &decoder_pool,
>                       &info,
>                   )
>               }
>           });
> -
>           for (pos, _) in chunk_list {
>               self.worker.check_abort()?;
>               self.worker.fail_on_shutdown()?;
> @@ -192,10 +207,10 @@ impl VerifyWorker {
>   
>           reader_pool.complete()?;
>   
> -        let elapsed = start_time.elapsed().as_secs_f64();
> +        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
>   
> -        let read_bytes = read_bytes.load(Ordering::SeqCst);
> -        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
>   
>           let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>           let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
> @@ -203,44 +218,39 @@ impl VerifyWorker {
>           let read_speed = read_bytes_mib / elapsed;
>           let decode_speed = decoded_bytes_mib / elapsed;
>   
> -        let error_count = errors.load(Ordering::SeqCst);
> +        let error_count = verify_state.errors.load(Ordering::SeqCst);
>   
>           info!(
>               "  verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
>           );
>   
> -        if errors.load(Ordering::SeqCst) > 0 {
> +        if verify_state.errors.load(Ordering::SeqCst) > 0 {
>               bail!("chunks could not be verified");
>           }
>   
>           Ok(())
>       }
>   
> -    #[allow(clippy::too_many_arguments)]
>       fn verify_chunk_by_backend(
>           backend: &DatastoreBackend,
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        read_bytes: Arc<AtomicU64>,
> -        decoded_bytes: Arc<AtomicU64>,
> -        errors: Arc<AtomicUsize>,
> +        verify_state: Arc<IndexVerifyState>,
>           decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>           info: &ChunkReadInfo,
>       ) -> Result<(), Error> {
>           match backend {
> -            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
>                   Err(err) => Self::add_corrupt_chunk(
> -                    datastore,
> -                    corrupt_chunks,
> +                    verify_state,
>                       info.digest,
> -                    errors,
>                       &format!("can't verify chunk, load failed - {err}"),
>                   ),
>                   Ok(chunk) => {
>                       let size = info.size();
> -                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                    verify_state
> +                        .read_bytes
> +                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                       decoder_pool.send((chunk, info.digest, size))?;
> -                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                   }
>               },
>               DatastoreBackend::S3(s3_client) => {
> @@ -257,28 +267,28 @@ impl VerifyWorker {
>                           match chunk_result {
>                               Ok(chunk) => {
>                                   let size = info.size();
> -                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                                verify_state
> +                                    .read_bytes
> +                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                                   decoder_pool.send((chunk, info.digest, size))?;
> -                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                                verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                               }
>                               Err(err) => {
> -                                errors.fetch_add(1, Ordering::SeqCst);
> +                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                                   error!("can't verify chunk, load failed - {err}");
>                               }
>                           }
>                       }
>                       Ok(None) => Self::add_corrupt_chunk(
> -                        datastore,
> -                        corrupt_chunks,
> +                        verify_state,
>                           info.digest,
> -                        errors,
>                           &format!(
>                               "can't verify missing chunk with digest {}",
>                               hex::encode(info.digest)
>                           ),
>                       ),
>                       Err(err) => {
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                           error!("can't verify chunk, load failed - {err}");
>                       }
>                   }
> @@ -287,19 +297,13 @@ impl VerifyWorker {
>           Ok(())
>       }
>   
> -    fn add_corrupt_chunk(
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        digest: [u8; 32],
> -        errors: Arc<AtomicUsize>,
> -        message: &str,
> -    ) {
> +    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
>           // Panic on poisoned mutex
> -        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
>           corrupt_chunks.insert(digest);
>           error!(message);
> -        errors.fetch_add(1, Ordering::SeqCst);
> -        match datastore.rename_corrupt_chunk(&digest) {
> +        verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +        match verify_state.datastore.rename_corrupt_chunk(&digest) {
>               Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>               Err(err) => info!("{err}"),
>               _ => (),



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey


Reviewed-by: Christian Ebner <c.ebner@proxmox.com>

Could be placed upfront so it can be applied independently of the rest
of the series.

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   src/api2/admin/datastore.rs | 2 +-
>   1 file changed, 1 insertion(+), 1 deletion(-)
> 
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 5e4dd3fc..fde4c247 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -697,7 +697,7 @@ pub async fn status(
>   )]
>   /// Verify backups.
>   ///
> -/// This function can verify a single backup snapshot, all backup from a backup group,
> +/// This function can verify a single backup snapshot, all backups from a backup group,
>   /// or all backups in the datastore.
>   #[allow(clippy::too_many_arguments)]
>   pub fn verify(



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey


Reviewed-by: Christian Ebner <c.ebner@proxmox.com>

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Exposes the {read,verify}-threads properties in the `VerifyJobEdit`
> and `VerifyAll` windows so the user can update the values
> accordingly.
> 
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   www/window/VerifyAll.js     | 18 ++++++++++++++++++
>   www/window/VerifyJobEdit.js | 24 ++++++++++++++++++++++++
>   2 files changed, 42 insertions(+)
> 
> diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js
> index 01bcd63d..4239c215 100644
> --- a/www/window/VerifyAll.js
> +++ b/www/window/VerifyAll.js
> @@ -80,6 +80,24 @@ Ext.define('PBS.window.VerifyAll', {
>                           },
>                       ],
>                   },
> +                {
> +                    xtype: 'proxmoxintegerfield',
> +                    name: 'read-threads',
> +                    fieldLabel: gettext('# of Read Threads'),
> +                    emptyText: '1',
> +                    skipEmptyText: true,
> +                    minValue: 1,
> +                    maxValue: 32,
> +                },
> +                {
> +                    xtype: 'proxmoxintegerfield',
> +                    name: 'verify-threads',
> +                    fieldLabel: gettext('# of Verify Threads'),
> +                    emptyText: '4',
> +                    skipEmptyText: true,
> +                    minValue: 1,
> +                    maxValue: 32,
> +                },
>               ],
>           },
>       ],
> diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js
> index e87ca069..5650ed5c 100644
> --- a/www/window/VerifyJobEdit.js
> +++ b/www/window/VerifyJobEdit.js
> @@ -154,6 +154,30 @@ Ext.define('PBS.window.VerifyJobEdit', {
>               },
>           ],
>           advancedColumn1: [
> +            {
> +                xtype: 'proxmoxintegerfield',
> +                name: 'read-threads',
> +                fieldLabel: gettext('# of Read Threads'),
> +                emptyText: '1',
> +                minValue: 1,
> +                maxValue: 32,
> +                cbind: {
> +                    deleteEmpty: '{!isCreate}',
> +                },
> +            },
> +            {
> +                xtype: 'proxmoxintegerfield',
> +                name: 'verify-threads',
> +                fieldLabel: gettext('# of Verify Threads'),
> +                emptyText: '4',
> +                minValue: 1,
> +                maxValue: 32,
> +                cbind: {
> +                    deleteEmpty: '{!isCreate}',
> +                },
> +            },
> +        ],
> +        advancedColumn2: [
>               {
>                   xtype: 'pmxDisplayEditField',
>                   fieldLabel: gettext('Job ID'),



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey


Reviewed-by: Christian Ebner <c.ebner@proxmox.com>

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Allows adding, updating or deleting the {read,verify}-threads
> properties in the verify job config via the API.
> 
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   src/api2/config/verify.rs | 16 ++++++++++++++++
>   1 file changed, 16 insertions(+)
> 
> diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs
> index e71e0c2e..a88a3c32 100644
> --- a/src/api2/config/verify.rs
> +++ b/src/api2/config/verify.rs
> @@ -149,6 +149,10 @@ pub enum DeletableProperty {
>       Ns,
>       /// Delete max-depth property, defaulting to full recursion again
>       MaxDepth,
> +    /// Delete read-threads property
> +    ReadThreads,
> +    /// Delete verify-threads property
> +    VerifyThreads,
>   }
>   
>   #[api(
> @@ -229,6 +233,12 @@ pub fn update_verification_job(
>                   DeletableProperty::MaxDepth => {
>                       data.max_depth = None;
>                   }
> +                DeletableProperty::ReadThreads => {
> +                    data.read_threads = None;
> +                }
> +                DeletableProperty::VerifyThreads => {
> +                    data.verify_threads = None;
> +                }
>               }
>           }
>       }
> @@ -266,6 +276,12 @@ pub fn update_verification_job(
>               data.max_depth = Some(max_depth);
>           }
>       }
> +    if update.read_threads.is_some() {
> +        data.read_threads = update.read_threads;
> +    }
> +    if update.verify_threads.is_some() {
> +        data.verify_threads = update.verify_threads;
> +    }
>   
>       // check new store and NS
>       user_info.check_privs(&auth_id, &data.acl_path(), PRIV_DATASTORE_VERIFY, true)?;



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads
  2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
@ 2025-11-11 10:22   ` Christian Ebner
  0 siblings, 0 replies; 22+ messages in thread
From: Christian Ebner @ 2025-11-11 10:22 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Nicolas Frey

Two comments inline.

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> use the previously introduced {read,verify}-threads in the API; the
> default values match those of the schema definitions.
> 
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
>   src/api2/admin/datastore.rs    | 16 ++++++++++++++--
>   src/api2/backup/environment.rs |  2 +-
>   src/backup/verify.rs           | 16 ++++++++++++++--
>   src/server/verify_job.rs       |  7 ++++++-
>   4 files changed, 35 insertions(+), 6 deletions(-)
> 
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index 6e269ef9..5e4dd3fc 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -45,7 +45,8 @@ use pbs_api_types::{
>       BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
>       IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
>       PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> -    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
> +    PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, READ_THREADS_SCHEMA, UPID, UPID_SCHEMA,
> +    VERIFICATION_OUTDATED_AFTER_SCHEMA, VERIFY_THREADS_SCHEMA,
>   };
>   use pbs_client::pxar::{create_tar, create_zip};
>   use pbs_config::CachedUserInfo;
> @@ -675,6 +676,14 @@ pub async fn status(
>                   schema: NS_MAX_DEPTH_SCHEMA,
>                   optional: true,
>               },
> +            "read-threads": {
> +                schema: READ_THREADS_SCHEMA,
> +                optional: true,
> +            },
> +            "verify-threads": {
> +                schema: VERIFY_THREADS_SCHEMA,
> +                optional: true,
> +            },

There is one consumer of this API endpoint for which it would still be
nice to get these parameters as well: the CLI invocation via
`proxmox-backup-manager verify`, defined in
`src/bin/proxmox-backup-manager.rs`.

That should be done as a dedicated patch after this one though.

>           },
>       },
>       returns: {
> @@ -700,6 +709,8 @@ pub fn verify(
>       ignore_verified: Option<bool>,
>       outdated_after: Option<i64>,
>       max_depth: Option<usize>,
> +    read_threads: Option<usize>,
> +    verify_threads: Option<usize>,
>       rpcenv: &mut dyn RpcEnvironment,
>   ) -> Result<Value, Error> {
>       let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> @@ -779,7 +790,8 @@ pub fn verify(
>           auth_id.to_string(),
>           to_stdout,
>           move |worker| {
> -            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> +            let verify_worker =
> +                VerifyWorker::new(worker.clone(), datastore, read_threads, verify_threads)?;
>               let failed_dirs = if let Some(backup_dir) = backup_dir {
>                   let mut res = Vec::new();
>                   if !verify_worker.verify_backup_dir(
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 0faf6c8e..06696c78 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -795,7 +795,7 @@ impl BackupEnvironment {
>               move |worker| {
>                   worker.log_message("Automatically verifying newly added snapshot");
>   
> -                let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> +                let verify_worker = VerifyWorker::new(worker.clone(), datastore, None, None)?;

This now always uses the defaults, which is fine IMO. I would however
suggest adding a FIXME in case we want to extend the series to allow
setting per-datastore read/verify settings, as this call site would
then need to get these values; a minimal sketch follows below.
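
For illustration, such a marker at the call site from the hunk above
could look like this (comment wording is just a suggestion):

    // FIXME: pass per-datastore read/verify thread settings here once
    // they are configurable via tuning options / datastore config.
    let verify_worker = VerifyWorker::new(worker.clone(), datastore, None, None)?;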

>                   if !verify_worker.verify_backup_dir_with_lock(
>                       &backup_dir,
>                       worker.upid().clone(),
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 9a20c8e1..8a530159 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -32,6 +32,8 @@ pub struct VerifyWorker {
>       verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>       corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>       backend: DatastoreBackend,
> +    read_threads: usize,
> +    verify_threads: usize,
>   }
>   
>   struct IndexVerifyState {
> @@ -67,6 +69,8 @@ impl VerifyWorker {
>       pub fn new(
>           worker: Arc<dyn WorkerTaskContext>,
>           datastore: Arc<DataStore>,
> +        read_threads: Option<usize>,
> +        verify_threads: Option<usize>,
>       ) -> Result<Self, Error> {
>           let backend = datastore.backend()?;
>           Ok(Self {
> @@ -77,6 +81,8 @@ impl VerifyWorker {
>               // start with 64 chunks since we assume there are few corrupt ones
>               corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
>               backend,
> +            read_threads: read_threads.unwrap_or(1),
> +            verify_threads: verify_threads.unwrap_or(4),
>           })
>       }
>   
> @@ -115,7 +121,7 @@ impl VerifyWorker {
>               &self.verified_chunks,
>           ));
>   
> -        let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> +        let decoder_pool = ParallelHandler::new("verify chunk decoder", self.verify_threads, {
>               let verify_state = Arc::clone(&verify_state);
>               move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                   let chunk_crypt_mode = match chunk.crypt_mode() {
> @@ -177,7 +183,7 @@ impl VerifyWorker {
>               .datastore
>               .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>   
> -        let reader_pool = ParallelHandler::new("read chunks", 4, {
> +        let reader_pool = ParallelHandler::new("read chunks", self.read_threads, {
>               let decoder_pool = decoder_pool.channel();
>               let verify_state = Arc::clone(&verify_state);
>               let backend = self.backend.clone();
> @@ -578,6 +584,12 @@ impl VerifyWorker {
>           let group_count = list.len();
>           info!("found {group_count} groups");
>   
> +        log::info!(
> +            "using {} read and {} verify thread(s)",
> +            self.read_threads,
> +            self.verify_threads,
> +        );
> +
>           let mut progress = StoreProgress::new(group_count as u64);
>   
>           for (pos, group) in list.into_iter().enumerate() {
> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
> index c8792174..e0b03155 100644
> --- a/src/server/verify_job.rs
> +++ b/src/server/verify_job.rs
> @@ -41,7 +41,12 @@ pub fn do_verification_job(
>                   None => Default::default(),
>               };
>   
> -            let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> +            let verify_worker = VerifyWorker::new(
> +                worker.clone(),
> +                datastore,
> +                verification_job.read_threads,
> +                verification_job.verify_threads,
> +            )?;
>               let result = verify_worker.verify_all_backups(
>                   worker.upid(),
>                   ns,
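
With e.g. 2 read and 4 verify threads configured, the task log then
contains a line like the following (derived from the format string
above; values are illustrative):

    using 2 read and 4 verify thread(s)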



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

* Re: [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification
  2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
@ 2025-11-11 10:35   ` Nicolas Frey
  0 siblings, 0 replies; 22+ messages in thread
From: Nicolas Frey @ 2025-11-11 10:35 UTC (permalink / raw)
  To: Christian Ebner, Proxmox Backup Server development discussion

On 11/11/25 11:21 AM, Christian Ebner wrote:
> On 11/10/25 9:44 AM, Nicolas Frey wrote:
>> This patch series aims to expand on Dominik's series [0] written for
>> pbs 3, parallelizing chunk reads in `VerifyWorker` using a separate
>> thread pool from the verification.
>>
>> The number of threads was previously hard-coded, but is now
>> configurable via the API and GUI with new properties called
>> `{read,verify}-threads`, similarly to tape backups.
>>
>> The number of threads should also be configurable through tuning
>> options or datastore config (as discussed by Chris & Thomas on list),
>> which can be added in a follow up patch series.
>>
>> In my local tests I measured the following speed difference:
>> verified a single snapshot with ~32 GiB (4x the RAM size) with 4
>> cores (just changing the read threads)
>>
>> 1 thread:    ~440MiB/s
>> 2 threads:   ~780MiB/s
>> 4 threads:   ~1140MiB/s
>>
>> [0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-
>> d.csapak@proxmox.com/#t
>>
>> Changes since v2:
>> * split move to parallel handler into 2 separate commits
>> * add constructor to `IndexVerifyState` and clean up nits
>>
>> Changes since v1, thanks to Chris:
>> * define dedicated schema for {worker,read,verify}-threads
>> * rebase proxmox-backup
>> * introduce new state struct `IndexVerifyState` to reduce the amount
>>    of Arc clones and overall better bundling
>> * adjust update endpoint and UI to the new properties
> Series is looking good already, only a few smaller things need some
> adaptation (see individual patches).
> 
> One major thing which should however be included is short
> documentation of these parameters and what they do.
> 
> Tested that the configs are set as expected via CLI/UI, that default
> values are cleared, that the logs expose the number of reader/verify
> threads used, and that the parameters are actually applied to the
> verification job.
> 
> Consider the whole series:
> 
> Tested-by: Christian Ebner <c.ebner@proxmox.com>

Thanks for the review and testing!

I unfortunately misunderstood your comments on v2 about splitting it
into smaller commits and mixed up the order because of that.

I'll send a v4 with all your suggestions!


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

^ permalink raw reply	[flat|nested] 22+ messages in thread

* [pbs-devel] superseded:  [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification
  2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
                   ` (9 preceding siblings ...)
  2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
@ 2025-11-13  9:33 ` Nicolas Frey
  10 siblings, 0 replies; 22+ messages in thread
From: Nicolas Frey @ 2025-11-13  9:33 UTC (permalink / raw)
  To: pbs-devel

Superseded-by: https://lore.proxmox.com/pbs-devel/20251113093118.195229-1-n.frey@proxmox.com/T/#t

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> This patch series aims to expand on Dominik's series [0] written for
> pbs 3, parallelizing chunk reads in `VerifyWorker` using a separate
> thread pool from the verification.
> 
> The number of threads was previously hard-coded, but is now
> configurable via the API and GUI with new properties called
> `{read,verify}-threads`, similarly to tape backups.
> 
> The number of threads should also be configurable through tuning
> options or datastore config (as discussed by Chris & Thomas on list),
> which can be added in a follow up patch series.
> 
> In my local tests I measured the following speed difference:
> verified a single snapshot with ~32 GiB (4x the RAM size) with 4
> cores (just changing the read threads)
> 
> 1 thread:    ~440MiB/s
> 2 threads:   ~780MiB/s
> 4 threads:   ~1140MiB/s
> 
> [0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t
> 
> Changes since v2:
> * split move to parallel handler into 2 separate commits
> * add constructor to `IndexVerifyState` and clean up nits
> 
> Changes since v1, thanks to Chris:
> * define dedicated schema for {worker,read,verify}-threads
> * rebase proxmox-backup
> * introduce new state struct `IndexVerifyState` to reduce the amount
>   of Arc clones and overall better bundling
> * adjust update endpoint and UI to the new properties
> 
> proxmox:
> 
> Nicolas Frey (3):
>   pbs-api-types: add schema for {worker,read,verify}-threads
>   pbs-api-types: jobs: add {read,verify}-threads to
>     VerificationJobConfig
>   pbs-api-types: use worker-threads schema for TapeBackupJobSetup
> 
>  pbs-api-types/src/datastore.rs | 14 ++++++++++++++
>  pbs-api-types/src/jobs.rs      | 20 +++++++++++++++-----
>  2 files changed, 29 insertions(+), 5 deletions(-)
> 
> 
> proxmox-backup:
> 
> Nicolas Frey (6):
>   api: verify: move chunk loading into parallel handler
>   api: verify: bundle parameters into new struct
>   api: verify: determine the number of threads to use with
>     {read,verify}-threads
>   api: verify: correct typo in comment
>   api: verify: add {read,verify}-threads to update endpoint
>   ui: verify: add option to set number of threads for job
> 
>  src/api2/admin/datastore.rs    |  18 +++-
>  src/api2/backup/environment.rs |   2 +-
>  src/api2/config/verify.rs      |  16 ++++
>  src/backup/verify.rs           | 164 +++++++++++++++++++++------------
>  src/server/verify_job.rs       |   7 +-
>  www/window/VerifyAll.js        |  18 ++++
>  www/window/VerifyJobEdit.js    |  24 +++++
>  7 files changed, 185 insertions(+), 64 deletions(-)
> 
> 
> Summary over all repositories:
>   9 files changed, 214 insertions(+), 69 deletions(-)
> 



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


^ permalink raw reply	[flat|nested] 22+ messages in thread

Thread overview: 22+ messages
2025-11-10  8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-10  8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-11 10:22   ` Christian Ebner
2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
2025-11-11 10:35   ` Nicolas Frey
2025-11-13  9:33 ` [pbs-devel] superseded: " Nicolas Frey
