* [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification
@ 2025-11-05 15:51 Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
` (5 more replies)
0 siblings, 6 replies; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
This patch series aims to expand on Dominik's series [0] written for
pbs 3, parallelizing chunk reads in `VerifyWorker` using a seperate
thread pool from the verification.
The number of threads was previously hard-coded, but is now
configurable via the API and GUI with a new property called
`worker-threads`, similarly to tape backups.
The number of `worker-threads` only controls the thread pool for
reading, though if it makes sense to reuse this for the verification
pool, it could be adjusted to do so too.
In my local tests I measured the following speed difference:
verified a single snapshot with ~32 GiB (4x the RAM size) with 4 cores
1 thread: ~440MiB/s
2 threads: ~780MiB/s
4 threads: ~1140MiB/s
[0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t
proxmox:
Nicolas Frey (1):
pbs-api-types: jobs: verify: add worker-threads to
VerificationJobSetup
pbs-api-types/src/jobs.rs | 10 ++++++++++
1 file changed, 10 insertions(+)
proxmox-backup:
Nicolas Frey (4):
api: verify: move chunk loading into parallel handler
api: verify: use worker-threads to determine the number of threads to
use
api: verify: add worker-threads to update endpoint
ui: verify: add option to set number of threads for job
src/api2/admin/datastore.rs | 13 +++-
src/api2/backup/environment.rs | 2 +-
src/api2/config/verify.rs | 8 +++
src/backup/verify.rs | 123 +++++++++++++++++++++------------
src/server/verify_job.rs | 3 +-
www/window/VerifyAll.js | 12 ++++
www/window/VerifyJobEdit.js | 13 ++++
7 files changed, 125 insertions(+), 49 deletions(-)
Summary over all repositories:
8 files changed, 135 insertions(+), 49 deletions(-)
--
Generated by git-murpp 0.8.1
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread* [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig 2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey @ 2025-11-05 15:51 ` Nicolas Frey 2025-11-06 8:14 ` Christian Ebner 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey ` (4 subsequent siblings) 5 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw) To: pbs-devel Signed-off-by: Nicolas Frey <n.frey@proxmox.com> --- pbs-api-types/src/jobs.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs index 4dbbef2b..d904f797 100644 --- a/pbs-api-types/src/jobs.rs +++ b/pbs-api-types/src/jobs.rs @@ -203,6 +203,13 @@ pub const VERIFICATION_OUTDATED_AFTER_SCHEMA: Schema = optional: true, schema: crate::NS_MAX_DEPTH_SCHEMA, }, + "worker-threads": { + type: Integer, + optional: true, + minimum: 1, + maximum: 32, + default: 1, + }, } )] #[derive(Serialize, Deserialize, Updater, Clone, PartialEq)] @@ -221,6 +228,9 @@ pub struct VerificationJobConfig { #[serde(skip_serializing_if = "Option::is_none")] /// Reverify snapshots after X days, never if 0. Ignored if 'ignore_verified' is false. pub outdated_after: Option<i64>, + /// Set the number of worker threads to use for the job + #[serde(skip_serializing_if = "Option::is_none")] + pub worker_threads: Option<usize>, #[serde(skip_serializing_if = "Option::is_none")] pub comment: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey @ 2025-11-06 8:14 ` Christian Ebner 0 siblings, 0 replies; 19+ messages in thread From: Christian Ebner @ 2025-11-06 8:14 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Nicolas Frey Please mention in a short commit message what this new parameter will be used for, e.g. controls the number of parallel workers to read and verify chunks. On 11/5/25 4:51 PM, Nicolas Frey wrote: > Signed-off-by: Nicolas Frey <n.frey@proxmox.com> > --- > pbs-api-types/src/jobs.rs | 10 ++++++++++ > 1 file changed, 10 insertions(+) > > diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs > index 4dbbef2b..d904f797 100644 > --- a/pbs-api-types/src/jobs.rs > +++ b/pbs-api-types/src/jobs.rs > @@ -203,6 +203,13 @@ pub const VERIFICATION_OUTDATED_AFTER_SCHEMA: Schema = > optional: true, > schema: crate::NS_MAX_DEPTH_SCHEMA, > }, > + "worker-threads": { > + type: Integer, > + optional: true, > + minimum: 1, > + maximum: 32, > + default: 1, > + }, Please define this as dedicated schema, so it can be reused for the PBS api as well (as exposed in patch 2). > } > )] > #[derive(Serialize, Deserialize, Updater, Clone, PartialEq)] > @@ -221,6 +228,9 @@ pub struct VerificationJobConfig { > #[serde(skip_serializing_if = "Option::is_none")] > /// Reverify snapshots after X days, never if 0. Ignored if 'ignore_verified' is false. > pub outdated_after: Option<i64>, > + /// Set the number of worker threads to use for the job > + #[serde(skip_serializing_if = "Option::is_none")] > + pub worker_threads: Option<usize>, > #[serde(skip_serializing_if = "Option::is_none")] > pub comment: Option<String>, > #[serde(skip_serializing_if = "Option::is_none")] _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler 2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey @ 2025-11-05 15:51 ` Nicolas Frey 2025-11-06 8:54 ` Christian Ebner 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey ` (3 subsequent siblings) 5 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw) To: pbs-devel This way, the chunks will be loaded in parallel in addition to being checked in parallel. Depending on the underlying storage, this can speed up reading chunks from disk, especially when the underlying storage is IO depth dependent, and the CPU is faster than the storage. Originally-by: Dominik Csapak <d.csapak@proxmox.com> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> --- src/backup/verify.rs | 120 +++++++++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 45 deletions(-) diff --git a/src/backup/verify.rs b/src/backup/verify.rs index e33fdf50..7f91f38c 100644 --- a/src/backup/verify.rs +++ b/src/backup/verify.rs @@ -1,6 +1,6 @@ use pbs_config::BackupLockGuard; use std::collections::HashSet; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile}; use pbs_datastore::manifest::{BackupManifest, FileInfo}; use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress}; -use crate::tools::parallel_handler::ParallelHandler; +use crate::tools::parallel_handler::{ParallelHandler, SendHandle}; use crate::backup::hierarchy::ListAccessibleBackupGroups; @@ -156,23 +156,20 @@ impl VerifyWorker { let start_time = Instant::now(); - let mut read_bytes = 0; - let mut decoded_bytes = 0; + let read_bytes = Arc::new(AtomicU64::new(0)); + let decoded_bytes = Arc::new(AtomicU64::new(0)); - let datastore2 = Arc::clone(&self.datastore); - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks); - let verified_chunks2 = Arc::clone(&self.verified_chunks); - let errors2 = Arc::clone(&errors); - - let decoder_pool = ParallelHandler::new( - "verify chunk decoder", - 4, + let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, { + let datastore = Arc::clone(&self.datastore); + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); + let verified_chunks = Arc::clone(&self.verified_chunks); + let errors = Arc::clone(&errors); move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { let chunk_crypt_mode = match chunk.crypt_mode() { Err(err) => { - corrupt_chunks2.lock().unwrap().insert(digest); + corrupt_chunks.lock().unwrap().insert(digest); info!("can't verify chunk, unknown CryptMode - {err}"); - errors2.fetch_add(1, Ordering::SeqCst); + errors.fetch_add(1, Ordering::SeqCst); return Ok(()); } Ok(mode) => mode, @@ -182,21 +179,21 @@ impl VerifyWorker { info!( "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}" ); - errors2.fetch_add(1, Ordering::SeqCst); + errors.fetch_add(1, Ordering::SeqCst); } if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { - corrupt_chunks2.lock().unwrap().insert(digest); + corrupt_chunks.lock().unwrap().insert(digest); info!("{err}"); - errors2.fetch_add(1, Ordering::SeqCst); - Self::rename_corrupted_chunk(datastore2.clone(), &digest); + errors.fetch_add(1, Ordering::SeqCst); + Self::rename_corrupted_chunk(datastore.clone(), &digest); } else { - verified_chunks2.lock().unwrap().insert(digest); + verified_chunks.lock().unwrap().insert(digest); } Ok(()) - }, - ); + } + }); let skip_chunk = |digest: &[u8; 32]| -> bool { if self.verified_chunks.lock().unwrap().contains(digest) { @@ -223,6 +220,29 @@ impl VerifyWorker { .datastore .get_chunks_in_order(&*index, skip_chunk, check_abort)?; + let reader_pool = ParallelHandler::new("read chunks", 4, { + let decoder_pool = decoder_pool.channel(); + let datastore = Arc::clone(&self.datastore); + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); + let read_bytes = Arc::clone(&read_bytes); + let decoded_bytes = Arc::clone(&decoded_bytes); + let errors = Arc::clone(&errors); + let backend = self.backend.clone(); + + move |info: ChunkReadInfo| { + Self::verify_chunk_by_backend( + &backend, + Arc::clone(&datastore), + Arc::clone(&corrupt_chunks), + Arc::clone(&read_bytes), + Arc::clone(&decoded_bytes), + Arc::clone(&errors), + &decoder_pool, + &info, + ) + } + }); + for (pos, _) in chunk_list { self.worker.check_abort()?; self.worker.fail_on_shutdown()?; @@ -234,19 +254,16 @@ impl VerifyWorker { continue; // already verified or marked corrupt } - self.verify_chunk_by_backend( - &info, - &mut read_bytes, - &mut decoded_bytes, - Arc::clone(&errors), - &decoder_pool, - )?; + reader_pool.send(info)?; } - decoder_pool.complete()?; + reader_pool.complete()?; let elapsed = start_time.elapsed().as_secs_f64(); + let read_bytes = read_bytes.load(Ordering::SeqCst); + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); + let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0); @@ -266,26 +283,31 @@ impl VerifyWorker { Ok(()) } + #[allow(clippy::too_many_arguments)] fn verify_chunk_by_backend( - &self, - info: &ChunkReadInfo, - read_bytes: &mut u64, - decoded_bytes: &mut u64, + backend: &DatastoreBackend, + datastore: Arc<DataStore>, + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, + read_bytes: Arc<AtomicU64>, + decoded_bytes: Arc<AtomicU64>, errors: Arc<AtomicUsize>, - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>, + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, + info: &ChunkReadInfo, ) -> Result<(), Error> { - match &self.backend { - DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) { - Err(err) => self.add_corrupt_chunk( + match backend { + DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) { + Err(err) => Self::add_corrupt_chunk( + datastore, + corrupt_chunks, info.digest, errors, &format!("can't verify chunk, load failed - {err}"), ), Ok(chunk) => { let size = info.size(); - *read_bytes += chunk.raw_size(); + read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); decoder_pool.send((chunk, info.digest, size))?; - *decoded_bytes += size; + decoded_bytes.fetch_add(size, Ordering::SeqCst); } }, DatastoreBackend::S3(s3_client) => { @@ -302,9 +324,9 @@ impl VerifyWorker { match chunk_result { Ok(chunk) => { let size = info.size(); - *read_bytes += chunk.raw_size(); + read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); decoder_pool.send((chunk, info.digest, size))?; - *decoded_bytes += size; + decoded_bytes.fetch_add(size, Ordering::SeqCst); } Err(err) => { errors.fetch_add(1, Ordering::SeqCst); @@ -312,7 +334,9 @@ impl VerifyWorker { } } } - Ok(None) => self.add_corrupt_chunk( + Ok(None) => Self::add_corrupt_chunk( + datastore, + corrupt_chunks, info.digest, errors, &format!( @@ -330,13 +354,19 @@ impl VerifyWorker { Ok(()) } - fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) { + fn add_corrupt_chunk( + datastore: Arc<DataStore>, + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, + digest: [u8; 32], + errors: Arc<AtomicUsize>, + message: &str, + ) { // Panic on poisoned mutex - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap(); + let mut corrupt_chunks = corrupt_chunks.lock().unwrap(); corrupt_chunks.insert(digest); error!(message); errors.fetch_add(1, Ordering::SeqCst); - Self::rename_corrupted_chunk(self.datastore.clone(), &digest); + Self::rename_corrupted_chunk(datastore.clone(), &digest); } fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> { -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey @ 2025-11-06 8:54 ` Christian Ebner 2025-11-06 9:04 ` Nicolas Frey 0 siblings, 1 reply; 19+ messages in thread From: Christian Ebner @ 2025-11-06 8:54 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Nicolas Frey On 11/5/25 4:51 PM, Nicolas Frey wrote: > This way, the chunks will be loaded in parallel in addition to being > checked in parallel. > > Depending on the underlying storage, this can speed up reading chunks > from disk, especially when the underlying storage is IO depth > dependent, and the CPU is faster than the storage. > > Originally-by: Dominik Csapak <d.csapak@proxmox.com> > Signed-off-by: Nicolas Frey <n.frey@proxmox.com> > --- > src/backup/verify.rs | 120 +++++++++++++++++++++++++++---------------- > 1 file changed, 75 insertions(+), 45 deletions(-) > > diff --git a/src/backup/verify.rs b/src/backup/verify.rs > index e33fdf50..7f91f38c 100644 > --- a/src/backup/verify.rs > +++ b/src/backup/verify.rs > @@ -1,6 +1,6 @@ > use pbs_config::BackupLockGuard; > use std::collections::HashSet; > -use std::sync::atomic::{AtomicUsize, Ordering}; > +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; > use std::sync::{Arc, Mutex}; > use std::time::Instant; > > @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile}; > use pbs_datastore::manifest::{BackupManifest, FileInfo}; > use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress}; > > -use crate::tools::parallel_handler::ParallelHandler; > +use crate::tools::parallel_handler::{ParallelHandler, SendHandle}; > > use crate::backup::hierarchy::ListAccessibleBackupGroups; > > @@ -156,23 +156,20 @@ impl VerifyWorker { > > let start_time = Instant::now(); > > - let mut read_bytes = 0; > - let mut decoded_bytes = 0; > + let read_bytes = Arc::new(AtomicU64::new(0)); > + let decoded_bytes = Arc::new(AtomicU64::new(0)); > > - let datastore2 = Arc::clone(&self.datastore); > - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks); > - let verified_chunks2 = Arc::clone(&self.verified_chunks); > - let errors2 = Arc::clone(&errors); > - > - let decoder_pool = ParallelHandler::new( > - "verify chunk decoder", > - 4, > + let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, { > + let datastore = Arc::clone(&self.datastore); > + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); > + let verified_chunks = Arc::clone(&self.verified_chunks); > + let errors = Arc::clone(&errors); Hmm, this really warrants an internal state struct for bundling theses and only cloning the arc around that. E.g. I would propose to introduce a struct along the lines of: struct IndexVerifyState { read_bytes: AtomicU64, decoded_bytes: AtomicU64, errors: AtomicUsize, datastore: Arc<DataStore>, corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, start_time: Instant, } Which can then be wrapped in an Arc and also passed along as parameter to the inner methods where required. See the diff below (untested), which could be used for further refining this. > move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { > let chunk_crypt_mode = match chunk.crypt_mode() { > Err(err) => { > - corrupt_chunks2.lock().unwrap().insert(digest); > + corrupt_chunks.lock().unwrap().insert(digest); > info!("can't verify chunk, unknown CryptMode - {err}"); > - errors2.fetch_add(1, Ordering::SeqCst); > + errors.fetch_add(1, Ordering::SeqCst); > return Ok(()); > } > Ok(mode) => mode, > @@ -182,21 +179,21 @@ impl VerifyWorker { > info!( > "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}" > ); > - errors2.fetch_add(1, Ordering::SeqCst); > + errors.fetch_add(1, Ordering::SeqCst); > } > > if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { > - corrupt_chunks2.lock().unwrap().insert(digest); > + corrupt_chunks.lock().unwrap().insert(digest); > info!("{err}"); > - errors2.fetch_add(1, Ordering::SeqCst); > - Self::rename_corrupted_chunk(datastore2.clone(), &digest); > + errors.fetch_add(1, Ordering::SeqCst); > + Self::rename_corrupted_chunk(datastore.clone(), &digest); > } else { > - verified_chunks2.lock().unwrap().insert(digest); > + verified_chunks.lock().unwrap().insert(digest); > } > > Ok(()) > - }, > - ); > + } > + }); > > let skip_chunk = |digest: &[u8; 32]| -> bool { > if self.verified_chunks.lock().unwrap().contains(digest) { > @@ -223,6 +220,29 @@ impl VerifyWorker { > .datastore > .get_chunks_in_order(&*index, skip_chunk, check_abort)?; > > + let reader_pool = ParallelHandler::new("read chunks", 4, { > + let decoder_pool = decoder_pool.channel(); > + let datastore = Arc::clone(&self.datastore); > + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); > + let read_bytes = Arc::clone(&read_bytes); > + let decoded_bytes = Arc::clone(&decoded_bytes); > + let errors = Arc::clone(&errors); > + let backend = self.backend.clone(); > + > + move |info: ChunkReadInfo| { > + Self::verify_chunk_by_backend( > + &backend, > + Arc::clone(&datastore), > + Arc::clone(&corrupt_chunks), > + Arc::clone(&read_bytes), > + Arc::clone(&decoded_bytes), > + Arc::clone(&errors), > + &decoder_pool, > + &info, > + ) > + } > + }); > + > for (pos, _) in chunk_list { > self.worker.check_abort()?; > self.worker.fail_on_shutdown()?; > @@ -234,19 +254,16 @@ impl VerifyWorker { > continue; // already verified or marked corrupt > } > > - self.verify_chunk_by_backend( > - &info, > - &mut read_bytes, > - &mut decoded_bytes, > - Arc::clone(&errors), > - &decoder_pool, > - )?; > + reader_pool.send(info)?; > } > > - decoder_pool.complete()?; > + reader_pool.complete()?; > > let elapsed = start_time.elapsed().as_secs_f64(); > > + let read_bytes = read_bytes.load(Ordering::SeqCst); > + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); > + > let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); > let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0); > > @@ -266,26 +283,31 @@ impl VerifyWorker { > Ok(()) > } > > + #[allow(clippy::too_many_arguments)] > fn verify_chunk_by_backend( > - &self, > - info: &ChunkReadInfo, > - read_bytes: &mut u64, > - decoded_bytes: &mut u64, > + backend: &DatastoreBackend, > + datastore: Arc<DataStore>, > + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > + read_bytes: Arc<AtomicU64>, > + decoded_bytes: Arc<AtomicU64>, > errors: Arc<AtomicUsize>, > - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>, > + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, > + info: &ChunkReadInfo, > ) -> Result<(), Error> { > - match &self.backend { > - DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) { > - Err(err) => self.add_corrupt_chunk( > + match backend { > + DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) { > + Err(err) => Self::add_corrupt_chunk( > + datastore, > + corrupt_chunks, > info.digest, > errors, > &format!("can't verify chunk, load failed - {err}"), > ), > Ok(chunk) => { > let size = info.size(); > - *read_bytes += chunk.raw_size(); > + read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); > decoder_pool.send((chunk, info.digest, size))?; > - *decoded_bytes += size; > + decoded_bytes.fetch_add(size, Ordering::SeqCst); > } > }, > DatastoreBackend::S3(s3_client) => { > @@ -302,9 +324,9 @@ impl VerifyWorker { > match chunk_result { > Ok(chunk) => { > let size = info.size(); > - *read_bytes += chunk.raw_size(); > + read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); > decoder_pool.send((chunk, info.digest, size))?; > - *decoded_bytes += size; > + decoded_bytes.fetch_add(size, Ordering::SeqCst); > } > Err(err) => { > errors.fetch_add(1, Ordering::SeqCst); > @@ -312,7 +334,9 @@ impl VerifyWorker { > } > } > } > - Ok(None) => self.add_corrupt_chunk( > + Ok(None) => Self::add_corrupt_chunk( > + datastore, > + corrupt_chunks, > info.digest, > errors, > &format!( > @@ -330,13 +354,19 @@ impl VerifyWorker { > Ok(()) > } > > - fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) { > + fn add_corrupt_chunk( > + datastore: Arc<DataStore>, > + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > + digest: [u8; 32], > + errors: Arc<AtomicUsize>, > + message: &str, > + ) { > // Panic on poisoned mutex > - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap(); > + let mut corrupt_chunks = corrupt_chunks.lock().unwrap(); > corrupt_chunks.insert(digest); > error!(message); > errors.fetch_add(1, Ordering::SeqCst); > - Self::rename_corrupted_chunk(self.datastore.clone(), &digest); > + Self::rename_corrupted_chunk(datastore.clone(), &digest); > } > > fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> { diff --git a/src/backup/verify.rs b/src/backup/verify.rs index 7f91f38c9..e01405750 100644 --- a/src/backup/verify.rs +++ b/src/backup/verify.rs @@ -152,24 +152,24 @@ impl VerifyWorker { index: Box<dyn IndexFile + Send>, crypt_mode: CryptMode, ) -> Result<(), Error> { - let errors = Arc::new(AtomicUsize::new(0)); - - let start_time = Instant::now(); - - let read_bytes = Arc::new(AtomicU64::new(0)); - let decoded_bytes = Arc::new(AtomicU64::new(0)); + let index_verify_state = Arc::new(IndexVerifyState { + read_bytes: AtomicU64::new(0), + decoded_bytes: AtomicU64::new(0), + errors: AtomicUsize::new(0), + datastore: Arc::clone(&self.datastore), + corrupt_chunks: Arc::clone(&self.corrupt_chunks), + start_time: Instant::now(), + }); let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, { - let datastore = Arc::clone(&self.datastore); - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); + let state = Arc::clone(&index_verify_state); let verified_chunks = Arc::clone(&self.verified_chunks); - let errors = Arc::clone(&errors); move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { let chunk_crypt_mode = match chunk.crypt_mode() { Err(err) => { - corrupt_chunks.lock().unwrap().insert(digest); + state.corrupt_chunks.lock().unwrap().insert(digest); info!("can't verify chunk, unknown CryptMode - {err}"); - errors.fetch_add(1, Ordering::SeqCst); + state.errors.fetch_add(1, Ordering::SeqCst); return Ok(()); } Ok(mode) => mode, @@ -179,14 +179,14 @@ impl VerifyWorker { info!( "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}" ); - errors.fetch_add(1, Ordering::SeqCst); + state.errors.fetch_add(1, Ordering::SeqCst); } if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) { - corrupt_chunks.lock().unwrap().insert(digest); + state.corrupt_chunks.lock().unwrap().insert(digest); info!("{err}"); - errors.fetch_add(1, Ordering::SeqCst); - Self::rename_corrupted_chunk(datastore.clone(), &digest); + state.errors.fetch_add(1, Ordering::SeqCst); + Self::rename_corrupted_chunk(state.datastore.clone(), &digest); } else { verified_chunks.lock().unwrap().insert(digest); } @@ -201,7 +201,7 @@ impl VerifyWorker { } else if self.corrupt_chunks.lock().unwrap().contains(digest) { let digest_str = hex::encode(digest); info!("chunk {digest_str} was marked as corrupt"); - errors.fetch_add(1, Ordering::SeqCst); + index_verify_state.errors.fetch_add(1, Ordering::SeqCst); true } else { false @@ -222,24 +222,11 @@ impl VerifyWorker { let reader_pool = ParallelHandler::new("read chunks", 4, { let decoder_pool = decoder_pool.channel(); - let datastore = Arc::clone(&self.datastore); - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); - let read_bytes = Arc::clone(&read_bytes); - let decoded_bytes = Arc::clone(&decoded_bytes); - let errors = Arc::clone(&errors); + let state = Arc::clone(&index_verify_state); let backend = self.backend.clone(); move |info: ChunkReadInfo| { - Self::verify_chunk_by_backend( - &backend, - Arc::clone(&datastore), - Arc::clone(&corrupt_chunks), - Arc::clone(&read_bytes), - Arc::clone(&decoded_bytes), - Arc::clone(&errors), - &decoder_pool, - &info, - ) + Self::verify_chunk_by_backend(&backend, Arc::clone(&state), &decoder_pool, &info) } }); @@ -259,10 +246,10 @@ impl VerifyWorker { reader_pool.complete()?; - let elapsed = start_time.elapsed().as_secs_f64(); + let elapsed = index_verify_state.start_time.elapsed().as_secs_f64(); - let read_bytes = read_bytes.load(Ordering::SeqCst); - let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); + let read_bytes = index_verify_state.read_bytes.load(Ordering::SeqCst); + let decoded_bytes = index_verify_state.decoded_bytes.load(Ordering::SeqCst); let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0); @@ -270,13 +257,13 @@ impl VerifyWorker { let read_speed = read_bytes_mib / elapsed; let decode_speed = decoded_bytes_mib / elapsed; - let error_count = errors.load(Ordering::SeqCst); + let error_count = index_verify_state.errors.load(Ordering::SeqCst); info!( " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)" ); - if errors.load(Ordering::SeqCst) > 0 { + if index_verify_state.errors.load(Ordering::SeqCst) > 0 { bail!("chunks could not be verified"); } @@ -286,28 +273,26 @@ impl VerifyWorker { #[allow(clippy::too_many_arguments)] fn verify_chunk_by_backend( backend: &DatastoreBackend, - datastore: Arc<DataStore>, - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, - read_bytes: Arc<AtomicU64>, - decoded_bytes: Arc<AtomicU64>, - errors: Arc<AtomicUsize>, + state: Arc<IndexVerifyState>, decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, info: &ChunkReadInfo, ) -> Result<(), Error> { match backend { - DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) { + DatastoreBackend::Filesystem => match state.datastore.load_chunk(&info.digest) { Err(err) => Self::add_corrupt_chunk( - datastore, - corrupt_chunks, + Arc::clone(&state.datastore), + Arc::clone(&state.corrupt_chunks), info.digest, - errors, + &state.errors, &format!("can't verify chunk, load failed - {err}"), ), Ok(chunk) => { let size = info.size(); - read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); + state + .read_bytes + .fetch_add(chunk.raw_size(), Ordering::SeqCst); decoder_pool.send((chunk, info.digest, size))?; - decoded_bytes.fetch_add(size, Ordering::SeqCst); + state.decoded_bytes.fetch_add(size, Ordering::SeqCst); } }, DatastoreBackend::S3(s3_client) => { @@ -324,28 +309,30 @@ impl VerifyWorker { match chunk_result { Ok(chunk) => { let size = info.size(); - read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); + state + .read_bytes + .fetch_add(chunk.raw_size(), Ordering::SeqCst); decoder_pool.send((chunk, info.digest, size))?; - decoded_bytes.fetch_add(size, Ordering::SeqCst); + state.decoded_bytes.fetch_add(size, Ordering::SeqCst); } Err(err) => { - errors.fetch_add(1, Ordering::SeqCst); + state.errors.fetch_add(1, Ordering::SeqCst); error!("can't verify chunk, load failed - {err}"); } } } Ok(None) => Self::add_corrupt_chunk( - datastore, - corrupt_chunks, + Arc::clone(&state.datastore), + Arc::clone(&state.corrupt_chunks), info.digest, - errors, + &state.errors, &format!( "can't verify missing chunk with digest {}", hex::encode(info.digest) ), ), Err(err) => { - errors.fetch_add(1, Ordering::SeqCst); + state.errors.fetch_add(1, Ordering::SeqCst); error!("can't verify chunk, load failed - {err}"); } } @@ -358,7 +345,7 @@ impl VerifyWorker { datastore: Arc<DataStore>, corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, digest: [u8; 32], - errors: Arc<AtomicUsize>, + errors: &AtomicUsize, message: &str, ) { // Panic on poisoned mutex @@ -681,3 +668,12 @@ impl VerifyWorker { } } } + +struct IndexVerifyState { + read_bytes: AtomicU64, + decoded_bytes: AtomicU64, + errors: AtomicUsize, + datastore: Arc<DataStore>, + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, + start_time: Instant, +} _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler 2025-11-06 8:54 ` Christian Ebner @ 2025-11-06 9:04 ` Nicolas Frey 2025-11-06 9:26 ` Christian Ebner 0 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-06 9:04 UTC (permalink / raw) To: Christian Ebner, Proxmox Backup Server development discussion On 11/6/25 9:54 AM, Christian Ebner wrote: > On 11/5/25 4:51 PM, Nicolas Frey wrote: >> This way, the chunks will be loaded in parallel in addition to being >> checked in parallel. >> >> Depending on the underlying storage, this can speed up reading chunks >> from disk, especially when the underlying storage is IO depth >> dependent, and the CPU is faster than the storage. >> >> Originally-by: Dominik Csapak <d.csapak@proxmox.com> >> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> >> --- >> src/backup/verify.rs | 120 ++++++++++++++++++++++++++ >> +---------------- >> 1 file changed, 75 insertions(+), 45 deletions(-) >> >> diff --git a/src/backup/verify.rs b/src/backup/verify.rs >> index e33fdf50..7f91f38c 100644 >> --- a/src/backup/verify.rs >> +++ b/src/backup/verify.rs >> @@ -1,6 +1,6 @@ >> use pbs_config::BackupLockGuard; >> use std::collections::HashSet; >> -use std::sync::atomic::{AtomicUsize, Ordering}; >> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; >> use std::sync::{Arc, Mutex}; >> use std::time::Instant; >> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, >> IndexFile}; >> use pbs_datastore::manifest::{BackupManifest, FileInfo}; >> use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, >> StoreProgress}; >> -use crate::tools::parallel_handler::ParallelHandler; >> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle}; >> use crate::backup::hierarchy::ListAccessibleBackupGroups; >> @@ -156,23 +156,20 @@ impl VerifyWorker { >> let start_time = Instant::now(); >> - let mut read_bytes = 0; >> - let mut decoded_bytes = 0; >> + let read_bytes = Arc::new(AtomicU64::new(0)); >> + let decoded_bytes = Arc::new(AtomicU64::new(0)); >> - let datastore2 = Arc::clone(&self.datastore); >> - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks); >> - let verified_chunks2 = Arc::clone(&self.verified_chunks); >> - let errors2 = Arc::clone(&errors); >> - >> - let decoder_pool = ParallelHandler::new( >> - "verify chunk decoder", >> - 4, >> + let decoder_pool = ParallelHandler::new("verify chunk >> decoder", 4, { >> + let datastore = Arc::clone(&self.datastore); >> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >> + let verified_chunks = Arc::clone(&self.verified_chunks); >> + let errors = Arc::clone(&errors); > > Hmm, this really warrants an internal state struct for bundling theses > and only cloning the arc around that. > > E.g. I would propose to introduce a struct along the lines of: > > struct IndexVerifyState { > read_bytes: AtomicU64, > decoded_bytes: AtomicU64, > errors: AtomicUsize, > datastore: Arc<DataStore>, > corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > start_time: Instant, > } > > Which can then be wrapped in an Arc and also passed along as parameter > to the inner methods where required. > > See the diff below (untested), which could be used for further > refining this. > I thought this could be improved in the initial patch, but wasn't quite sure how I should approach it, so thanks for the Feedback! The diff below looks good to me, I'll send a v2 for this in addition to the things you pointed out in the other patches. >> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { >> let chunk_crypt_mode = match chunk.crypt_mode() { >> Err(err) => { >> - >> corrupt_chunks2.lock().unwrap().insert(digest); >> + corrupt_chunks.lock().unwrap().insert(digest); >> info!("can't verify chunk, unknown >> CryptMode - {err}"); >> - errors2.fetch_add(1, Ordering::SeqCst); >> + errors.fetch_add(1, Ordering::SeqCst); >> return Ok(()); >> } >> Ok(mode) => mode, >> @@ -182,21 +179,21 @@ impl VerifyWorker { >> info!( >> "chunk CryptMode {chunk_crypt_mode:?} does not >> match index CryptMode {crypt_mode:?}" >> ); >> - errors2.fetch_add(1, Ordering::SeqCst); >> + errors.fetch_add(1, Ordering::SeqCst); >> } >> if let Err(err) = chunk.verify_unencrypted(size >> as usize, &digest) { >> - corrupt_chunks2.lock().unwrap().insert(digest); >> + corrupt_chunks.lock().unwrap().insert(digest); >> info!("{err}"); >> - errors2.fetch_add(1, Ordering::SeqCst); >> - >> Self::rename_corrupted_chunk(datastore2.clone(), &digest); >> + errors.fetch_add(1, Ordering::SeqCst); >> + Self::rename_corrupted_chunk(datastore.clone(), >> &digest); >> } else { >> - verified_chunks2.lock().unwrap().insert(digest); >> + verified_chunks.lock().unwrap().insert(digest); >> } >> Ok(()) >> - }, >> - ); >> + } >> + }); >> let skip_chunk = |digest: &[u8; 32]| -> bool { >> if >> self.verified_chunks.lock().unwrap().contains(digest) { >> @@ -223,6 +220,29 @@ impl VerifyWorker { >> .datastore >> .get_chunks_in_order(&*index, skip_chunk, check_abort)?; >> + let reader_pool = ParallelHandler::new("read chunks", 4, { >> + let decoder_pool = decoder_pool.channel(); >> + let datastore = Arc::clone(&self.datastore); >> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >> + let read_bytes = Arc::clone(&read_bytes); >> + let decoded_bytes = Arc::clone(&decoded_bytes); >> + let errors = Arc::clone(&errors); >> + let backend = self.backend.clone(); >> + >> + move |info: ChunkReadInfo| { >> + Self::verify_chunk_by_backend( >> + &backend, >> + Arc::clone(&datastore), >> + Arc::clone(&corrupt_chunks), >> + Arc::clone(&read_bytes), >> + Arc::clone(&decoded_bytes), >> + Arc::clone(&errors), >> + &decoder_pool, >> + &info, >> + ) >> + } >> + }); >> + >> for (pos, _) in chunk_list { >> self.worker.check_abort()?; >> self.worker.fail_on_shutdown()?; >> @@ -234,19 +254,16 @@ impl VerifyWorker { >> continue; // already verified or marked corrupt >> } >> - self.verify_chunk_by_backend( >> - &info, >> - &mut read_bytes, >> - &mut decoded_bytes, >> - Arc::clone(&errors), >> - &decoder_pool, >> - )?; >> + reader_pool.send(info)?; >> } >> - decoder_pool.complete()?; >> + reader_pool.complete()?; >> let elapsed = start_time.elapsed().as_secs_f64(); >> + let read_bytes = read_bytes.load(Ordering::SeqCst); >> + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); >> + >> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); >> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * >> 1024.0); >> @@ -266,26 +283,31 @@ impl VerifyWorker { >> Ok(()) >> } >> + #[allow(clippy::too_many_arguments)] >> fn verify_chunk_by_backend( >> - &self, >> - info: &ChunkReadInfo, >> - read_bytes: &mut u64, >> - decoded_bytes: &mut u64, >> + backend: &DatastoreBackend, >> + datastore: Arc<DataStore>, >> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> + read_bytes: Arc<AtomicU64>, >> + decoded_bytes: Arc<AtomicU64>, >> errors: Arc<AtomicUsize>, >> - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>, >> + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, >> + info: &ChunkReadInfo, >> ) -> Result<(), Error> { >> - match &self.backend { >> - DatastoreBackend::Filesystem => match >> self.datastore.load_chunk(&info.digest) { >> - Err(err) => self.add_corrupt_chunk( >> + match backend { >> + DatastoreBackend::Filesystem => match >> datastore.load_chunk(&info.digest) { >> + Err(err) => Self::add_corrupt_chunk( >> + datastore, >> + corrupt_chunks, >> info.digest, >> errors, >> &format!("can't verify chunk, load failed - >> {err}"), >> ), >> Ok(chunk) => { >> let size = info.size(); >> - *read_bytes += chunk.raw_size(); >> + read_bytes.fetch_add(chunk.raw_size(), >> Ordering::SeqCst); >> decoder_pool.send((chunk, info.digest, size))?; >> - *decoded_bytes += size; >> + decoded_bytes.fetch_add(size, Ordering::SeqCst); >> } >> }, >> DatastoreBackend::S3(s3_client) => { >> @@ -302,9 +324,9 @@ impl VerifyWorker { >> match chunk_result { >> Ok(chunk) => { >> let size = info.size(); >> - *read_bytes += chunk.raw_size(); >> + >> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); >> decoder_pool.send((chunk, >> info.digest, size))?; >> - *decoded_bytes += size; >> + decoded_bytes.fetch_add(size, >> Ordering::SeqCst); >> } >> Err(err) => { >> errors.fetch_add(1, >> Ordering::SeqCst); >> @@ -312,7 +334,9 @@ impl VerifyWorker { >> } >> } >> } >> - Ok(None) => self.add_corrupt_chunk( >> + Ok(None) => Self::add_corrupt_chunk( >> + datastore, >> + corrupt_chunks, >> info.digest, >> errors, >> &format!( >> @@ -330,13 +354,19 @@ impl VerifyWorker { >> Ok(()) >> } >> - fn add_corrupt_chunk(&self, digest: [u8; 32], errors: >> Arc<AtomicUsize>, message: &str) { >> + fn add_corrupt_chunk( >> + datastore: Arc<DataStore>, >> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> + digest: [u8; 32], >> + errors: Arc<AtomicUsize>, >> + message: &str, >> + ) { >> // Panic on poisoned mutex >> - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap(); >> + let mut corrupt_chunks = corrupt_chunks.lock().unwrap(); >> corrupt_chunks.insert(digest); >> error!(message); >> errors.fetch_add(1, Ordering::SeqCst); >> - Self::rename_corrupted_chunk(self.datastore.clone(), &digest); >> + Self::rename_corrupted_chunk(datastore.clone(), &digest); >> } >> fn verify_fixed_index(&self, backup_dir: &BackupDir, info: >> &FileInfo) -> Result<(), Error> { > > > diff --git a/src/backup/verify.rs b/src/backup/verify.rs > index 7f91f38c9..e01405750 100644 > --- a/src/backup/verify.rs > +++ b/src/backup/verify.rs > @@ -152,24 +152,24 @@ impl VerifyWorker { > index: Box<dyn IndexFile + Send>, > crypt_mode: CryptMode, > ) -> Result<(), Error> { > - let errors = Arc::new(AtomicUsize::new(0)); > - > - let start_time = Instant::now(); > - > - let read_bytes = Arc::new(AtomicU64::new(0)); > - let decoded_bytes = Arc::new(AtomicU64::new(0)); > + let index_verify_state = Arc::new(IndexVerifyState { > + read_bytes: AtomicU64::new(0), > + decoded_bytes: AtomicU64::new(0), > + errors: AtomicUsize::new(0), > + datastore: Arc::clone(&self.datastore), > + corrupt_chunks: Arc::clone(&self.corrupt_chunks), > + start_time: Instant::now(), > + }); > > let decoder_pool = ParallelHandler::new("verify chunk > decoder", 4, { > - let datastore = Arc::clone(&self.datastore); > - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); > + let state = Arc::clone(&index_verify_state); > let verified_chunks = Arc::clone(&self.verified_chunks); > - let errors = Arc::clone(&errors); > move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { > let chunk_crypt_mode = match chunk.crypt_mode() { > Err(err) => { > - corrupt_chunks.lock().unwrap().insert(digest); > + state.corrupt_chunks.lock().unwrap().insert(digest); > info!("can't verify chunk, unknown CryptMode > - {err}"); > - errors.fetch_add(1, Ordering::SeqCst); > + state.errors.fetch_add(1, Ordering::SeqCst); > return Ok(()); > } > Ok(mode) => mode, > @@ -179,14 +179,14 @@ impl VerifyWorker { > info!( > "chunk CryptMode {chunk_crypt_mode:?} does not > match index CryptMode {crypt_mode:?}" > ); > - errors.fetch_add(1, Ordering::SeqCst); > + state.errors.fetch_add(1, Ordering::SeqCst); > } > > if let Err(err) = chunk.verify_unencrypted(size as > usize, &digest) { > - corrupt_chunks.lock().unwrap().insert(digest); > + state.corrupt_chunks.lock().unwrap().insert(digest); > info!("{err}"); > - errors.fetch_add(1, Ordering::SeqCst); > - Self::rename_corrupted_chunk(datastore.clone(), > &digest); > + state.errors.fetch_add(1, Ordering::SeqCst); > + Self::rename_corrupted_chunk(state.datastore.clone(), &digest); > } else { > verified_chunks.lock().unwrap().insert(digest); > } > @@ -201,7 +201,7 @@ impl VerifyWorker { > } else if > self.corrupt_chunks.lock().unwrap().contains(digest) { > let digest_str = hex::encode(digest); > info!("chunk {digest_str} was marked as corrupt"); > - errors.fetch_add(1, Ordering::SeqCst); > + index_verify_state.errors.fetch_add(1, > Ordering::SeqCst); > true > } else { > false > @@ -222,24 +222,11 @@ impl VerifyWorker { > > let reader_pool = ParallelHandler::new("read chunks", 4, { > let decoder_pool = decoder_pool.channel(); > - let datastore = Arc::clone(&self.datastore); > - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); > - let read_bytes = Arc::clone(&read_bytes); > - let decoded_bytes = Arc::clone(&decoded_bytes); > - let errors = Arc::clone(&errors); > + let state = Arc::clone(&index_verify_state); > let backend = self.backend.clone(); > > move |info: ChunkReadInfo| { > - Self::verify_chunk_by_backend( > - &backend, > - Arc::clone(&datastore), > - Arc::clone(&corrupt_chunks), > - Arc::clone(&read_bytes), > - Arc::clone(&decoded_bytes), > - Arc::clone(&errors), > - &decoder_pool, > - &info, > - ) > + Self::verify_chunk_by_backend(&backend, > Arc::clone(&state), &decoder_pool, &info) > } > }); > > @@ -259,10 +246,10 @@ impl VerifyWorker { > > reader_pool.complete()?; > > - let elapsed = start_time.elapsed().as_secs_f64(); > + let elapsed = > index_verify_state.start_time.elapsed().as_secs_f64(); > > - let read_bytes = read_bytes.load(Ordering::SeqCst); > - let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); > + let read_bytes = > index_verify_state.read_bytes.load(Ordering::SeqCst); > + let decoded_bytes = > index_verify_state.decoded_bytes.load(Ordering::SeqCst); > > let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); > let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * > 1024.0); > @@ -270,13 +257,13 @@ impl VerifyWorker { > let read_speed = read_bytes_mib / elapsed; > let decode_speed = decoded_bytes_mib / elapsed; > > - let error_count = errors.load(Ordering::SeqCst); > + let error_count = > index_verify_state.errors.load(Ordering::SeqCst); > > info!( > " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} > MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} > MiB/s ({error_count} errors)" > ); > > - if errors.load(Ordering::SeqCst) > 0 { > + if index_verify_state.errors.load(Ordering::SeqCst) > 0 { > bail!("chunks could not be verified"); > } > > @@ -286,28 +273,26 @@ impl VerifyWorker { > #[allow(clippy::too_many_arguments)] > fn verify_chunk_by_backend( > backend: &DatastoreBackend, > - datastore: Arc<DataStore>, > - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > - read_bytes: Arc<AtomicU64>, > - decoded_bytes: Arc<AtomicU64>, > - errors: Arc<AtomicUsize>, > + state: Arc<IndexVerifyState>, > decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, > info: &ChunkReadInfo, > ) -> Result<(), Error> { > match backend { > - DatastoreBackend::Filesystem => match > datastore.load_chunk(&info.digest) { > + DatastoreBackend::Filesystem => match > state.datastore.load_chunk(&info.digest) { > Err(err) => Self::add_corrupt_chunk( > - datastore, > - corrupt_chunks, > + Arc::clone(&state.datastore), > + Arc::clone(&state.corrupt_chunks), > info.digest, > - errors, > + &state.errors, > &format!("can't verify chunk, load failed - {err}"), > ), > Ok(chunk) => { > let size = info.size(); > - read_bytes.fetch_add(chunk.raw_size(), > Ordering::SeqCst); > + state > + .read_bytes > + .fetch_add(chunk.raw_size(), Ordering::SeqCst); > decoder_pool.send((chunk, info.digest, size))?; > - decoded_bytes.fetch_add(size, Ordering::SeqCst); > + state.decoded_bytes.fetch_add(size, > Ordering::SeqCst); > } > }, > DatastoreBackend::S3(s3_client) => { > @@ -324,28 +309,30 @@ impl VerifyWorker { > match chunk_result { > Ok(chunk) => { > let size = info.size(); > - > read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); > + state > + .read_bytes > + .fetch_add(chunk.raw_size(), > Ordering::SeqCst); > decoder_pool.send((chunk, > info.digest, size))?; > - decoded_bytes.fetch_add(size, > Ordering::SeqCst); > + state.decoded_bytes.fetch_add(size, > Ordering::SeqCst); > } > Err(err) => { > - errors.fetch_add(1, Ordering::SeqCst); > + state.errors.fetch_add(1, > Ordering::SeqCst); > error!("can't verify chunk, load > failed - {err}"); > } > } > } > Ok(None) => Self::add_corrupt_chunk( > - datastore, > - corrupt_chunks, > + Arc::clone(&state.datastore), > + Arc::clone(&state.corrupt_chunks), > info.digest, > - errors, > + &state.errors, > &format!( > "can't verify missing chunk with digest {}", > hex::encode(info.digest) > ), > ), > Err(err) => { > - errors.fetch_add(1, Ordering::SeqCst); > + state.errors.fetch_add(1, Ordering::SeqCst); > error!("can't verify chunk, load failed - > {err}"); > } > } > @@ -358,7 +345,7 @@ impl VerifyWorker { > datastore: Arc<DataStore>, > corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > digest: [u8; 32], > - errors: Arc<AtomicUsize>, > + errors: &AtomicUsize, > message: &str, > ) { > // Panic on poisoned mutex > @@ -681,3 +668,12 @@ impl VerifyWorker { > } > } > } > + > +struct IndexVerifyState { > + read_bytes: AtomicU64, > + decoded_bytes: AtomicU64, > + errors: AtomicUsize, > + datastore: Arc<DataStore>, > + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > + start_time: Instant, > +} > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler 2025-11-06 9:04 ` Nicolas Frey @ 2025-11-06 9:26 ` Christian Ebner 0 siblings, 0 replies; 19+ messages in thread From: Christian Ebner @ 2025-11-06 9:26 UTC (permalink / raw) To: Nicolas Frey, Proxmox Backup Server development discussion On 11/6/25 10:04 AM, Nicolas Frey wrote: > > On 11/6/25 9:54 AM, Christian Ebner wrote: >> On 11/5/25 4:51 PM, Nicolas Frey wrote: >>> This way, the chunks will be loaded in parallel in addition to being >>> checked in parallel. >>> >>> Depending on the underlying storage, this can speed up reading chunks >>> from disk, especially when the underlying storage is IO depth >>> dependent, and the CPU is faster than the storage. >>> >>> Originally-by: Dominik Csapak <d.csapak@proxmox.com> >>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> >>> --- >>> src/backup/verify.rs | 120 ++++++++++++++++++++++++++ >>> +---------------- >>> 1 file changed, 75 insertions(+), 45 deletions(-) >>> >>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs >>> index e33fdf50..7f91f38c 100644 >>> --- a/src/backup/verify.rs >>> +++ b/src/backup/verify.rs >>> @@ -1,6 +1,6 @@ >>> use pbs_config::BackupLockGuard; >>> use std::collections::HashSet; >>> -use std::sync::atomic::{AtomicUsize, Ordering}; >>> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; >>> use std::sync::{Arc, Mutex}; >>> use std::time::Instant; >>> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, >>> IndexFile}; >>> use pbs_datastore::manifest::{BackupManifest, FileInfo}; >>> use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, >>> StoreProgress}; >>> -use crate::tools::parallel_handler::ParallelHandler; >>> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle}; >>> use crate::backup::hierarchy::ListAccessibleBackupGroups; >>> @@ -156,23 +156,20 @@ impl VerifyWorker { >>> let start_time = Instant::now(); >>> - let mut read_bytes = 0; >>> - let mut decoded_bytes = 0; >>> + let read_bytes = Arc::new(AtomicU64::new(0)); >>> + let decoded_bytes = Arc::new(AtomicU64::new(0)); >>> - let datastore2 = Arc::clone(&self.datastore); >>> - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks); >>> - let verified_chunks2 = Arc::clone(&self.verified_chunks); >>> - let errors2 = Arc::clone(&errors); >>> - >>> - let decoder_pool = ParallelHandler::new( >>> - "verify chunk decoder", >>> - 4, >>> + let decoder_pool = ParallelHandler::new("verify chunk >>> decoder", 4, { >>> + let datastore = Arc::clone(&self.datastore); >>> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >>> + let verified_chunks = Arc::clone(&self.verified_chunks); >>> + let errors = Arc::clone(&errors); >> >> Hmm, this really warrants an internal state struct for bundling theses >> and only cloning the arc around that. >> >> E.g. I would propose to introduce a struct along the lines of: >> >> struct IndexVerifyState { >> read_bytes: AtomicU64, >> decoded_bytes: AtomicU64, >> errors: AtomicUsize, >> datastore: Arc<DataStore>, >> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> start_time: Instant, >> } >> >> Which can then be wrapped in an Arc and also passed along as parameter >> to the inner methods where required. >> >> See the diff below (untested), which could be used for further >> refining this. >> > > I thought this could be improved in the initial patch, but wasn't > quite sure how I should approach it, so thanks for the Feedback! > The diff below looks good to me, I'll send a v2 for this in addition > to the things you pointed out in the other patches. Great! But please note that the diff was just a rough suggestion in which way to improve from my side, this can be further refined a bit. E.g. we might passt the state also to the `add_corrupt_chunk()` which would also benefit from a reduction in function parameters by this.> >>> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { >>> let chunk_crypt_mode = match chunk.crypt_mode() { >>> Err(err) => { >>> - >>> corrupt_chunks2.lock().unwrap().insert(digest); >>> + corrupt_chunks.lock().unwrap().insert(digest); >>> info!("can't verify chunk, unknown >>> CryptMode - {err}"); >>> - errors2.fetch_add(1, Ordering::SeqCst); >>> + errors.fetch_add(1, Ordering::SeqCst); >>> return Ok(()); >>> } >>> Ok(mode) => mode, >>> @@ -182,21 +179,21 @@ impl VerifyWorker { >>> info!( >>> "chunk CryptMode {chunk_crypt_mode:?} does not >>> match index CryptMode {crypt_mode:?}" >>> ); >>> - errors2.fetch_add(1, Ordering::SeqCst); >>> + errors.fetch_add(1, Ordering::SeqCst); >>> } >>> if let Err(err) = chunk.verify_unencrypted(size >>> as usize, &digest) { >>> - corrupt_chunks2.lock().unwrap().insert(digest); >>> + corrupt_chunks.lock().unwrap().insert(digest); >>> info!("{err}"); >>> - errors2.fetch_add(1, Ordering::SeqCst); >>> - >>> Self::rename_corrupted_chunk(datastore2.clone(), &digest); >>> + errors.fetch_add(1, Ordering::SeqCst); >>> + Self::rename_corrupted_chunk(datastore.clone(), >>> &digest); >>> } else { >>> - verified_chunks2.lock().unwrap().insert(digest); >>> + verified_chunks.lock().unwrap().insert(digest); >>> } >>> Ok(()) >>> - }, >>> - ); >>> + } >>> + }); >>> let skip_chunk = |digest: &[u8; 32]| -> bool { >>> if >>> self.verified_chunks.lock().unwrap().contains(digest) { >>> @@ -223,6 +220,29 @@ impl VerifyWorker { >>> .datastore >>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?; >>> + let reader_pool = ParallelHandler::new("read chunks", 4, { >>> + let decoder_pool = decoder_pool.channel(); >>> + let datastore = Arc::clone(&self.datastore); >>> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >>> + let read_bytes = Arc::clone(&read_bytes); >>> + let decoded_bytes = Arc::clone(&decoded_bytes); >>> + let errors = Arc::clone(&errors); >>> + let backend = self.backend.clone(); >>> + >>> + move |info: ChunkReadInfo| { >>> + Self::verify_chunk_by_backend( >>> + &backend, >>> + Arc::clone(&datastore), >>> + Arc::clone(&corrupt_chunks), >>> + Arc::clone(&read_bytes), >>> + Arc::clone(&decoded_bytes), >>> + Arc::clone(&errors), >>> + &decoder_pool, >>> + &info, >>> + ) >>> + } >>> + }); >>> + >>> for (pos, _) in chunk_list { >>> self.worker.check_abort()?; >>> self.worker.fail_on_shutdown()?; >>> @@ -234,19 +254,16 @@ impl VerifyWorker { >>> continue; // already verified or marked corrupt >>> } >>> - self.verify_chunk_by_backend( >>> - &info, >>> - &mut read_bytes, >>> - &mut decoded_bytes, >>> - Arc::clone(&errors), >>> - &decoder_pool, >>> - )?; >>> + reader_pool.send(info)?; >>> } >>> - decoder_pool.complete()?; >>> + reader_pool.complete()?; >>> let elapsed = start_time.elapsed().as_secs_f64(); >>> + let read_bytes = read_bytes.load(Ordering::SeqCst); >>> + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); >>> + >>> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); >>> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * >>> 1024.0); >>> @@ -266,26 +283,31 @@ impl VerifyWorker { >>> Ok(()) >>> } >>> + #[allow(clippy::too_many_arguments)] >>> fn verify_chunk_by_backend( >>> - &self, >>> - info: &ChunkReadInfo, >>> - read_bytes: &mut u64, >>> - decoded_bytes: &mut u64, >>> + backend: &DatastoreBackend, >>> + datastore: Arc<DataStore>, >>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >>> + read_bytes: Arc<AtomicU64>, >>> + decoded_bytes: Arc<AtomicU64>, >>> errors: Arc<AtomicUsize>, >>> - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>, >>> + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, >>> + info: &ChunkReadInfo, >>> ) -> Result<(), Error> { >>> - match &self.backend { >>> - DatastoreBackend::Filesystem => match >>> self.datastore.load_chunk(&info.digest) { >>> - Err(err) => self.add_corrupt_chunk( >>> + match backend { >>> + DatastoreBackend::Filesystem => match >>> datastore.load_chunk(&info.digest) { >>> + Err(err) => Self::add_corrupt_chunk( >>> + datastore, >>> + corrupt_chunks, >>> info.digest, >>> errors, >>> &format!("can't verify chunk, load failed - >>> {err}"), >>> ), >>> Ok(chunk) => { >>> let size = info.size(); >>> - *read_bytes += chunk.raw_size(); >>> + read_bytes.fetch_add(chunk.raw_size(), >>> Ordering::SeqCst); >>> decoder_pool.send((chunk, info.digest, size))?; >>> - *decoded_bytes += size; >>> + decoded_bytes.fetch_add(size, Ordering::SeqCst); >>> } >>> }, >>> DatastoreBackend::S3(s3_client) => { >>> @@ -302,9 +324,9 @@ impl VerifyWorker { >>> match chunk_result { >>> Ok(chunk) => { >>> let size = info.size(); >>> - *read_bytes += chunk.raw_size(); >>> + >>> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); >>> decoder_pool.send((chunk, >>> info.digest, size))?; >>> - *decoded_bytes += size; >>> + decoded_bytes.fetch_add(size, >>> Ordering::SeqCst); >>> } >>> Err(err) => { >>> errors.fetch_add(1, >>> Ordering::SeqCst); >>> @@ -312,7 +334,9 @@ impl VerifyWorker { >>> } >>> } >>> } >>> - Ok(None) => self.add_corrupt_chunk( >>> + Ok(None) => Self::add_corrupt_chunk( >>> + datastore, >>> + corrupt_chunks, >>> info.digest, >>> errors, >>> &format!( >>> @@ -330,13 +354,19 @@ impl VerifyWorker { >>> Ok(()) >>> } >>> - fn add_corrupt_chunk(&self, digest: [u8; 32], errors: >>> Arc<AtomicUsize>, message: &str) { >>> + fn add_corrupt_chunk( >>> + datastore: Arc<DataStore>, >>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >>> + digest: [u8; 32], >>> + errors: Arc<AtomicUsize>, >>> + message: &str, >>> + ) { >>> // Panic on poisoned mutex >>> - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap(); >>> + let mut corrupt_chunks = corrupt_chunks.lock().unwrap(); >>> corrupt_chunks.insert(digest); >>> error!(message); >>> errors.fetch_add(1, Ordering::SeqCst); >>> - Self::rename_corrupted_chunk(self.datastore.clone(), &digest); >>> + Self::rename_corrupted_chunk(datastore.clone(), &digest); >>> } >>> fn verify_fixed_index(&self, backup_dir: &BackupDir, info: >>> &FileInfo) -> Result<(), Error> { >> >> >> diff --git a/src/backup/verify.rs b/src/backup/verify.rs >> index 7f91f38c9..e01405750 100644 >> --- a/src/backup/verify.rs >> +++ b/src/backup/verify.rs >> @@ -152,24 +152,24 @@ impl VerifyWorker { >> index: Box<dyn IndexFile + Send>, >> crypt_mode: CryptMode, >> ) -> Result<(), Error> { >> - let errors = Arc::new(AtomicUsize::new(0)); >> - >> - let start_time = Instant::now(); >> - >> - let read_bytes = Arc::new(AtomicU64::new(0)); >> - let decoded_bytes = Arc::new(AtomicU64::new(0)); >> + let index_verify_state = Arc::new(IndexVerifyState { >> + read_bytes: AtomicU64::new(0), >> + decoded_bytes: AtomicU64::new(0), >> + errors: AtomicUsize::new(0), >> + datastore: Arc::clone(&self.datastore), >> + corrupt_chunks: Arc::clone(&self.corrupt_chunks), >> + start_time: Instant::now(), >> + }); >> >> let decoder_pool = ParallelHandler::new("verify chunk >> decoder", 4, { >> - let datastore = Arc::clone(&self.datastore); >> - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >> + let state = Arc::clone(&index_verify_state); >> let verified_chunks = Arc::clone(&self.verified_chunks); >> - let errors = Arc::clone(&errors); >> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { >> let chunk_crypt_mode = match chunk.crypt_mode() { >> Err(err) => { >> - corrupt_chunks.lock().unwrap().insert(digest); >> + state.corrupt_chunks.lock().unwrap().insert(digest); >> info!("can't verify chunk, unknown CryptMode >> - {err}"); >> - errors.fetch_add(1, Ordering::SeqCst); >> + state.errors.fetch_add(1, Ordering::SeqCst); >> return Ok(()); >> } >> Ok(mode) => mode, >> @@ -179,14 +179,14 @@ impl VerifyWorker { >> info!( >> "chunk CryptMode {chunk_crypt_mode:?} does not >> match index CryptMode {crypt_mode:?}" >> ); >> - errors.fetch_add(1, Ordering::SeqCst); >> + state.errors.fetch_add(1, Ordering::SeqCst); >> } >> >> if let Err(err) = chunk.verify_unencrypted(size as >> usize, &digest) { >> - corrupt_chunks.lock().unwrap().insert(digest); >> + state.corrupt_chunks.lock().unwrap().insert(digest); >> info!("{err}"); >> - errors.fetch_add(1, Ordering::SeqCst); >> - Self::rename_corrupted_chunk(datastore.clone(), >> &digest); >> + state.errors.fetch_add(1, Ordering::SeqCst); >> + Self::rename_corrupted_chunk(state.datastore.clone(), &digest); >> } else { >> verified_chunks.lock().unwrap().insert(digest); >> } >> @@ -201,7 +201,7 @@ impl VerifyWorker { >> } else if >> self.corrupt_chunks.lock().unwrap().contains(digest) { >> let digest_str = hex::encode(digest); >> info!("chunk {digest_str} was marked as corrupt"); >> - errors.fetch_add(1, Ordering::SeqCst); >> + index_verify_state.errors.fetch_add(1, >> Ordering::SeqCst); >> true >> } else { >> false >> @@ -222,24 +222,11 @@ impl VerifyWorker { >> >> let reader_pool = ParallelHandler::new("read chunks", 4, { >> let decoder_pool = decoder_pool.channel(); >> - let datastore = Arc::clone(&self.datastore); >> - let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >> - let read_bytes = Arc::clone(&read_bytes); >> - let decoded_bytes = Arc::clone(&decoded_bytes); >> - let errors = Arc::clone(&errors); >> + let state = Arc::clone(&index_verify_state); >> let backend = self.backend.clone(); >> >> move |info: ChunkReadInfo| { >> - Self::verify_chunk_by_backend( >> - &backend, >> - Arc::clone(&datastore), >> - Arc::clone(&corrupt_chunks), >> - Arc::clone(&read_bytes), >> - Arc::clone(&decoded_bytes), >> - Arc::clone(&errors), >> - &decoder_pool, >> - &info, >> - ) >> + Self::verify_chunk_by_backend(&backend, >> Arc::clone(&state), &decoder_pool, &info) >> } >> }); >> >> @@ -259,10 +246,10 @@ impl VerifyWorker { >> >> reader_pool.complete()?; >> >> - let elapsed = start_time.elapsed().as_secs_f64(); >> + let elapsed = >> index_verify_state.start_time.elapsed().as_secs_f64(); >> >> - let read_bytes = read_bytes.load(Ordering::SeqCst); >> - let decoded_bytes = decoded_bytes.load(Ordering::SeqCst); >> + let read_bytes = >> index_verify_state.read_bytes.load(Ordering::SeqCst); >> + let decoded_bytes = >> index_verify_state.decoded_bytes.load(Ordering::SeqCst); >> >> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0); >> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * >> 1024.0); >> @@ -270,13 +257,13 @@ impl VerifyWorker { >> let read_speed = read_bytes_mib / elapsed; >> let decode_speed = decoded_bytes_mib / elapsed; >> >> - let error_count = errors.load(Ordering::SeqCst); >> + let error_count = >> index_verify_state.errors.load(Ordering::SeqCst); >> >> info!( >> " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} >> MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} >> MiB/s ({error_count} errors)" >> ); >> >> - if errors.load(Ordering::SeqCst) > 0 { >> + if index_verify_state.errors.load(Ordering::SeqCst) > 0 { >> bail!("chunks could not be verified"); >> } >> >> @@ -286,28 +273,26 @@ impl VerifyWorker { >> #[allow(clippy::too_many_arguments)] >> fn verify_chunk_by_backend( >> backend: &DatastoreBackend, >> - datastore: Arc<DataStore>, >> - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> - read_bytes: Arc<AtomicU64>, >> - decoded_bytes: Arc<AtomicU64>, >> - errors: Arc<AtomicUsize>, >> + state: Arc<IndexVerifyState>, >> decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>, >> info: &ChunkReadInfo, >> ) -> Result<(), Error> { >> match backend { >> - DatastoreBackend::Filesystem => match >> datastore.load_chunk(&info.digest) { >> + DatastoreBackend::Filesystem => match >> state.datastore.load_chunk(&info.digest) { >> Err(err) => Self::add_corrupt_chunk( >> - datastore, >> - corrupt_chunks, >> + Arc::clone(&state.datastore), >> + Arc::clone(&state.corrupt_chunks), >> info.digest, >> - errors, >> + &state.errors, >> &format!("can't verify chunk, load failed - {err}"), >> ), >> Ok(chunk) => { >> let size = info.size(); >> - read_bytes.fetch_add(chunk.raw_size(), >> Ordering::SeqCst); >> + state >> + .read_bytes >> + .fetch_add(chunk.raw_size(), Ordering::SeqCst); >> decoder_pool.send((chunk, info.digest, size))?; >> - decoded_bytes.fetch_add(size, Ordering::SeqCst); >> + state.decoded_bytes.fetch_add(size, >> Ordering::SeqCst); >> } >> }, >> DatastoreBackend::S3(s3_client) => { >> @@ -324,28 +309,30 @@ impl VerifyWorker { >> match chunk_result { >> Ok(chunk) => { >> let size = info.size(); >> - >> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst); >> + state >> + .read_bytes >> + .fetch_add(chunk.raw_size(), >> Ordering::SeqCst); >> decoder_pool.send((chunk, >> info.digest, size))?; >> - decoded_bytes.fetch_add(size, >> Ordering::SeqCst); >> + state.decoded_bytes.fetch_add(size, >> Ordering::SeqCst); >> } >> Err(err) => { >> - errors.fetch_add(1, Ordering::SeqCst); >> + state.errors.fetch_add(1, >> Ordering::SeqCst); >> error!("can't verify chunk, load >> failed - {err}"); >> } >> } >> } >> Ok(None) => Self::add_corrupt_chunk( >> - datastore, >> - corrupt_chunks, >> + Arc::clone(&state.datastore), >> + Arc::clone(&state.corrupt_chunks), >> info.digest, >> - errors, >> + &state.errors, >> &format!( >> "can't verify missing chunk with digest {}", >> hex::encode(info.digest) >> ), >> ), >> Err(err) => { >> - errors.fetch_add(1, Ordering::SeqCst); >> + state.errors.fetch_add(1, Ordering::SeqCst); >> error!("can't verify chunk, load failed - >> {err}"); >> } >> } >> @@ -358,7 +345,7 @@ impl VerifyWorker { >> datastore: Arc<DataStore>, >> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> digest: [u8; 32], >> - errors: Arc<AtomicUsize>, >> + errors: &AtomicUsize, >> message: &str, >> ) { >> // Panic on poisoned mutex >> @@ -681,3 +668,12 @@ impl VerifyWorker { >> } >> } >> } >> + >> +struct IndexVerifyState { >> + read_bytes: AtomicU64, >> + decoded_bytes: AtomicU64, >> + errors: AtomicUsize, >> + datastore: Arc<DataStore>, >> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> + start_time: Instant, >> +} >> > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use 2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey @ 2025-11-05 15:51 ` Nicolas Frey 2025-11-06 9:09 ` Christian Ebner 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey ` (2 subsequent siblings) 5 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw) To: pbs-devel Signed-off-by: Nicolas Frey <n.frey@proxmox.com> --- src/api2/admin/datastore.rs | 13 +++++++++++-- src/api2/backup/environment.rs | 2 +- src/backup/verify.rs | 5 ++++- src/server/verify_job.rs | 3 ++- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index d192ee39..69a09081 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -677,6 +677,14 @@ pub async fn status( schema: NS_MAX_DEPTH_SCHEMA, optional: true, }, + "worker-threads": { + description: "Set the number of worker threads to use for the job", + type: Integer, + optional: true, + minimum: 1, + maximum: 32, + default: 1, + }, }, }, returns: { @@ -690,7 +698,7 @@ pub async fn status( )] /// Verify backups. /// -/// This function can verify a single backup snapshot, all backup from a backup group, +/// This function can verify a single backup snapshot, all backups from a backup group, /// or all backups in the datastore. #[allow(clippy::too_many_arguments)] pub fn verify( @@ -702,6 +710,7 @@ pub fn verify( ignore_verified: Option<bool>, outdated_after: Option<i64>, max_depth: Option<usize>, + worker_threads: Option<usize>, rpcenv: &mut dyn RpcEnvironment, ) -> Result<Value, Error> { let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; @@ -781,7 +790,7 @@ pub fn verify( auth_id.to_string(), to_stdout, move |worker| { - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?; + let verify_worker = VerifyWorker::new(worker.clone(), datastore, worker_threads)?; let failed_dirs = if let Some(backup_dir) = backup_dir { let mut res = Vec::new(); if !verify_worker.verify_backup_dir( diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs index 0e8eab1b..5e6a73b9 100644 --- a/src/api2/backup/environment.rs +++ b/src/api2/backup/environment.rs @@ -812,7 +812,7 @@ impl BackupEnvironment { move |worker| { worker.log_message("Automatically verifying newly added snapshot"); - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?; + let verify_worker = VerifyWorker::new(worker.clone(), datastore, None)?; if !verify_worker.verify_backup_dir_with_lock( &backup_dir, worker.upid().clone(), diff --git a/src/backup/verify.rs b/src/backup/verify.rs index 7f91f38c..e11dba8e 100644 --- a/src/backup/verify.rs +++ b/src/backup/verify.rs @@ -32,6 +32,7 @@ pub struct VerifyWorker { verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, backend: DatastoreBackend, + worker_threads: Option<usize>, } impl VerifyWorker { @@ -39,6 +40,7 @@ impl VerifyWorker { pub fn new( worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>, + worker_threads: Option<usize>, ) -> Result<Self, Error> { let backend = datastore.backend()?; Ok(Self { @@ -49,6 +51,7 @@ impl VerifyWorker { // start with 64 chunks since we assume there are few corrupt ones corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))), backend, + worker_threads, }) } @@ -220,7 +223,7 @@ impl VerifyWorker { .datastore .get_chunks_in_order(&*index, skip_chunk, check_abort)?; - let reader_pool = ParallelHandler::new("read chunks", 4, { + let reader_pool = ParallelHandler::new("read chunks", self.worker_threads.unwrap_or(4), { let decoder_pool = decoder_pool.channel(); let datastore = Arc::clone(&self.datastore); let corrupt_chunks = Arc::clone(&self.corrupt_chunks); diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs index c8792174..9d790b07 100644 --- a/src/server/verify_job.rs +++ b/src/server/verify_job.rs @@ -41,7 +41,8 @@ pub fn do_verification_job( None => Default::default(), }; - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?; + let verify_worker = + VerifyWorker::new(worker.clone(), datastore, verification_job.worker_threads)?; let result = verify_worker.verify_all_backups( worker.upid(), ns, -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey @ 2025-11-06 9:09 ` Christian Ebner 2025-11-06 9:23 ` Nicolas Frey 0 siblings, 1 reply; 19+ messages in thread From: Christian Ebner @ 2025-11-06 9:09 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Nicolas Frey Please add a short commit message describing what the worker threads cover, e.g. that this parameter controls the number of reader and chunk verification threads. What tripped me over just now: Is this intentionally not increasing the number of chunk verification threads? Or was that overlooked? From the name of the parameter I suspected this to act on both, reading and verifying. If this is not the case, maybe the parameter should get renamed to a more telling `parallel-chunk-readers` instead? further comment inline On 11/5/25 4:51 PM, Nicolas Frey wrote: > Signed-off-by: Nicolas Frey <n.frey@proxmox.com> > --- > src/api2/admin/datastore.rs | 13 +++++++++++-- > src/api2/backup/environment.rs | 2 +- > src/backup/verify.rs | 5 ++++- > src/server/verify_job.rs | 3 ++- > 4 files changed, 18 insertions(+), 5 deletions(-) > > diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs > index d192ee39..69a09081 100644 > --- a/src/api2/admin/datastore.rs > +++ b/src/api2/admin/datastore.rs > @@ -677,6 +677,14 @@ pub async fn status( > schema: NS_MAX_DEPTH_SCHEMA, > optional: true, > }, > + "worker-threads": { > + description: "Set the number of worker threads to use for the job", > + type: Integer, > + optional: true, > + minimum: 1, > + maximum: 32, > + default: 1, > + }, As mentioned on the pbs-api-types patch, this should reuse the same schema as (will be) defined there, so this does not be to be re-defined and stays in sync. > }, > }, > returns: { > @@ -690,7 +698,7 @@ pub async fn status( > )] > /// Verify backups. > /// > -/// This function can verify a single backup snapshot, all backup from a backup group, > +/// This function can verify a single backup snapshot, all backups from a backup group, > /// or all backups in the datastore. > #[allow(clippy::too_many_arguments)] > pub fn verify( > @@ -702,6 +710,7 @@ pub fn verify( > ignore_verified: Option<bool>, > outdated_after: Option<i64>, > max_depth: Option<usize>, > + worker_threads: Option<usize>, this could be a plain `usize` already, so it does not need to be unwrapped for each parallel worker instantiation. The unwrapping and setting to default can already happen in the constructor. > rpcenv: &mut dyn RpcEnvironment, > ) -> Result<Value, Error> { > let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; > @@ -781,7 +790,7 @@ pub fn verify( > auth_id.to_string(), > to_stdout, > move |worker| { > - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?; > + let verify_worker = VerifyWorker::new(worker.clone(), datastore, worker_threads)?; > let failed_dirs = if let Some(backup_dir) = backup_dir { > let mut res = Vec::new(); > if !verify_worker.verify_backup_dir( > diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs > index 0e8eab1b..5e6a73b9 100644 > --- a/src/api2/backup/environment.rs > +++ b/src/api2/backup/environment.rs > @@ -812,7 +812,7 @@ impl BackupEnvironment { > move |worker| { > worker.log_message("Automatically verifying newly added snapshot"); > > - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?; > + let verify_worker = VerifyWorker::new(worker.clone(), datastore, None)?; > if !verify_worker.verify_backup_dir_with_lock( > &backup_dir, > worker.upid().clone(), > diff --git a/src/backup/verify.rs b/src/backup/verify.rs > index 7f91f38c..e11dba8e 100644 > --- a/src/backup/verify.rs > +++ b/src/backup/verify.rs > @@ -32,6 +32,7 @@ pub struct VerifyWorker { > verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, > backend: DatastoreBackend, > + worker_threads: Option<usize>, ... plain `usize` here > } > > impl VerifyWorker { > @@ -39,6 +40,7 @@ impl VerifyWorker { > pub fn new( > worker: Arc<dyn WorkerTaskContext>, > datastore: Arc<DataStore>, > + worker_threads: Option<usize>, > ) -> Result<Self, Error> { > let backend = datastore.backend()?; > Ok(Self { > @@ -49,6 +51,7 @@ impl VerifyWorker { > // start with 64 chunks since we assume there are few corrupt ones > corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))), > backend, > + worker_threads, unwrap_or(4) here... or even define a constant for the default value, although if it is placed here, it will only occur once. > }) > } > > @@ -220,7 +223,7 @@ impl VerifyWorker { > .datastore > .get_chunks_in_order(&*index, skip_chunk, check_abort)?; > > - let reader_pool = ParallelHandler::new("read chunks", 4, { > + let reader_pool = ParallelHandler::new("read chunks", self.worker_threads.unwrap_or(4), { > let decoder_pool = decoder_pool.channel(); > let datastore = Arc::clone(&self.datastore); > let corrupt_chunks = Arc::clone(&self.corrupt_chunks); > diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs > index c8792174..9d790b07 100644 > --- a/src/server/verify_job.rs > +++ b/src/server/verify_job.rs > @@ -41,7 +41,8 @@ pub fn do_verification_job( > None => Default::default(), > }; > > - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?; > + let verify_worker = > + VerifyWorker::new(worker.clone(), datastore, verification_job.worker_threads)?; > let result = verify_worker.verify_all_backups( > worker.upid(), > ns, _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use 2025-11-06 9:09 ` Christian Ebner @ 2025-11-06 9:23 ` Nicolas Frey 2025-11-06 9:32 ` Christian Ebner 0 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-06 9:23 UTC (permalink / raw) To: Christian Ebner, Proxmox Backup Server development discussion On 11/6/25 10:08 AM, Christian Ebner wrote: > Please add a short commit message describing what the worker threads > cover, e.g. that this parameter controls the number of reader and > chunk verification threads. > > What tripped me over just now: > Is this intentionally not increasing the number of chunk verification > threads? Or was that overlooked? From the name of the parameter I > suspected this to act on both, reading and verifying. If this is not > the case, maybe the parameter should get renamed to a more telling > `parallel-chunk-readers` instead? I wasn't sure if the number of threads for verification should be controlled via this as well, as the original patch only added a new thread pool for reading, whereas the verification pool was already implemented. I pointed this out in the cover letter, though it might have been better to put this here too: The number of `worker-threads` only controls the thread pool for reading, though if it makes sense to reuse this for the verification pool, it could be adjusted to do so too. I think it makes sense to use it to control the number of threads of both. Thanks for the feedback, I'll adjust it along with the other proposed changes in a v2! > > further comment inline > On 11/5/25 4:51 PM, Nicolas Frey wrote: >> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> >> --- >> src/api2/admin/datastore.rs | 13 +++++++++++-- >> src/api2/backup/environment.rs | 2 +- >> src/backup/verify.rs | 5 ++++- >> src/server/verify_job.rs | 3 ++- >> 4 files changed, 18 insertions(+), 5 deletions(-) >> >> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs >> index d192ee39..69a09081 100644 >> --- a/src/api2/admin/datastore.rs >> +++ b/src/api2/admin/datastore.rs >> @@ -677,6 +677,14 @@ pub async fn status( >> schema: NS_MAX_DEPTH_SCHEMA, >> optional: true, >> }, >> + "worker-threads": { >> + description: "Set the number of worker threads to >> use for the job", >> + type: Integer, >> + optional: true, >> + minimum: 1, >> + maximum: 32, >> + default: 1, >> + }, > > As mentioned on the pbs-api-types patch, this should reuse the same > schema as (will be) defined there, so this does not be to be re- > defined and stays in sync. > >> }, >> }, >> returns: { >> @@ -690,7 +698,7 @@ pub async fn status( >> )] >> /// Verify backups. >> /// >> -/// This function can verify a single backup snapshot, all backup >> from a backup group, >> +/// This function can verify a single backup snapshot, all backups >> from a backup group, >> /// or all backups in the datastore. >> #[allow(clippy::too_many_arguments)] >> pub fn verify( >> @@ -702,6 +710,7 @@ pub fn verify( >> ignore_verified: Option<bool>, >> outdated_after: Option<i64>, >> max_depth: Option<usize>, >> + worker_threads: Option<usize>, > > this could be a plain `usize` already, so it does not need to be > unwrapped for each parallel worker instantiation. The unwrapping and > setting to default can already happen in the constructor. > >> rpcenv: &mut dyn RpcEnvironment, >> ) -> Result<Value, Error> { >> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; >> @@ -781,7 +790,7 @@ pub fn verify( >> auth_id.to_string(), >> to_stdout, >> move |worker| { >> - let verify_worker = VerifyWorker::new(worker.clone(), >> datastore)?; >> + let verify_worker = VerifyWorker::new(worker.clone(), >> datastore, worker_threads)?; >> let failed_dirs = if let Some(backup_dir) = backup_dir { >> let mut res = Vec::new(); >> if !verify_worker.verify_backup_dir( >> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/ >> environment.rs >> index 0e8eab1b..5e6a73b9 100644 >> --- a/src/api2/backup/environment.rs >> +++ b/src/api2/backup/environment.rs >> @@ -812,7 +812,7 @@ impl BackupEnvironment { >> move |worker| { >> worker.log_message("Automatically verifying newly >> added snapshot"); >> - let verify_worker = >> VerifyWorker::new(worker.clone(), datastore)?; >> + let verify_worker = >> VerifyWorker::new(worker.clone(), datastore, None)?; >> if !verify_worker.verify_backup_dir_with_lock( >> &backup_dir, >> worker.upid().clone(), >> diff --git a/src/backup/verify.rs b/src/backup/verify.rs >> index 7f91f38c..e11dba8e 100644 >> --- a/src/backup/verify.rs >> +++ b/src/backup/verify.rs >> @@ -32,6 +32,7 @@ pub struct VerifyWorker { >> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >> backend: DatastoreBackend, >> + worker_threads: Option<usize>, > > ... plain `usize` here > >> } >> impl VerifyWorker { >> @@ -39,6 +40,7 @@ impl VerifyWorker { >> pub fn new( >> worker: Arc<dyn WorkerTaskContext>, >> datastore: Arc<DataStore>, >> + worker_threads: Option<usize>, >> ) -> Result<Self, Error> { >> let backend = datastore.backend()?; >> Ok(Self { >> @@ -49,6 +51,7 @@ impl VerifyWorker { >> // start with 64 chunks since we assume there are few >> corrupt ones >> corrupt_chunks: >> Arc::new(Mutex::new(HashSet::with_capacity(64))), >> backend, >> + worker_threads, > > unwrap_or(4) here... or even define a constant for the default value, > although if it is placed here, it will only occur once. > >> }) >> } >> @@ -220,7 +223,7 @@ impl VerifyWorker { >> .datastore >> .get_chunks_in_order(&*index, skip_chunk, check_abort)?; >> - let reader_pool = ParallelHandler::new("read chunks", 4, { >> + let reader_pool = ParallelHandler::new("read chunks", >> self.worker_threads.unwrap_or(4), { >> let decoder_pool = decoder_pool.channel(); >> let datastore = Arc::clone(&self.datastore); >> let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs >> index c8792174..9d790b07 100644 >> --- a/src/server/verify_job.rs >> +++ b/src/server/verify_job.rs >> @@ -41,7 +41,8 @@ pub fn do_verification_job( >> None => Default::default(), >> }; >> - let verify_worker = VerifyWorker::new(worker.clone(), >> datastore)?; >> + let verify_worker = >> + VerifyWorker::new(worker.clone(), datastore, >> verification_job.worker_threads)?; >> let result = verify_worker.verify_all_backups( >> worker.upid(), >> ns, > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use 2025-11-06 9:23 ` Nicolas Frey @ 2025-11-06 9:32 ` Christian Ebner 2025-11-06 11:22 ` Nicolas Frey 0 siblings, 1 reply; 19+ messages in thread From: Christian Ebner @ 2025-11-06 9:32 UTC (permalink / raw) To: Nicolas Frey, Proxmox Backup Server development discussion On 11/6/25 10:22 AM, Nicolas Frey wrote: > On 11/6/25 10:08 AM, Christian Ebner wrote: >> Please add a short commit message describing what the worker threads >> cover, e.g. that this parameter controls the number of reader and >> chunk verification threads. >> >> What tripped me over just now: >> Is this intentionally not increasing the number of chunk verification >> threads? Or was that overlooked? From the name of the parameter I >> suspected this to act on both, reading and verifying. If this is not >> the case, maybe the parameter should get renamed to a more telling >> `parallel-chunk-readers` instead? > > I wasn't sure if the number of threads for verification should be > controlled via this as well, as the original patch only added a new > thread pool for reading, whereas the verification pool was already > implemented. > I pointed this out in the cover letter, though it might have been > better to put this here too: > > The number of `worker-threads` only controls the thread pool for > reading, though if it makes sense to reuse this for the verification > pool, it could be adjusted to do so too. > > I think it makes sense to use it to control the number of threads of > both. Thanks for the feedback, I'll adjust it along with the other > proposed changes in a v2! Well, that was just an uninformed assumption from my side when reading the parameter name (and I did not re-read the cover letter today after having looked at this quickly yesterday, sorry for that). But maybe you can also evaluate if it actually makes sense to control both by the same parameter, or if it only makes sense to e.g. increase the number of verification tasks (no point for that if the IO remains the bottleneck), or if it would make sense to have either 2 parameters or couple them by some proportionality constant. > > >> further comment inline >> On 11/5/25 4:51 PM, Nicolas Frey wrote: >>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> >>> --- >>> src/api2/admin/datastore.rs | 13 +++++++++++-- >>> src/api2/backup/environment.rs | 2 +- >>> src/backup/verify.rs | 5 ++++- >>> src/server/verify_job.rs | 3 ++- >>> 4 files changed, 18 insertions(+), 5 deletions(-) >>> >>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs >>> index d192ee39..69a09081 100644 >>> --- a/src/api2/admin/datastore.rs >>> +++ b/src/api2/admin/datastore.rs >>> @@ -677,6 +677,14 @@ pub async fn status( >>> schema: NS_MAX_DEPTH_SCHEMA, >>> optional: true, >>> }, >>> + "worker-threads": { >>> + description: "Set the number of worker threads to >>> use for the job", >>> + type: Integer, >>> + optional: true, >>> + minimum: 1, >>> + maximum: 32, >>> + default: 1, >>> + }, >> >> As mentioned on the pbs-api-types patch, this should reuse the same >> schema as (will be) defined there, so this does not be to be re- >> defined and stays in sync. >> >>> }, >>> }, >>> returns: { >>> @@ -690,7 +698,7 @@ pub async fn status( >>> )] >>> /// Verify backups. >>> /// >>> -/// This function can verify a single backup snapshot, all backup >>> from a backup group, >>> +/// This function can verify a single backup snapshot, all backups >>> from a backup group, >>> /// or all backups in the datastore. >>> #[allow(clippy::too_many_arguments)] >>> pub fn verify( >>> @@ -702,6 +710,7 @@ pub fn verify( >>> ignore_verified: Option<bool>, >>> outdated_after: Option<i64>, >>> max_depth: Option<usize>, >>> + worker_threads: Option<usize>, >> >> this could be a plain `usize` already, so it does not need to be >> unwrapped for each parallel worker instantiation. The unwrapping and >> setting to default can already happen in the constructor. >> >>> rpcenv: &mut dyn RpcEnvironment, >>> ) -> Result<Value, Error> { >>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; >>> @@ -781,7 +790,7 @@ pub fn verify( >>> auth_id.to_string(), >>> to_stdout, >>> move |worker| { >>> - let verify_worker = VerifyWorker::new(worker.clone(), >>> datastore)?; >>> + let verify_worker = VerifyWorker::new(worker.clone(), >>> datastore, worker_threads)?; >>> let failed_dirs = if let Some(backup_dir) = backup_dir { >>> let mut res = Vec::new(); >>> if !verify_worker.verify_backup_dir( >>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/ >>> environment.rs >>> index 0e8eab1b..5e6a73b9 100644 >>> --- a/src/api2/backup/environment.rs >>> +++ b/src/api2/backup/environment.rs >>> @@ -812,7 +812,7 @@ impl BackupEnvironment { >>> move |worker| { >>> worker.log_message("Automatically verifying newly >>> added snapshot"); >>> - let verify_worker = >>> VerifyWorker::new(worker.clone(), datastore)?; >>> + let verify_worker = >>> VerifyWorker::new(worker.clone(), datastore, None)?; >>> if !verify_worker.verify_backup_dir_with_lock( >>> &backup_dir, >>> worker.upid().clone(), >>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs >>> index 7f91f38c..e11dba8e 100644 >>> --- a/src/backup/verify.rs >>> +++ b/src/backup/verify.rs >>> @@ -32,6 +32,7 @@ pub struct VerifyWorker { >>> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >>> backend: DatastoreBackend, >>> + worker_threads: Option<usize>, >> >> ... plain `usize` here >> >>> } >>> impl VerifyWorker { >>> @@ -39,6 +40,7 @@ impl VerifyWorker { >>> pub fn new( >>> worker: Arc<dyn WorkerTaskContext>, >>> datastore: Arc<DataStore>, >>> + worker_threads: Option<usize>, >>> ) -> Result<Self, Error> { >>> let backend = datastore.backend()?; >>> Ok(Self { >>> @@ -49,6 +51,7 @@ impl VerifyWorker { >>> // start with 64 chunks since we assume there are few >>> corrupt ones >>> corrupt_chunks: >>> Arc::new(Mutex::new(HashSet::with_capacity(64))), >>> backend, >>> + worker_threads, >> >> unwrap_or(4) here... or even define a constant for the default value, >> although if it is placed here, it will only occur once. >> >>> }) >>> } >>> @@ -220,7 +223,7 @@ impl VerifyWorker { >>> .datastore >>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?; >>> - let reader_pool = ParallelHandler::new("read chunks", 4, { >>> + let reader_pool = ParallelHandler::new("read chunks", >>> self.worker_threads.unwrap_or(4), { >>> let decoder_pool = decoder_pool.channel(); >>> let datastore = Arc::clone(&self.datastore); >>> let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs >>> index c8792174..9d790b07 100644 >>> --- a/src/server/verify_job.rs >>> +++ b/src/server/verify_job.rs >>> @@ -41,7 +41,8 @@ pub fn do_verification_job( >>> None => Default::default(), >>> }; >>> - let verify_worker = VerifyWorker::new(worker.clone(), >>> datastore)?; >>> + let verify_worker = >>> + VerifyWorker::new(worker.clone(), datastore, >>> verification_job.worker_threads)?; >>> let result = verify_worker.verify_all_backups( >>> worker.upid(), >>> ns, >> > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use 2025-11-06 9:32 ` Christian Ebner @ 2025-11-06 11:22 ` Nicolas Frey 2025-11-06 11:57 ` Christian Ebner 0 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-06 11:22 UTC (permalink / raw) To: Christian Ebner, Proxmox Backup Server development discussion On 11/6/25 10:32 AM, Christian Ebner wrote: > On 11/6/25 10:22 AM, Nicolas Frey wrote: >> On 11/6/25 10:08 AM, Christian Ebner wrote: >>> Please add a short commit message describing what the worker threads >>> cover, e.g. that this parameter controls the number of reader and >>> chunk verification threads. >>> >>> What tripped me over just now: >>> Is this intentionally not increasing the number of chunk verification >>> threads? Or was that overlooked? From the name of the parameter I >>> suspected this to act on both, reading and verifying. If this is not >>> the case, maybe the parameter should get renamed to a more telling >>> `parallel-chunk-readers` instead? >> >> I wasn't sure if the number of threads for verification should be >> controlled via this as well, as the original patch only added a new >> thread pool for reading, whereas the verification pool was already >> implemented. >> I pointed this out in the cover letter, though it might have been >> better to put this here too: >> >> The number of `worker-threads` only controls the thread pool for >> reading, though if it makes sense to reuse this for the verification >> pool, it could be adjusted to do so too. >> >> I think it makes sense to use it to control the number of threads of >> both. Thanks for the feedback, I'll adjust it along with the other >> proposed changes in a v2! > > Well, that was just an uninformed assumption from my side when reading > the parameter name (and I did not re-read the cover letter today after > having looked at this quickly yesterday, sorry for that). That makes sense, the parameter name does not accurately describe the function it serves here anyway, so that should have been named a bit better. > > But maybe you can also evaluate if it actually makes sense to control > both by the same parameter, or if it only makes sense to e.g. increase > the number of verification tasks (no point for that if the IO remains > the bottleneck), or if it would make sense to have either 2 parameters > or couple them by some proportionality constant. > I had an idea along the lines of: self.worker_threads.mul(2).clamp(4, 32), though the proportionality factor should be tested to determine what would actually be sensible here and of course be documented accordingly. I also thought a minimum of 4 threads for verification makes sense, as when the default value of 1 thread is used, it has somewhat the same behavior as before adding the read thread pool (i.e. 1 thread for reading, 4 threads for verification) and would scale somewhat accordingly. The threads should also clamped to a max of 32 to respect the constraints of the schema also stating 32 as a max. What do you think? >> > >>> further comment inline >>> On 11/5/25 4:51 PM, Nicolas Frey wrote: >>>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> >>>> --- >>>> src/api2/admin/datastore.rs | 13 +++++++++++-- >>>> src/api2/backup/environment.rs | 2 +- >>>> src/backup/verify.rs | 5 ++++- >>>> src/server/verify_job.rs | 3 ++- >>>> 4 files changed, 18 insertions(+), 5 deletions(-) >>>> >>>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/ >>>> datastore.rs >>>> index d192ee39..69a09081 100644 >>>> --- a/src/api2/admin/datastore.rs >>>> +++ b/src/api2/admin/datastore.rs >>>> @@ -677,6 +677,14 @@ pub async fn status( >>>> schema: NS_MAX_DEPTH_SCHEMA, >>>> optional: true, >>>> }, >>>> + "worker-threads": { >>>> + description: "Set the number of worker threads to >>>> use for the job", >>>> + type: Integer, >>>> + optional: true, >>>> + minimum: 1, >>>> + maximum: 32, >>>> + default: 1, >>>> + }, >>> >>> As mentioned on the pbs-api-types patch, this should reuse the same >>> schema as (will be) defined there, so this does not be to be re- >>> defined and stays in sync. >>> >>>> }, >>>> }, >>>> returns: { >>>> @@ -690,7 +698,7 @@ pub async fn status( >>>> )] >>>> /// Verify backups. >>>> /// >>>> -/// This function can verify a single backup snapshot, all backup >>>> from a backup group, >>>> +/// This function can verify a single backup snapshot, all backups >>>> from a backup group, >>>> /// or all backups in the datastore. >>>> #[allow(clippy::too_many_arguments)] >>>> pub fn verify( >>>> @@ -702,6 +710,7 @@ pub fn verify( >>>> ignore_verified: Option<bool>, >>>> outdated_after: Option<i64>, >>>> max_depth: Option<usize>, >>>> + worker_threads: Option<usize>, >>> >>> this could be a plain `usize` already, so it does not need to be >>> unwrapped for each parallel worker instantiation. The unwrapping and >>> setting to default can already happen in the constructor. >>> >>>> rpcenv: &mut dyn RpcEnvironment, >>>> ) -> Result<Value, Error> { >>>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; >>>> @@ -781,7 +790,7 @@ pub fn verify( >>>> auth_id.to_string(), >>>> to_stdout, >>>> move |worker| { >>>> - let verify_worker = VerifyWorker::new(worker.clone(), >>>> datastore)?; >>>> + let verify_worker = VerifyWorker::new(worker.clone(), >>>> datastore, worker_threads)?; >>>> let failed_dirs = if let Some(backup_dir) = >>>> backup_dir { >>>> let mut res = Vec::new(); >>>> if !verify_worker.verify_backup_dir( >>>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/ >>>> environment.rs >>>> index 0e8eab1b..5e6a73b9 100644 >>>> --- a/src/api2/backup/environment.rs >>>> +++ b/src/api2/backup/environment.rs >>>> @@ -812,7 +812,7 @@ impl BackupEnvironment { >>>> move |worker| { >>>> worker.log_message("Automatically verifying newly >>>> added snapshot"); >>>> - let verify_worker = >>>> VerifyWorker::new(worker.clone(), datastore)?; >>>> + let verify_worker = >>>> VerifyWorker::new(worker.clone(), datastore, None)?; >>>> if !verify_worker.verify_backup_dir_with_lock( >>>> &backup_dir, >>>> worker.upid().clone(), >>>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs >>>> index 7f91f38c..e11dba8e 100644 >>>> --- a/src/backup/verify.rs >>>> +++ b/src/backup/verify.rs >>>> @@ -32,6 +32,7 @@ pub struct VerifyWorker { >>>> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >>>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>, >>>> backend: DatastoreBackend, >>>> + worker_threads: Option<usize>, >>> >>> ... plain `usize` here >>> >>>> } >>>> impl VerifyWorker { >>>> @@ -39,6 +40,7 @@ impl VerifyWorker { >>>> pub fn new( >>>> worker: Arc<dyn WorkerTaskContext>, >>>> datastore: Arc<DataStore>, >>>> + worker_threads: Option<usize>, >>>> ) -> Result<Self, Error> { >>>> let backend = datastore.backend()?; >>>> Ok(Self { >>>> @@ -49,6 +51,7 @@ impl VerifyWorker { >>>> // start with 64 chunks since we assume there are few >>>> corrupt ones >>>> corrupt_chunks: >>>> Arc::new(Mutex::new(HashSet::with_capacity(64))), >>>> backend, >>>> + worker_threads, >>> >>> unwrap_or(4) here... or even define a constant for the default value, >>> although if it is placed here, it will only occur once. >>> >>>> }) >>>> } >>>> @@ -220,7 +223,7 @@ impl VerifyWorker { >>>> .datastore >>>> .get_chunks_in_order(&*index, skip_chunk, >>>> check_abort)?; >>>> - let reader_pool = ParallelHandler::new("read chunks", >>>> 4, { >>>> + let reader_pool = ParallelHandler::new("read chunks", >>>> self.worker_threads.unwrap_or(4), { >>>> let decoder_pool = decoder_pool.channel(); >>>> let datastore = Arc::clone(&self.datastore); >>>> let corrupt_chunks = Arc::clone(&self.corrupt_chunks); >>>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs >>>> index c8792174..9d790b07 100644 >>>> --- a/src/server/verify_job.rs >>>> +++ b/src/server/verify_job.rs >>>> @@ -41,7 +41,8 @@ pub fn do_verification_job( >>>> None => Default::default(), >>>> }; >>>> - let verify_worker = VerifyWorker::new(worker.clone(), >>>> datastore)?; >>>> + let verify_worker = >>>> + VerifyWorker::new(worker.clone(), datastore, >>>> verification_job.worker_threads)?; >>>> let result = verify_worker.verify_all_backups( >>>> worker.upid(), >>>> ns, >>> >> > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use 2025-11-06 11:22 ` Nicolas Frey @ 2025-11-06 11:57 ` Christian Ebner 0 siblings, 0 replies; 19+ messages in thread From: Christian Ebner @ 2025-11-06 11:57 UTC (permalink / raw) To: Nicolas Frey, Proxmox Backup Server development discussion On 11/6/25 12:21 PM, Nicolas Frey wrote: > On 11/6/25 10:32 AM, Christian Ebner wrote: >> On 11/6/25 10:22 AM, Nicolas Frey wrote: >>> On 11/6/25 10:08 AM, Christian Ebner wrote: >>>> Please add a short commit message describing what the worker threads >>>> cover, e.g. that this parameter controls the number of reader and >>>> chunk verification threads. >>>> >>>> What tripped me over just now: >>>> Is this intentionally not increasing the number of chunk verification >>>> threads? Or was that overlooked? From the name of the parameter I >>>> suspected this to act on both, reading and verifying. If this is not >>>> the case, maybe the parameter should get renamed to a more telling >>>> `parallel-chunk-readers` instead? >>> >>> I wasn't sure if the number of threads for verification should be >>> controlled via this as well, as the original patch only added a new >>> thread pool for reading, whereas the verification pool was already >>> implemented. >>> I pointed this out in the cover letter, though it might have been >>> better to put this here too: >>> >>> The number of `worker-threads` only controls the thread pool for >>> reading, though if it makes sense to reuse this for the verification >>> pool, it could be adjusted to do so too. >>> >>> I think it makes sense to use it to control the number of threads of >>> both. Thanks for the feedback, I'll adjust it along with the other >>> proposed changes in a v2! >> >> Well, that was just an uninformed assumption from my side when reading >> the parameter name (and I did not re-read the cover letter today after >> having looked at this quickly yesterday, sorry for that). > > That makes sense, the parameter name does not accurately describe the > function it serves here anyway, so that should have been named a bit > better. > >> >> But maybe you can also evaluate if it actually makes sense to control >> both by the same parameter, or if it only makes sense to e.g. increase >> the number of verification tasks (no point for that if the IO remains >> the bottleneck), or if it would make sense to have either 2 parameters >> or couple them by some proportionality constant. >> > > I had an idea along the lines of: > > self.worker_threads.mul(2).clamp(4, 32), On second thought, this will most likely not cover most cases? One system could be severely IO bound, the other one severely CPU bound... > though the proportionality factor should be tested to determine what > would actually be sensible here and of course be documented accordingly. > > I also thought a minimum of 4 threads for verification makes sense, as > when the default value of 1 thread is used, it has somewhat the same > behavior as before adding the read thread pool (i.e. 1 thread for > reading, 4 threads for verification) and would scale somewhat > accordingly. The threads should also clamped to a max of 32 to respect > the constraints of the schema also stating 32 as a max. > > What do you think? I think it would make sense to keep both decoupled for the time being, especially since this might depend strongly on the backend. E.g. for S3 backed datastores you might gain a lot by increasing the number of readers, but not much by increasing the number of verify threads. _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint 2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey ` (2 preceding siblings ...) 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey @ 2025-11-05 15:51 ` Nicolas Frey 2025-11-06 9:13 ` Christian Ebner 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey 2025-11-06 8:02 ` [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Christian Ebner 5 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw) To: pbs-devel Signed-off-by: Nicolas Frey <n.frey@proxmox.com> --- src/api2/config/verify.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs index e71e0c2e..2847c984 100644 --- a/src/api2/config/verify.rs +++ b/src/api2/config/verify.rs @@ -149,6 +149,8 @@ pub enum DeletableProperty { Ns, /// Delete max-depth property, defaulting to full recursion again MaxDepth, + /// Delete worker-threads property + WorkerThreads, } #[api( @@ -229,6 +231,9 @@ pub fn update_verification_job( DeletableProperty::MaxDepth => { data.max_depth = None; } + DeletableProperty::WorkerThreads => { + data.worker_threads = None; + } } } } @@ -266,6 +271,9 @@ pub fn update_verification_job( data.max_depth = Some(max_depth); } } + if update.worker_threads.is_some() { + data.worker_threads = update.worker_threads; + } // check new store and NS user_info.check_privs(&auth_id, &data.acl_path(), PRIV_DATASTORE_VERIFY, true)?; -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey @ 2025-11-06 9:13 ` Christian Ebner 0 siblings, 0 replies; 19+ messages in thread From: Christian Ebner @ 2025-11-06 9:13 UTC (permalink / raw) To: pbs-devel Again a short commit message would be nice, e.g. Allows to add, update or delete the worker-thread property in the verify job config via the API. This will control the number of workers to read (and verify?) chunks in the verification job. On 11/5/25 4:51 PM, Nicolas Frey wrote: > Signed-off-by: Nicolas Frey <n.frey@proxmox.com> > --- > src/api2/config/verify.rs | 8 ++++++++ > 1 file changed, 8 insertions(+) > > diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs > index e71e0c2e..2847c984 100644 > --- a/src/api2/config/verify.rs > +++ b/src/api2/config/verify.rs > @@ -149,6 +149,8 @@ pub enum DeletableProperty { > Ns, > /// Delete max-depth property, defaulting to full recursion again > MaxDepth, > + /// Delete worker-threads property > + WorkerThreads, > } > > #[api( > @@ -229,6 +231,9 @@ pub fn update_verification_job( > DeletableProperty::MaxDepth => { > data.max_depth = None; > } > + DeletableProperty::WorkerThreads => { > + data.worker_threads = None; > + } > } > } > } > @@ -266,6 +271,9 @@ pub fn update_verification_job( > data.max_depth = Some(max_depth); > } > } > + if update.worker_threads.is_some() { > + data.worker_threads = update.worker_threads; > + } > > // check new store and NS > user_info.check_privs(&auth_id, &data.acl_path(), PRIV_DATASTORE_VERIFY, true)?; _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job 2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey ` (3 preceding siblings ...) 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey @ 2025-11-05 15:51 ` Nicolas Frey 2025-11-06 9:22 ` Christian Ebner 2025-11-06 8:02 ` [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Christian Ebner 5 siblings, 1 reply; 19+ messages in thread From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw) To: pbs-devel Signed-off-by: Nicolas Frey <n.frey@proxmox.com> --- www/window/VerifyAll.js | 12 ++++++++++++ www/window/VerifyJobEdit.js | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js index 01bcd63d..82f62aae 100644 --- a/www/window/VerifyAll.js +++ b/www/window/VerifyAll.js @@ -80,6 +80,18 @@ Ext.define('PBS.window.VerifyAll', { }, ], }, + + { + xtype: 'proxmoxintegerfield', + name: 'worker-threads', + fieldLabel: gettext('# of Worker Threads'), + emptyText: '1', + minValue: 1, + maxValue: 32, + cbind: { + deleteEmpty: '{!isCreate}', + }, + }, ], }, ], diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js index e87ca069..7b7a96c4 100644 --- a/www/window/VerifyJobEdit.js +++ b/www/window/VerifyJobEdit.js @@ -166,5 +166,18 @@ Ext.define('PBS.window.VerifyJobEdit', { }, }, ], + advancedColumn2: [ + { + xtype: 'proxmoxintegerfield', + name: 'worker-threads', + fieldLabel: gettext('# of Worker Threads'), + emptyText: '1', + minValue: 1, + maxValue: 32, + cbind: { + deleteEmpty: '{!isCreate}', + }, + }, + ] }, }); -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey @ 2025-11-06 9:22 ` Christian Ebner 2025-11-06 9:25 ` Nicolas Frey 0 siblings, 1 reply; 19+ messages in thread From: Christian Ebner @ 2025-11-06 9:22 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Nicolas Frey Same comment with respect to commit title as for some of the other patches, a short description on why this is done would be nice. E.g. Exposes the worker-threads property in the verification job config window and verify all window so the user can update the values accordingly. one comment below On 11/5/25 4:51 PM, Nicolas Frey wrote: > Signed-off-by: Nicolas Frey <n.frey@proxmox.com> > --- > www/window/VerifyAll.js | 12 ++++++++++++ > www/window/VerifyJobEdit.js | 13 +++++++++++++ > 2 files changed, 25 insertions(+) > > diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js > index 01bcd63d..82f62aae 100644 > --- a/www/window/VerifyAll.js > +++ b/www/window/VerifyAll.js > @@ -80,6 +80,18 @@ Ext.define('PBS.window.VerifyAll', { > }, > ], > }, > + > + { > + xtype: 'proxmoxintegerfield', > + name: 'worker-threads', > + fieldLabel: gettext('# of Worker Threads'), > + emptyText: '1', > + minValue: 1, > + maxValue: 32, > + cbind: { > + deleteEmpty: '{!isCreate}', > + }, > + }, > ], > }, > ], > diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js > index e87ca069..7b7a96c4 100644 > --- a/www/window/VerifyJobEdit.js > +++ b/www/window/VerifyJobEdit.js > @@ -166,5 +166,18 @@ Ext.define('PBS.window.VerifyJobEdit', { > }, > }, > ], > + advancedColumn2: [ > + { > + xtype: 'proxmoxintegerfield', > + name: 'worker-threads', > + fieldLabel: gettext('# of Worker Threads'), > + emptyText: '1', > + minValue: 1, > + maxValue: 32, > + cbind: { > + deleteEmpty: '{!isCreate}', > + }, > + }, > + ] it would be nice to also delete the value again if set to the default. Further, just noticed: this now seemingly defaults to 1, which is also the default for the API schema, but the parallel task instantiation gets a default of 4 if the value is not set and therefore unwrapped? That should be made consistent. > }, > }); _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job 2025-11-06 9:22 ` Christian Ebner @ 2025-11-06 9:25 ` Nicolas Frey 0 siblings, 0 replies; 19+ messages in thread From: Nicolas Frey @ 2025-11-06 9:25 UTC (permalink / raw) To: Christian Ebner, Proxmox Backup Server development discussion On 11/6/25 10:21 AM, Christian Ebner wrote: > Same comment with respect to commit title as for some of the other > patches, a short description on why this is done would be nice. E.g. > > Exposes the worker-threads property in the verification job config > window and verify all window so the user can update the values > accordingly. > > one comment below > > On 11/5/25 4:51 PM, Nicolas Frey wrote: >> Signed-off-by: Nicolas Frey <n.frey@proxmox.com> >> --- >> www/window/VerifyAll.js | 12 ++++++++++++ >> www/window/VerifyJobEdit.js | 13 +++++++++++++ >> 2 files changed, 25 insertions(+) >> >> diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js >> index 01bcd63d..82f62aae 100644 >> --- a/www/window/VerifyAll.js >> +++ b/www/window/VerifyAll.js >> @@ -80,6 +80,18 @@ Ext.define('PBS.window.VerifyAll', { >> }, >> ], >> }, >> + >> + { >> + xtype: 'proxmoxintegerfield', >> + name: 'worker-threads', >> + fieldLabel: gettext('# of Worker Threads'), >> + emptyText: '1', >> + minValue: 1, >> + maxValue: 32, >> + cbind: { >> + deleteEmpty: '{!isCreate}', >> + }, >> + }, >> ], >> }, >> ], >> diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js >> index e87ca069..7b7a96c4 100644 >> --- a/www/window/VerifyJobEdit.js >> +++ b/www/window/VerifyJobEdit.js >> @@ -166,5 +166,18 @@ Ext.define('PBS.window.VerifyJobEdit', { >> }, >> }, >> ], >> + advancedColumn2: [ >> + { >> + xtype: 'proxmoxintegerfield', >> + name: 'worker-threads', >> + fieldLabel: gettext('# of Worker Threads'), >> + emptyText: '1', >> + minValue: 1, >> + maxValue: 32, >> + cbind: { >> + deleteEmpty: '{!isCreate}', >> + }, >> + }, >> + ] > > it would be nice to also delete the value again if set to the default. > > Further, just noticed: this now seemingly defaults to 1, which is also > the default for the API schema, but the parallel task instantiation > gets a default of 4 if the value is not set and therefore unwrapped? > That should be made consistent. > Ah sorry, that was an oversight. This was a copy paste error, I reused this code from the tape backup >> }, >> }); > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification 2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey ` (4 preceding siblings ...) 2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey @ 2025-11-06 8:02 ` Christian Ebner 5 siblings, 0 replies; 19+ messages in thread From: Christian Ebner @ 2025-11-06 8:02 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Nicolas Frey On 11/5/25 4:51 PM, Nicolas Frey wrote: > This patch series aims to expand on Dominik's series [0] written for > pbs 3, parallelizing chunk reads in `VerifyWorker` using a seperate > thread pool from the verification. > > The number of threads was previously hard-coded, but is now > configurable via the API and GUI with a new property called > `worker-threads`, similarly to tape backups. > > The number of `worker-threads` only controls the thread pool for > reading, though if it makes sense to reuse this for the verification > pool, it could be adjusted to do so too. > > In my local tests I measured the following speed difference: > verified a single snapshot with ~32 GiB (4x the RAM size) with 4 cores > > 1 thread: ~440MiB/s > 2 threads: ~780MiB/s > 4 threads: ~1140MiB/s > > [0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t Thanks for the patches, nice work! I will go trough and comment on individual patches, but one thing upfront: Could you also include a concise description of what the additional parameter for the verification job config controlles to the docs? So we can point users directly to that. _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 19+ messages in thread
end of thread, other threads:[~2025-11-06 11:57 UTC | newest]
Thread overview: 19+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
2025-11-06 8:14 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-06 8:54 ` Christian Ebner
2025-11-06 9:04 ` Nicolas Frey
2025-11-06 9:26 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey
2025-11-06 9:09 ` Christian Ebner
2025-11-06 9:23 ` Nicolas Frey
2025-11-06 9:32 ` Christian Ebner
2025-11-06 11:22 ` Nicolas Frey
2025-11-06 11:57 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey
2025-11-06 9:13 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-06 9:22 ` Christian Ebner
2025-11-06 9:25 ` Nicolas Frey
2025-11-06 8:02 ` [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Christian Ebner
This is an external index of several public inboxes, see mirroring instructions on how to clone and mirror all data and code used by this external index.