* [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification
@ 2025-11-05 15:51 Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
` (5 more replies)
0 siblings, 6 replies; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
This patch series aims to expand on Dominik's series [0] written for
PBS 3, parallelizing chunk reads in `VerifyWorker` using a thread pool
separate from the verification pool.
The number of threads was previously hard-coded, but is now
configurable via the API and GUI with a new property called
`worker-threads`, similarly to tape backups.
The number of `worker-threads` only controls the thread pool for
reading, though if it makes sense to reuse this for the verification
pool, it could be adjusted to do so too.
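For illustration only (this is not code from the series, which uses the
internal `ParallelHandler` helper): a minimal std-only Rust sketch of the
idea, a configurable pool of reader threads pulling chunks off a shared
queue and feeding a verification stage over a channel. The hard-coded
`worker_threads` value, the chunk IDs and the fake read are made up for
the example:

use std::collections::VecDeque;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // Stand-in chunk list; in PBS this would come from the index file.
    let queue: Arc<Mutex<VecDeque<u32>>> = Arc::new(Mutex::new((0..16).collect()));

    // Plays the role of the new worker-threads option (hard-coded here).
    let worker_threads: usize = 4;

    // Channel from the reader pool to the verification stage.
    let (tx, rx) = mpsc::channel::<(u32, Vec<u8>)>();

    let mut readers = Vec::new();
    for _ in 0..worker_threads {
        let queue = Arc::clone(&queue);
        let tx = tx.clone();
        readers.push(thread::spawn(move || loop {
            // Take the next chunk id, releasing the lock before "reading".
            let id = match queue.lock().unwrap().pop_front() {
                Some(id) => id,
                None => break,
            };
            let data = vec![0u8; 4096]; // stand-in for loading a chunk from disk/S3
            if tx.send((id, data)).is_err() {
                break;
            }
        }));
    }
    drop(tx); // close the channel once all readers are done

    // Verification stage; in the series this is a separate thread pool as well.
    for (id, data) in rx {
        println!("verifying chunk {id} ({} bytes)", data.len());
    }

    for reader in readers {
        reader.join().unwrap();
    }
}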
In my local tests I measured the following speed difference:
verified a single snapshot with ~32 GiB (4x the RAM size) with 4 cores
1 thread: ~440MiB/s
2 threads: ~780MiB/s
4 threads: ~1140MiB/s
[0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t
proxmox:
Nicolas Frey (1):
pbs-api-types: jobs: verify: add worker-threads to
VerificationJobConfig
pbs-api-types/src/jobs.rs | 10 ++++++++++
1 file changed, 10 insertions(+)
proxmox-backup:
Nicolas Frey (4):
api: verify: move chunk loading into parallel handler
api: verify: use worker-threads to determine the number of threads to
use
api: verify: add worker-threads to update endpoint
ui: verify: add option to set number of threads for job
src/api2/admin/datastore.rs | 13 +++-
src/api2/backup/environment.rs | 2 +-
src/api2/config/verify.rs | 8 +++
src/backup/verify.rs | 123 +++++++++++++++++++++------------
src/server/verify_job.rs | 3 +-
www/window/VerifyAll.js | 12 ++++
www/window/VerifyJobEdit.js | 13 ++++
7 files changed, 125 insertions(+), 49 deletions(-)
Summary over all repositories:
8 files changed, 135 insertions(+), 49 deletions(-)
--
Generated by git-murpp 0.8.1
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
@ 2025-11-05 15:51 ` Nicolas Frey
2025-11-06 8:14 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey
` (4 subsequent siblings)
5 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
pbs-api-types/src/jobs.rs | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 4dbbef2b..d904f797 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -203,6 +203,13 @@ pub const VERIFICATION_OUTDATED_AFTER_SCHEMA: Schema =
optional: true,
schema: crate::NS_MAX_DEPTH_SCHEMA,
},
+ "worker-threads": {
+ type: Integer,
+ optional: true,
+ minimum: 1,
+ maximum: 32,
+ default: 1,
+ },
}
)]
#[derive(Serialize, Deserialize, Updater, Clone, PartialEq)]
@@ -221,6 +228,9 @@ pub struct VerificationJobConfig {
#[serde(skip_serializing_if = "Option::is_none")]
/// Reverify snapshots after X days, never if 0. Ignored if 'ignore_verified' is false.
pub outdated_after: Option<i64>,
+ /// Set the number of worker threads to use for the job
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub worker_threads: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
@ 2025-11-05 15:51 ` Nicolas Frey
2025-11-06 8:54 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey
` (3 subsequent siblings)
5 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
This way, the chunks will be loaded in parallel in addition to being
checked in parallel.
Depending on the underlying storage, this can speed up reading chunks
from disk, especially when the underlying storage is IO depth
dependent, and the CPU is faster than the storage.
Originally-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
src/backup/verify.rs | 120 +++++++++++++++++++++++++++----------------
1 file changed, 75 insertions(+), 45 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index e33fdf50..7f91f38c 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -1,6 +1,6 @@
use pbs_config::BackupLockGuard;
use std::collections::HashSet;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
@@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
-use crate::tools::parallel_handler::ParallelHandler;
+use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
use crate::backup::hierarchy::ListAccessibleBackupGroups;
@@ -156,23 +156,20 @@ impl VerifyWorker {
let start_time = Instant::now();
- let mut read_bytes = 0;
- let mut decoded_bytes = 0;
+ let read_bytes = Arc::new(AtomicU64::new(0));
+ let decoded_bytes = Arc::new(AtomicU64::new(0));
- let datastore2 = Arc::clone(&self.datastore);
- let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
- let verified_chunks2 = Arc::clone(&self.verified_chunks);
- let errors2 = Arc::clone(&errors);
-
- let decoder_pool = ParallelHandler::new(
- "verify chunk decoder",
- 4,
+ let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
+ let datastore = Arc::clone(&self.datastore);
+ let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
+ let verified_chunks = Arc::clone(&self.verified_chunks);
+ let errors = Arc::clone(&errors);
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
- corrupt_chunks2.lock().unwrap().insert(digest);
+ corrupt_chunks.lock().unwrap().insert(digest);
info!("can't verify chunk, unknown CryptMode - {err}");
- errors2.fetch_add(1, Ordering::SeqCst);
+ errors.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
Ok(mode) => mode,
@@ -182,21 +179,21 @@ impl VerifyWorker {
info!(
"chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
);
- errors2.fetch_add(1, Ordering::SeqCst);
+ errors.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
- corrupt_chunks2.lock().unwrap().insert(digest);
+ corrupt_chunks.lock().unwrap().insert(digest);
info!("{err}");
- errors2.fetch_add(1, Ordering::SeqCst);
- Self::rename_corrupted_chunk(datastore2.clone(), &digest);
+ errors.fetch_add(1, Ordering::SeqCst);
+ Self::rename_corrupted_chunk(datastore.clone(), &digest);
} else {
- verified_chunks2.lock().unwrap().insert(digest);
+ verified_chunks.lock().unwrap().insert(digest);
}
Ok(())
- },
- );
+ }
+ });
let skip_chunk = |digest: &[u8; 32]| -> bool {
if self.verified_chunks.lock().unwrap().contains(digest) {
@@ -223,6 +220,29 @@ impl VerifyWorker {
.datastore
.get_chunks_in_order(&*index, skip_chunk, check_abort)?;
+ let reader_pool = ParallelHandler::new("read chunks", 4, {
+ let decoder_pool = decoder_pool.channel();
+ let datastore = Arc::clone(&self.datastore);
+ let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
+ let read_bytes = Arc::clone(&read_bytes);
+ let decoded_bytes = Arc::clone(&decoded_bytes);
+ let errors = Arc::clone(&errors);
+ let backend = self.backend.clone();
+
+ move |info: ChunkReadInfo| {
+ Self::verify_chunk_by_backend(
+ &backend,
+ Arc::clone(&datastore),
+ Arc::clone(&corrupt_chunks),
+ Arc::clone(&read_bytes),
+ Arc::clone(&decoded_bytes),
+ Arc::clone(&errors),
+ &decoder_pool,
+ &info,
+ )
+ }
+ });
+
for (pos, _) in chunk_list {
self.worker.check_abort()?;
self.worker.fail_on_shutdown()?;
@@ -234,19 +254,16 @@ impl VerifyWorker {
continue; // already verified or marked corrupt
}
- self.verify_chunk_by_backend(
- &info,
- &mut read_bytes,
- &mut decoded_bytes,
- Arc::clone(&errors),
- &decoder_pool,
- )?;
+ reader_pool.send(info)?;
}
- decoder_pool.complete()?;
+ reader_pool.complete()?;
let elapsed = start_time.elapsed().as_secs_f64();
+ let read_bytes = read_bytes.load(Ordering::SeqCst);
+ let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
+
let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -266,26 +283,31 @@ impl VerifyWorker {
Ok(())
}
+ #[allow(clippy::too_many_arguments)]
fn verify_chunk_by_backend(
- &self,
- info: &ChunkReadInfo,
- read_bytes: &mut u64,
- decoded_bytes: &mut u64,
+ backend: &DatastoreBackend,
+ datastore: Arc<DataStore>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ read_bytes: Arc<AtomicU64>,
+ decoded_bytes: Arc<AtomicU64>,
errors: Arc<AtomicUsize>,
- decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
+ decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
+ info: &ChunkReadInfo,
) -> Result<(), Error> {
- match &self.backend {
- DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
- Err(err) => self.add_corrupt_chunk(
+ match backend {
+ DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
+ Err(err) => Self::add_corrupt_chunk(
+ datastore,
+ corrupt_chunks,
info.digest,
errors,
&format!("can't verify chunk, load failed - {err}"),
),
Ok(chunk) => {
let size = info.size();
- *read_bytes += chunk.raw_size();
+ read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- *decoded_bytes += size;
+ decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
},
DatastoreBackend::S3(s3_client) => {
@@ -302,9 +324,9 @@ impl VerifyWorker {
match chunk_result {
Ok(chunk) => {
let size = info.size();
- *read_bytes += chunk.raw_size();
+ read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- *decoded_bytes += size;
+ decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
Err(err) => {
errors.fetch_add(1, Ordering::SeqCst);
@@ -312,7 +334,9 @@ impl VerifyWorker {
}
}
}
- Ok(None) => self.add_corrupt_chunk(
+ Ok(None) => Self::add_corrupt_chunk(
+ datastore,
+ corrupt_chunks,
info.digest,
errors,
&format!(
@@ -330,13 +354,19 @@ impl VerifyWorker {
Ok(())
}
- fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+ fn add_corrupt_chunk(
+ datastore: Arc<DataStore>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ digest: [u8; 32],
+ errors: Arc<AtomicUsize>,
+ message: &str,
+ ) {
// Panic on poisoned mutex
- let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
+ let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
corrupt_chunks.insert(digest);
error!(message);
errors.fetch_add(1, Ordering::SeqCst);
- Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
+ Self::rename_corrupted_chunk(datastore.clone(), &digest);
}
fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey
@ 2025-11-05 15:51 ` Nicolas Frey
2025-11-06 9:09 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey
` (2 subsequent siblings)
5 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
src/api2/admin/datastore.rs | 13 +++++++++++--
src/api2/backup/environment.rs | 2 +-
src/backup/verify.rs | 5 ++++-
src/server/verify_job.rs | 3 ++-
4 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index d192ee39..69a09081 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -677,6 +677,14 @@ pub async fn status(
schema: NS_MAX_DEPTH_SCHEMA,
optional: true,
},
+ "worker-threads": {
+ description: "Set the number of worker threads to use for the job",
+ type: Integer,
+ optional: true,
+ minimum: 1,
+ maximum: 32,
+ default: 1,
+ },
},
},
returns: {
@@ -690,7 +698,7 @@ pub async fn status(
)]
/// Verify backups.
///
-/// This function can verify a single backup snapshot, all backup from a backup group,
+/// This function can verify a single backup snapshot, all backups from a backup group,
/// or all backups in the datastore.
#[allow(clippy::too_many_arguments)]
pub fn verify(
@@ -702,6 +710,7 @@ pub fn verify(
ignore_verified: Option<bool>,
outdated_after: Option<i64>,
max_depth: Option<usize>,
+ worker_threads: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -781,7 +790,7 @@ pub fn verify(
auth_id.to_string(),
to_stdout,
move |worker| {
- let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+ let verify_worker = VerifyWorker::new(worker.clone(), datastore, worker_threads)?;
let failed_dirs = if let Some(backup_dir) = backup_dir {
let mut res = Vec::new();
if !verify_worker.verify_backup_dir(
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 0e8eab1b..5e6a73b9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -812,7 +812,7 @@ impl BackupEnvironment {
move |worker| {
worker.log_message("Automatically verifying newly added snapshot");
- let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+ let verify_worker = VerifyWorker::new(worker.clone(), datastore, None)?;
if !verify_worker.verify_backup_dir_with_lock(
&backup_dir,
worker.upid().clone(),
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 7f91f38c..e11dba8e 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -32,6 +32,7 @@ pub struct VerifyWorker {
verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
backend: DatastoreBackend,
+ worker_threads: Option<usize>,
}
impl VerifyWorker {
@@ -39,6 +40,7 @@ impl VerifyWorker {
pub fn new(
worker: Arc<dyn WorkerTaskContext>,
datastore: Arc<DataStore>,
+ worker_threads: Option<usize>,
) -> Result<Self, Error> {
let backend = datastore.backend()?;
Ok(Self {
@@ -49,6 +51,7 @@ impl VerifyWorker {
// start with 64 chunks since we assume there are few corrupt ones
corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
backend,
+ worker_threads,
})
}
@@ -220,7 +223,7 @@ impl VerifyWorker {
.datastore
.get_chunks_in_order(&*index, skip_chunk, check_abort)?;
- let reader_pool = ParallelHandler::new("read chunks", 4, {
+ let reader_pool = ParallelHandler::new("read chunks", self.worker_threads.unwrap_or(4), {
let decoder_pool = decoder_pool.channel();
let datastore = Arc::clone(&self.datastore);
let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index c8792174..9d790b07 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -41,7 +41,8 @@ pub fn do_verification_job(
None => Default::default(),
};
- let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
+ let verify_worker =
+ VerifyWorker::new(worker.clone(), datastore, verification_job.worker_threads)?;
let result = verify_worker.verify_all_backups(
worker.upid(),
ns,
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
` (2 preceding siblings ...)
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey
@ 2025-11-05 15:51 ` Nicolas Frey
2025-11-06 9:13 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-06 8:02 ` [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Christian Ebner
5 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
src/api2/config/verify.rs | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs
index e71e0c2e..2847c984 100644
--- a/src/api2/config/verify.rs
+++ b/src/api2/config/verify.rs
@@ -149,6 +149,8 @@ pub enum DeletableProperty {
Ns,
/// Delete max-depth property, defaulting to full recursion again
MaxDepth,
+ /// Delete worker-threads property
+ WorkerThreads,
}
#[api(
@@ -229,6 +231,9 @@ pub fn update_verification_job(
DeletableProperty::MaxDepth => {
data.max_depth = None;
}
+ DeletableProperty::WorkerThreads => {
+ data.worker_threads = None;
+ }
}
}
}
@@ -266,6 +271,9 @@ pub fn update_verification_job(
data.max_depth = Some(max_depth);
}
}
+ if update.worker_threads.is_some() {
+ data.worker_threads = update.worker_threads;
+ }
// check new store and NS
user_info.check_privs(&auth_id, &data.acl_path(), PRIV_DATASTORE_VERIFY, true)?;
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
` (3 preceding siblings ...)
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey
@ 2025-11-05 15:51 ` Nicolas Frey
2025-11-06 9:22 ` Christian Ebner
2025-11-06 8:02 ` [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Christian Ebner
5 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-05 15:51 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
www/window/VerifyAll.js | 12 ++++++++++++
www/window/VerifyJobEdit.js | 13 +++++++++++++
2 files changed, 25 insertions(+)
diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js
index 01bcd63d..82f62aae 100644
--- a/www/window/VerifyAll.js
+++ b/www/window/VerifyAll.js
@@ -80,6 +80,18 @@ Ext.define('PBS.window.VerifyAll', {
},
],
},
+
+ {
+ xtype: 'proxmoxintegerfield',
+ name: 'worker-threads',
+ fieldLabel: gettext('# of Worker Threads'),
+ emptyText: '1',
+ minValue: 1,
+ maxValue: 32,
+ cbind: {
+ deleteEmpty: '{!isCreate}',
+ },
+ },
],
},
],
diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js
index e87ca069..7b7a96c4 100644
--- a/www/window/VerifyJobEdit.js
+++ b/www/window/VerifyJobEdit.js
@@ -166,5 +166,18 @@ Ext.define('PBS.window.VerifyJobEdit', {
},
},
],
+ advancedColumn2: [
+ {
+ xtype: 'proxmoxintegerfield',
+ name: 'worker-threads',
+ fieldLabel: gettext('# of Worker Threads'),
+ emptyText: '1',
+ minValue: 1,
+ maxValue: 32,
+ cbind: {
+ deleteEmpty: '{!isCreate}',
+ },
+ },
+ ]
},
});
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
` (4 preceding siblings ...)
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey
@ 2025-11-06 8:02 ` Christian Ebner
5 siblings, 0 replies; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 8:02 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Nicolas Frey
On 11/5/25 4:51 PM, Nicolas Frey wrote:
> This patch series aims to expand on Dominik's series [0] written for
> PBS 3, parallelizing chunk reads in `VerifyWorker` using a thread pool
> separate from the verification pool.
>
> The number of threads was previously hard-coded, but is now
> configurable via the API and GUI with a new property called
> `worker-threads`, similarly to tape backups.
>
> The number of `worker-threads` only controls the thread pool for
> reading, though if it makes sense to reuse this for the verification
> pool, it could be adjusted to do so too.
>
> In my local tests I measured the following speed difference:
> verified a single snapshot with ~32 GiB (4x the RAM size) with 4 cores
>
> 1 thread: ~440MiB/s
> 2 threads: ~780MiB/s
> 4 threads: ~1140MiB/s
>
> [0] https://lore.proxmox.com/pbs-devel/20250707132706.2854973-1-d.csapak@proxmox.com/#t
Thanks for the patches, nice work!
I will go through and comment on the individual patches, but one thing upfront:
Could you also add to the docs a concise description of what the
additional verification job config parameter controls? That way we can
point users directly to it.
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
@ 2025-11-06 8:14 ` Christian Ebner
0 siblings, 0 replies; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 8:14 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Nicolas Frey
Please mention in a short commit message what this new parameter will be
used for, e.g. that it controls the number of parallel workers used to
read and verify chunks.
On 11/5/25 4:51 PM, Nicolas Frey wrote:
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
> pbs-api-types/src/jobs.rs | 10 ++++++++++
> 1 file changed, 10 insertions(+)
>
> diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
> index 4dbbef2b..d904f797 100644
> --- a/pbs-api-types/src/jobs.rs
> +++ b/pbs-api-types/src/jobs.rs
> @@ -203,6 +203,13 @@ pub const VERIFICATION_OUTDATED_AFTER_SCHEMA: Schema =
> optional: true,
> schema: crate::NS_MAX_DEPTH_SCHEMA,
> },
> + "worker-threads": {
> + type: Integer,
> + optional: true,
> + minimum: 1,
> + maximum: 32,
> + default: 1,
> + },
Please define this as a dedicated schema, so it can be reused for the
PBS API as well (as exposed in patch 2).
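Not part of the series, but a rough sketch of what such a dedicated
schema in pbs-api-types/src/jobs.rs could look like (constant name and
description are only placeholders), following the pattern of the existing
schema constants in that file:

// Assumes the imports already used in jobs.rs, roughly:
// use proxmox_schema::{IntegerSchema, Schema};
pub const VERIFICATION_WORKER_THREADS_SCHEMA: Schema =
    IntegerSchema::new("Number of worker threads to use for the job.")
        .minimum(1)
        .maximum(32)
        .default(1)
        .schema();

The "worker-threads" property in the #[api] macro here, and the one added
to the datastore verify endpoint in patch 2, could then simply reference it:

"worker-threads": {
    optional: true,
    schema: VERIFICATION_WORKER_THREADS_SCHEMA,
},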
> }
> )]
> #[derive(Serialize, Deserialize, Updater, Clone, PartialEq)]
> @@ -221,6 +228,9 @@ pub struct VerificationJobConfig {
> #[serde(skip_serializing_if = "Option::is_none")]
> /// Reverify snapshots after X days, never if 0. Ignored if 'ignore_verified' is false.
> pub outdated_after: Option<i64>,
> + /// Set the number of worker threads to use for the job
> + #[serde(skip_serializing_if = "Option::is_none")]
> + pub worker_threads: Option<usize>,
> #[serde(skip_serializing_if = "Option::is_none")]
> pub comment: Option<String>,
> #[serde(skip_serializing_if = "Option::is_none")]
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey
@ 2025-11-06 8:54 ` Christian Ebner
2025-11-06 9:04 ` Nicolas Frey
0 siblings, 1 reply; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 8:54 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Nicolas Frey
On 11/5/25 4:51 PM, Nicolas Frey wrote:
> This way, the chunks will be loaded in parallel in addition to being
> checked in parallel.
>
> Depending on the underlying storage, this can speed up reading chunks
> from disk, especially when the underlying storage is IO depth
> dependent, and the CPU is faster than the storage.
>
> Originally-by: Dominik Csapak <d.csapak@proxmox.com>
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
> src/backup/verify.rs | 120 +++++++++++++++++++++++++++----------------
> 1 file changed, 75 insertions(+), 45 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index e33fdf50..7f91f38c 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -1,6 +1,6 @@
> use pbs_config::BackupLockGuard;
> use std::collections::HashSet;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::Instant;
>
> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
> use pbs_datastore::manifest::{BackupManifest, FileInfo};
> use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
>
> -use crate::tools::parallel_handler::ParallelHandler;
> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>
> use crate::backup::hierarchy::ListAccessibleBackupGroups;
>
> @@ -156,23 +156,20 @@ impl VerifyWorker {
>
> let start_time = Instant::now();
>
> - let mut read_bytes = 0;
> - let mut decoded_bytes = 0;
> + let read_bytes = Arc::new(AtomicU64::new(0));
> + let decoded_bytes = Arc::new(AtomicU64::new(0));
>
> - let datastore2 = Arc::clone(&self.datastore);
> - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
> - let verified_chunks2 = Arc::clone(&self.verified_chunks);
> - let errors2 = Arc::clone(&errors);
> -
> - let decoder_pool = ParallelHandler::new(
> - "verify chunk decoder",
> - 4,
> + let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> + let datastore = Arc::clone(&self.datastore);
> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> + let verified_chunks = Arc::clone(&self.verified_chunks);
> + let errors = Arc::clone(&errors);
Hmm, this really warrants an internal state struct for bundling these,
so that only the Arc around it needs to be cloned.
E.g. I would propose to introduce a struct along the lines of:
struct IndexVerifyState {
read_bytes: AtomicU64,
decoded_bytes: AtomicU64,
errors: AtomicUsize,
datastore: Arc<DataStore>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
start_time: Instant,
}
This can then be wrapped in an Arc and also passed along as a parameter
to the inner methods where required.
See the diff below (untested), which could be used as a basis for
further refining this.
> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
> let chunk_crypt_mode = match chunk.crypt_mode() {
> Err(err) => {
> - corrupt_chunks2.lock().unwrap().insert(digest);
> + corrupt_chunks.lock().unwrap().insert(digest);
> info!("can't verify chunk, unknown CryptMode - {err}");
> - errors2.fetch_add(1, Ordering::SeqCst);
> + errors.fetch_add(1, Ordering::SeqCst);
> return Ok(());
> }
> Ok(mode) => mode,
> @@ -182,21 +179,21 @@ impl VerifyWorker {
> info!(
> "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
> );
> - errors2.fetch_add(1, Ordering::SeqCst);
> + errors.fetch_add(1, Ordering::SeqCst);
> }
>
> if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> - corrupt_chunks2.lock().unwrap().insert(digest);
> + corrupt_chunks.lock().unwrap().insert(digest);
> info!("{err}");
> - errors2.fetch_add(1, Ordering::SeqCst);
> - Self::rename_corrupted_chunk(datastore2.clone(), &digest);
> + errors.fetch_add(1, Ordering::SeqCst);
> + Self::rename_corrupted_chunk(datastore.clone(), &digest);
> } else {
> - verified_chunks2.lock().unwrap().insert(digest);
> + verified_chunks.lock().unwrap().insert(digest);
> }
>
> Ok(())
> - },
> - );
> + }
> + });
>
> let skip_chunk = |digest: &[u8; 32]| -> bool {
> if self.verified_chunks.lock().unwrap().contains(digest) {
> @@ -223,6 +220,29 @@ impl VerifyWorker {
> .datastore
> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>
> + let reader_pool = ParallelHandler::new("read chunks", 4, {
> + let decoder_pool = decoder_pool.channel();
> + let datastore = Arc::clone(&self.datastore);
> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> + let read_bytes = Arc::clone(&read_bytes);
> + let decoded_bytes = Arc::clone(&decoded_bytes);
> + let errors = Arc::clone(&errors);
> + let backend = self.backend.clone();
> +
> + move |info: ChunkReadInfo| {
> + Self::verify_chunk_by_backend(
> + &backend,
> + Arc::clone(&datastore),
> + Arc::clone(&corrupt_chunks),
> + Arc::clone(&read_bytes),
> + Arc::clone(&decoded_bytes),
> + Arc::clone(&errors),
> + &decoder_pool,
> + &info,
> + )
> + }
> + });
> +
> for (pos, _) in chunk_list {
> self.worker.check_abort()?;
> self.worker.fail_on_shutdown()?;
> @@ -234,19 +254,16 @@ impl VerifyWorker {
> continue; // already verified or marked corrupt
> }
>
> - self.verify_chunk_by_backend(
> - &info,
> - &mut read_bytes,
> - &mut decoded_bytes,
> - Arc::clone(&errors),
> - &decoder_pool,
> - )?;
> + reader_pool.send(info)?;
> }
>
> - decoder_pool.complete()?;
> + reader_pool.complete()?;
>
> let elapsed = start_time.elapsed().as_secs_f64();
>
> + let read_bytes = read_bytes.load(Ordering::SeqCst);
> + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +
> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
>
> @@ -266,26 +283,31 @@ impl VerifyWorker {
> Ok(())
> }
>
> + #[allow(clippy::too_many_arguments)]
> fn verify_chunk_by_backend(
> - &self,
> - info: &ChunkReadInfo,
> - read_bytes: &mut u64,
> - decoded_bytes: &mut u64,
> + backend: &DatastoreBackend,
> + datastore: Arc<DataStore>,
> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> + read_bytes: Arc<AtomicU64>,
> + decoded_bytes: Arc<AtomicU64>,
> errors: Arc<AtomicUsize>,
> - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
> + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
> + info: &ChunkReadInfo,
> ) -> Result<(), Error> {
> - match &self.backend {
> - DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
> - Err(err) => self.add_corrupt_chunk(
> + match backend {
> + DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> + Err(err) => Self::add_corrupt_chunk(
> + datastore,
> + corrupt_chunks,
> info.digest,
> errors,
> &format!("can't verify chunk, load failed - {err}"),
> ),
> Ok(chunk) => {
> let size = info.size();
> - *read_bytes += chunk.raw_size();
> + read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> decoder_pool.send((chunk, info.digest, size))?;
> - *decoded_bytes += size;
> + decoded_bytes.fetch_add(size, Ordering::SeqCst);
> }
> },
> DatastoreBackend::S3(s3_client) => {
> @@ -302,9 +324,9 @@ impl VerifyWorker {
> match chunk_result {
> Ok(chunk) => {
> let size = info.size();
> - *read_bytes += chunk.raw_size();
> + read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> decoder_pool.send((chunk, info.digest, size))?;
> - *decoded_bytes += size;
> + decoded_bytes.fetch_add(size, Ordering::SeqCst);
> }
> Err(err) => {
> errors.fetch_add(1, Ordering::SeqCst);
> @@ -312,7 +334,9 @@ impl VerifyWorker {
> }
> }
> }
> - Ok(None) => self.add_corrupt_chunk(
> + Ok(None) => Self::add_corrupt_chunk(
> + datastore,
> + corrupt_chunks,
> info.digest,
> errors,
> &format!(
> @@ -330,13 +354,19 @@ impl VerifyWorker {
> Ok(())
> }
>
> - fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
> + fn add_corrupt_chunk(
> + datastore: Arc<DataStore>,
> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> + digest: [u8; 32],
> + errors: Arc<AtomicUsize>,
> + message: &str,
> + ) {
> // Panic on poisoned mutex
> - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
> + let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
> corrupt_chunks.insert(digest);
> error!(message);
> errors.fetch_add(1, Ordering::SeqCst);
> - Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
> + Self::rename_corrupted_chunk(datastore.clone(), &digest);
> }
>
> fn verify_fixed_index(&self, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 7f91f38c9..e01405750 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -152,24 +152,24 @@ impl VerifyWorker {
index: Box<dyn IndexFile + Send>,
crypt_mode: CryptMode,
) -> Result<(), Error> {
- let errors = Arc::new(AtomicUsize::new(0));
-
- let start_time = Instant::now();
-
- let read_bytes = Arc::new(AtomicU64::new(0));
- let decoded_bytes = Arc::new(AtomicU64::new(0));
+ let index_verify_state = Arc::new(IndexVerifyState {
+ read_bytes: AtomicU64::new(0),
+ decoded_bytes: AtomicU64::new(0),
+ errors: AtomicUsize::new(0),
+ datastore: Arc::clone(&self.datastore),
+ corrupt_chunks: Arc::clone(&self.corrupt_chunks),
+ start_time: Instant::now(),
+ });
let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
- let datastore = Arc::clone(&self.datastore);
- let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
+ let state = Arc::clone(&index_verify_state);
let verified_chunks = Arc::clone(&self.verified_chunks);
- let errors = Arc::clone(&errors);
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
- corrupt_chunks.lock().unwrap().insert(digest);
+ state.corrupt_chunks.lock().unwrap().insert(digest);
info!("can't verify chunk, unknown CryptMode - {err}");
- errors.fetch_add(1, Ordering::SeqCst);
+ state.errors.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
Ok(mode) => mode,
@@ -179,14 +179,14 @@ impl VerifyWorker {
info!(
"chunk CryptMode {chunk_crypt_mode:?} does not
match index CryptMode {crypt_mode:?}"
);
- errors.fetch_add(1, Ordering::SeqCst);
+ state.errors.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
- corrupt_chunks.lock().unwrap().insert(digest);
+ state.corrupt_chunks.lock().unwrap().insert(digest);
info!("{err}");
- errors.fetch_add(1, Ordering::SeqCst);
- Self::rename_corrupted_chunk(datastore.clone(), &digest);
+ state.errors.fetch_add(1, Ordering::SeqCst);
+ Self::rename_corrupted_chunk(state.datastore.clone(), &digest);
} else {
verified_chunks.lock().unwrap().insert(digest);
}
@@ -201,7 +201,7 @@ impl VerifyWorker {
} else if self.corrupt_chunks.lock().unwrap().contains(digest) {
let digest_str = hex::encode(digest);
info!("chunk {digest_str} was marked as corrupt");
- errors.fetch_add(1, Ordering::SeqCst);
+ index_verify_state.errors.fetch_add(1, Ordering::SeqCst);
true
} else {
false
@@ -222,24 +222,11 @@ impl VerifyWorker {
let reader_pool = ParallelHandler::new("read chunks", 4, {
let decoder_pool = decoder_pool.channel();
- let datastore = Arc::clone(&self.datastore);
- let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
- let read_bytes = Arc::clone(&read_bytes);
- let decoded_bytes = Arc::clone(&decoded_bytes);
- let errors = Arc::clone(&errors);
+ let state = Arc::clone(&index_verify_state);
let backend = self.backend.clone();
move |info: ChunkReadInfo| {
- Self::verify_chunk_by_backend(
- &backend,
- Arc::clone(&datastore),
- Arc::clone(&corrupt_chunks),
- Arc::clone(&read_bytes),
- Arc::clone(&decoded_bytes),
- Arc::clone(&errors),
- &decoder_pool,
- &info,
- )
+ Self::verify_chunk_by_backend(&backend, Arc::clone(&state), &decoder_pool, &info)
}
});
@@ -259,10 +246,10 @@ impl VerifyWorker {
reader_pool.complete()?;
- let elapsed = start_time.elapsed().as_secs_f64();
+ let elapsed = index_verify_state.start_time.elapsed().as_secs_f64();
- let read_bytes = read_bytes.load(Ordering::SeqCst);
- let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
+ let read_bytes = index_verify_state.read_bytes.load(Ordering::SeqCst);
+ let decoded_bytes = index_verify_state.decoded_bytes.load(Ordering::SeqCst);
let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -270,13 +257,13 @@ impl VerifyWorker {
let read_speed = read_bytes_mib / elapsed;
let decode_speed = decoded_bytes_mib / elapsed;
- let error_count = errors.load(Ordering::SeqCst);
+ let error_count = index_verify_state.errors.load(Ordering::SeqCst);
info!(
" verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB
in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s
({error_count} errors)"
);
- if errors.load(Ordering::SeqCst) > 0 {
+ if index_verify_state.errors.load(Ordering::SeqCst) > 0 {
bail!("chunks could not be verified");
}
@@ -286,28 +273,26 @@ impl VerifyWorker {
#[allow(clippy::too_many_arguments)]
fn verify_chunk_by_backend(
backend: &DatastoreBackend,
- datastore: Arc<DataStore>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
- read_bytes: Arc<AtomicU64>,
- decoded_bytes: Arc<AtomicU64>,
- errors: Arc<AtomicUsize>,
+ state: Arc<IndexVerifyState>,
decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
info: &ChunkReadInfo,
) -> Result<(), Error> {
match backend {
- DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
+ DatastoreBackend::Filesystem => match state.datastore.load_chunk(&info.digest) {
Err(err) => Self::add_corrupt_chunk(
- datastore,
- corrupt_chunks,
+ Arc::clone(&state.datastore),
+ Arc::clone(&state.corrupt_chunks),
info.digest,
- errors,
+ &state.errors,
&format!("can't verify chunk, load failed - {err}"),
),
Ok(chunk) => {
let size = info.size();
- read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+ state
+ .read_bytes
+ .fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- decoded_bytes.fetch_add(size, Ordering::SeqCst);
+ state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
},
DatastoreBackend::S3(s3_client) => {
@@ -324,28 +309,30 @@ impl VerifyWorker {
match chunk_result {
Ok(chunk) => {
let size = info.size();
- read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+ state
+ .read_bytes
+ .fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- decoded_bytes.fetch_add(size, Ordering::SeqCst);
+ state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
Err(err) => {
- errors.fetch_add(1, Ordering::SeqCst);
+ state.errors.fetch_add(1, Ordering::SeqCst);
error!("can't verify chunk, load failed - {err}");
}
}
}
Ok(None) => Self::add_corrupt_chunk(
- datastore,
- corrupt_chunks,
+ Arc::clone(&state.datastore),
+ Arc::clone(&state.corrupt_chunks),
info.digest,
- errors,
+ &state.errors,
&format!(
"can't verify missing chunk with digest {}",
hex::encode(info.digest)
),
),
Err(err) => {
- errors.fetch_add(1, Ordering::SeqCst);
+ state.errors.fetch_add(1, Ordering::SeqCst);
error!("can't verify chunk, load failed - {err}");
}
}
@@ -358,7 +345,7 @@ impl VerifyWorker {
datastore: Arc<DataStore>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
digest: [u8; 32],
- errors: Arc<AtomicUsize>,
+ errors: &AtomicUsize,
message: &str,
) {
// Panic on poisoned mutex
@@ -681,3 +668,12 @@ impl VerifyWorker {
}
}
}
+
+struct IndexVerifyState {
+ read_bytes: AtomicU64,
+ decoded_bytes: AtomicU64,
+ errors: AtomicUsize,
+ datastore: Arc<DataStore>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ start_time: Instant,
+}
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler
2025-11-06 8:54 ` Christian Ebner
@ 2025-11-06 9:04 ` Nicolas Frey
2025-11-06 9:26 ` Christian Ebner
0 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-06 9:04 UTC (permalink / raw)
To: Christian Ebner, Proxmox Backup Server development discussion
On 11/6/25 9:54 AM, Christian Ebner wrote:
> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>> This way, the chunks will be loaded in parallel in addition to being
>> checked in parallel.
>>
>> Depending on the underlying storage, this can speed up reading chunks
>> from disk, especially when the underlying storage is IO depth
>> dependent, and the CPU is faster than the storage.
>>
>> Originally-by: Dominik Csapak <d.csapak@proxmox.com>
>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
>> ---
>> src/backup/verify.rs | 120 ++++++++++++++++++++++++++
>> +----------------
>> 1 file changed, 75 insertions(+), 45 deletions(-)
>>
>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>> index e33fdf50..7f91f38c 100644
>> --- a/src/backup/verify.rs
>> +++ b/src/backup/verify.rs
>> @@ -1,6 +1,6 @@
>> use pbs_config::BackupLockGuard;
>> use std::collections::HashSet;
>> -use std::sync::atomic::{AtomicUsize, Ordering};
>> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>> use std::sync::{Arc, Mutex};
>> use std::time::Instant;
>> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo,
>> IndexFile};
>> use pbs_datastore::manifest::{BackupManifest, FileInfo};
>> use pbs_datastore::{DataBlob, DataStore, DatastoreBackend,
>> StoreProgress};
>> -use crate::tools::parallel_handler::ParallelHandler;
>> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>> use crate::backup::hierarchy::ListAccessibleBackupGroups;
>> @@ -156,23 +156,20 @@ impl VerifyWorker {
>> let start_time = Instant::now();
>> - let mut read_bytes = 0;
>> - let mut decoded_bytes = 0;
>> + let read_bytes = Arc::new(AtomicU64::new(0));
>> + let decoded_bytes = Arc::new(AtomicU64::new(0));
>> - let datastore2 = Arc::clone(&self.datastore);
>> - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
>> - let verified_chunks2 = Arc::clone(&self.verified_chunks);
>> - let errors2 = Arc::clone(&errors);
>> -
>> - let decoder_pool = ParallelHandler::new(
>> - "verify chunk decoder",
>> - 4,
>> + let decoder_pool = ParallelHandler::new("verify chunk
>> decoder", 4, {
>> + let datastore = Arc::clone(&self.datastore);
>> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>> + let verified_chunks = Arc::clone(&self.verified_chunks);
>> + let errors = Arc::clone(&errors);
>
> Hmm, this really warrants an internal state struct for bundling theses
> and only cloning the arc around that.
>
> E.g. I would propose to introduce a struct along the lines of:
>
> struct IndexVerifyState {
> read_bytes: AtomicU64,
> decoded_bytes: AtomicU64,
> errors: AtomicUsize,
> datastore: Arc<DataStore>,
> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> start_time: Instant,
> }
>
> Which can then be wrapped in an Arc and also passed along as parameter
> to the inner methods where required.
>
> See the diff below (untested), which could be used for further
> refining this.
>
I thought this could be improved in the initial patch, but wasn't
quite sure how I should approach it, so thanks for the feedback!
The diff below looks good to me; I'll send a v2 for this in addition
to the things you pointed out in the other patches.
>> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>> let chunk_crypt_mode = match chunk.crypt_mode() {
>> Err(err) => {
>> -
>> corrupt_chunks2.lock().unwrap().insert(digest);
>> + corrupt_chunks.lock().unwrap().insert(digest);
>> info!("can't verify chunk, unknown
>> CryptMode - {err}");
>> - errors2.fetch_add(1, Ordering::SeqCst);
>> + errors.fetch_add(1, Ordering::SeqCst);
>> return Ok(());
>> }
>> Ok(mode) => mode,
>> @@ -182,21 +179,21 @@ impl VerifyWorker {
>> info!(
>> "chunk CryptMode {chunk_crypt_mode:?} does not
>> match index CryptMode {crypt_mode:?}"
>> );
>> - errors2.fetch_add(1, Ordering::SeqCst);
>> + errors.fetch_add(1, Ordering::SeqCst);
>> }
>> if let Err(err) = chunk.verify_unencrypted(size
>> as usize, &digest) {
>> - corrupt_chunks2.lock().unwrap().insert(digest);
>> + corrupt_chunks.lock().unwrap().insert(digest);
>> info!("{err}");
>> - errors2.fetch_add(1, Ordering::SeqCst);
>> -
>> Self::rename_corrupted_chunk(datastore2.clone(), &digest);
>> + errors.fetch_add(1, Ordering::SeqCst);
>> + Self::rename_corrupted_chunk(datastore.clone(),
>> &digest);
>> } else {
>> - verified_chunks2.lock().unwrap().insert(digest);
>> + verified_chunks.lock().unwrap().insert(digest);
>> }
>> Ok(())
>> - },
>> - );
>> + }
>> + });
>> let skip_chunk = |digest: &[u8; 32]| -> bool {
>> if
>> self.verified_chunks.lock().unwrap().contains(digest) {
>> @@ -223,6 +220,29 @@ impl VerifyWorker {
>> .datastore
>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>> + let reader_pool = ParallelHandler::new("read chunks", 4, {
>> + let decoder_pool = decoder_pool.channel();
>> + let datastore = Arc::clone(&self.datastore);
>> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>> + let read_bytes = Arc::clone(&read_bytes);
>> + let decoded_bytes = Arc::clone(&decoded_bytes);
>> + let errors = Arc::clone(&errors);
>> + let backend = self.backend.clone();
>> +
>> + move |info: ChunkReadInfo| {
>> + Self::verify_chunk_by_backend(
>> + &backend,
>> + Arc::clone(&datastore),
>> + Arc::clone(&corrupt_chunks),
>> + Arc::clone(&read_bytes),
>> + Arc::clone(&decoded_bytes),
>> + Arc::clone(&errors),
>> + &decoder_pool,
>> + &info,
>> + )
>> + }
>> + });
>> +
>> for (pos, _) in chunk_list {
>> self.worker.check_abort()?;
>> self.worker.fail_on_shutdown()?;
>> @@ -234,19 +254,16 @@ impl VerifyWorker {
>> continue; // already verified or marked corrupt
>> }
>> - self.verify_chunk_by_backend(
>> - &info,
>> - &mut read_bytes,
>> - &mut decoded_bytes,
>> - Arc::clone(&errors),
>> - &decoder_pool,
>> - )?;
>> + reader_pool.send(info)?;
>> }
>> - decoder_pool.complete()?;
>> + reader_pool.complete()?;
>> let elapsed = start_time.elapsed().as_secs_f64();
>> + let read_bytes = read_bytes.load(Ordering::SeqCst);
>> + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
>> +
>> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 *
>> 1024.0);
>> @@ -266,26 +283,31 @@ impl VerifyWorker {
>> Ok(())
>> }
>> + #[allow(clippy::too_many_arguments)]
>> fn verify_chunk_by_backend(
>> - &self,
>> - info: &ChunkReadInfo,
>> - read_bytes: &mut u64,
>> - decoded_bytes: &mut u64,
>> + backend: &DatastoreBackend,
>> + datastore: Arc<DataStore>,
>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> + read_bytes: Arc<AtomicU64>,
>> + decoded_bytes: Arc<AtomicU64>,
>> errors: Arc<AtomicUsize>,
>> - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
>> + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>> + info: &ChunkReadInfo,
>> ) -> Result<(), Error> {
>> - match &self.backend {
>> - DatastoreBackend::Filesystem => match
>> self.datastore.load_chunk(&info.digest) {
>> - Err(err) => self.add_corrupt_chunk(
>> + match backend {
>> + DatastoreBackend::Filesystem => match
>> datastore.load_chunk(&info.digest) {
>> + Err(err) => Self::add_corrupt_chunk(
>> + datastore,
>> + corrupt_chunks,
>> info.digest,
>> errors,
>> &format!("can't verify chunk, load failed -
>> {err}"),
>> ),
>> Ok(chunk) => {
>> let size = info.size();
>> - *read_bytes += chunk.raw_size();
>> + read_bytes.fetch_add(chunk.raw_size(),
>> Ordering::SeqCst);
>> decoder_pool.send((chunk, info.digest, size))?;
>> - *decoded_bytes += size;
>> + decoded_bytes.fetch_add(size, Ordering::SeqCst);
>> }
>> },
>> DatastoreBackend::S3(s3_client) => {
>> @@ -302,9 +324,9 @@ impl VerifyWorker {
>> match chunk_result {
>> Ok(chunk) => {
>> let size = info.size();
>> - *read_bytes += chunk.raw_size();
>> +
>> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>> decoder_pool.send((chunk,
>> info.digest, size))?;
>> - *decoded_bytes += size;
>> + decoded_bytes.fetch_add(size,
>> Ordering::SeqCst);
>> }
>> Err(err) => {
>> errors.fetch_add(1,
>> Ordering::SeqCst);
>> @@ -312,7 +334,9 @@ impl VerifyWorker {
>> }
>> }
>> }
>> - Ok(None) => self.add_corrupt_chunk(
>> + Ok(None) => Self::add_corrupt_chunk(
>> + datastore,
>> + corrupt_chunks,
>> info.digest,
>> errors,
>> &format!(
>> @@ -330,13 +354,19 @@ impl VerifyWorker {
>> Ok(())
>> }
>> - fn add_corrupt_chunk(&self, digest: [u8; 32], errors:
>> Arc<AtomicUsize>, message: &str) {
>> + fn add_corrupt_chunk(
>> + datastore: Arc<DataStore>,
>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> + digest: [u8; 32],
>> + errors: Arc<AtomicUsize>,
>> + message: &str,
>> + ) {
>> // Panic on poisoned mutex
>> - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
>> + let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
>> corrupt_chunks.insert(digest);
>> error!(message);
>> errors.fetch_add(1, Ordering::SeqCst);
>> - Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
>> + Self::rename_corrupted_chunk(datastore.clone(), &digest);
>> }
>> fn verify_fixed_index(&self, backup_dir: &BackupDir, info:
>> &FileInfo) -> Result<(), Error> {
>
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 7f91f38c9..e01405750 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -152,24 +152,24 @@ impl VerifyWorker {
> index: Box<dyn IndexFile + Send>,
> crypt_mode: CryptMode,
> ) -> Result<(), Error> {
> - let errors = Arc::new(AtomicUsize::new(0));
> -
> - let start_time = Instant::now();
> -
> - let read_bytes = Arc::new(AtomicU64::new(0));
> - let decoded_bytes = Arc::new(AtomicU64::new(0));
> + let index_verify_state = Arc::new(IndexVerifyState {
> + read_bytes: AtomicU64::new(0),
> + decoded_bytes: AtomicU64::new(0),
> + errors: AtomicUsize::new(0),
> + datastore: Arc::clone(&self.datastore),
> + corrupt_chunks: Arc::clone(&self.corrupt_chunks),
> + start_time: Instant::now(),
> + });
>
> let decoder_pool = ParallelHandler::new("verify chunk
> decoder", 4, {
> - let datastore = Arc::clone(&self.datastore);
> - let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> + let state = Arc::clone(&index_verify_state);
> let verified_chunks = Arc::clone(&self.verified_chunks);
> - let errors = Arc::clone(&errors);
> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
> let chunk_crypt_mode = match chunk.crypt_mode() {
> Err(err) => {
> - corrupt_chunks.lock().unwrap().insert(digest);
> + state.corrupt_chunks.lock().unwrap().insert(digest);
> info!("can't verify chunk, unknown CryptMode
> - {err}");
> - errors.fetch_add(1, Ordering::SeqCst);
> + state.errors.fetch_add(1, Ordering::SeqCst);
> return Ok(());
> }
> Ok(mode) => mode,
> @@ -179,14 +179,14 @@ impl VerifyWorker {
> info!(
> "chunk CryptMode {chunk_crypt_mode:?} does not
> match index CryptMode {crypt_mode:?}"
> );
> - errors.fetch_add(1, Ordering::SeqCst);
> + state.errors.fetch_add(1, Ordering::SeqCst);
> }
>
> if let Err(err) = chunk.verify_unencrypted(size as
> usize, &digest) {
> - corrupt_chunks.lock().unwrap().insert(digest);
> + state.corrupt_chunks.lock().unwrap().insert(digest);
> info!("{err}");
> - errors.fetch_add(1, Ordering::SeqCst);
> - Self::rename_corrupted_chunk(datastore.clone(),
> &digest);
> + state.errors.fetch_add(1, Ordering::SeqCst);
> + Self::rename_corrupted_chunk(state.datastore.clone(), &digest);
> } else {
> verified_chunks.lock().unwrap().insert(digest);
> }
> @@ -201,7 +201,7 @@ impl VerifyWorker {
> } else if
> self.corrupt_chunks.lock().unwrap().contains(digest) {
> let digest_str = hex::encode(digest);
> info!("chunk {digest_str} was marked as corrupt");
> - errors.fetch_add(1, Ordering::SeqCst);
> + index_verify_state.errors.fetch_add(1,
> Ordering::SeqCst);
> true
> } else {
> false
> @@ -222,24 +222,11 @@ impl VerifyWorker {
>
> let reader_pool = ParallelHandler::new("read chunks", 4, {
> let decoder_pool = decoder_pool.channel();
> - let datastore = Arc::clone(&self.datastore);
> - let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> - let read_bytes = Arc::clone(&read_bytes);
> - let decoded_bytes = Arc::clone(&decoded_bytes);
> - let errors = Arc::clone(&errors);
> + let state = Arc::clone(&index_verify_state);
> let backend = self.backend.clone();
>
> move |info: ChunkReadInfo| {
> - Self::verify_chunk_by_backend(
> - &backend,
> - Arc::clone(&datastore),
> - Arc::clone(&corrupt_chunks),
> - Arc::clone(&read_bytes),
> - Arc::clone(&decoded_bytes),
> - Arc::clone(&errors),
> - &decoder_pool,
> - &info,
> - )
> + Self::verify_chunk_by_backend(&backend,
> Arc::clone(&state), &decoder_pool, &info)
> }
> });
>
> @@ -259,10 +246,10 @@ impl VerifyWorker {
>
> reader_pool.complete()?;
>
> - let elapsed = start_time.elapsed().as_secs_f64();
> + let elapsed =
> index_verify_state.start_time.elapsed().as_secs_f64();
>
> - let read_bytes = read_bytes.load(Ordering::SeqCst);
> - let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> + let read_bytes =
> index_verify_state.read_bytes.load(Ordering::SeqCst);
> + let decoded_bytes =
> index_verify_state.decoded_bytes.load(Ordering::SeqCst);
>
> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 *
> 1024.0);
> @@ -270,13 +257,13 @@ impl VerifyWorker {
> let read_speed = read_bytes_mib / elapsed;
> let decode_speed = decoded_bytes_mib / elapsed;
>
> - let error_count = errors.load(Ordering::SeqCst);
> + let error_count =
> index_verify_state.errors.load(Ordering::SeqCst);
>
> info!(
> " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2}
> MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2}
> MiB/s ({error_count} errors)"
> );
>
> - if errors.load(Ordering::SeqCst) > 0 {
> + if index_verify_state.errors.load(Ordering::SeqCst) > 0 {
> bail!("chunks could not be verified");
> }
>
> @@ -286,28 +273,26 @@ impl VerifyWorker {
> #[allow(clippy::too_many_arguments)]
> fn verify_chunk_by_backend(
> backend: &DatastoreBackend,
> - datastore: Arc<DataStore>,
> - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> - read_bytes: Arc<AtomicU64>,
> - decoded_bytes: Arc<AtomicU64>,
> - errors: Arc<AtomicUsize>,
> + state: Arc<IndexVerifyState>,
> decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
> info: &ChunkReadInfo,
> ) -> Result<(), Error> {
> match backend {
> - DatastoreBackend::Filesystem => match
> datastore.load_chunk(&info.digest) {
> + DatastoreBackend::Filesystem => match
> state.datastore.load_chunk(&info.digest) {
> Err(err) => Self::add_corrupt_chunk(
> - datastore,
> - corrupt_chunks,
> + Arc::clone(&state.datastore),
> + Arc::clone(&state.corrupt_chunks),
> info.digest,
> - errors,
> + &state.errors,
> &format!("can't verify chunk, load failed - {err}"),
> ),
> Ok(chunk) => {
> let size = info.size();
> - read_bytes.fetch_add(chunk.raw_size(),
> Ordering::SeqCst);
> + state
> + .read_bytes
> + .fetch_add(chunk.raw_size(), Ordering::SeqCst);
> decoder_pool.send((chunk, info.digest, size))?;
> - decoded_bytes.fetch_add(size, Ordering::SeqCst);
> + state.decoded_bytes.fetch_add(size,
> Ordering::SeqCst);
> }
> },
> DatastoreBackend::S3(s3_client) => {
> @@ -324,28 +309,30 @@ impl VerifyWorker {
> match chunk_result {
> Ok(chunk) => {
> let size = info.size();
> -
> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> + state
> + .read_bytes
> + .fetch_add(chunk.raw_size(),
> Ordering::SeqCst);
> decoder_pool.send((chunk,
> info.digest, size))?;
> - decoded_bytes.fetch_add(size,
> Ordering::SeqCst);
> + state.decoded_bytes.fetch_add(size,
> Ordering::SeqCst);
> }
> Err(err) => {
> - errors.fetch_add(1, Ordering::SeqCst);
> + state.errors.fetch_add(1,
> Ordering::SeqCst);
> error!("can't verify chunk, load
> failed - {err}");
> }
> }
> }
> Ok(None) => Self::add_corrupt_chunk(
> - datastore,
> - corrupt_chunks,
> + Arc::clone(&state.datastore),
> + Arc::clone(&state.corrupt_chunks),
> info.digest,
> - errors,
> + &state.errors,
> &format!(
> "can't verify missing chunk with digest {}",
> hex::encode(info.digest)
> ),
> ),
> Err(err) => {
> - errors.fetch_add(1, Ordering::SeqCst);
> + state.errors.fetch_add(1, Ordering::SeqCst);
> error!("can't verify chunk, load failed -
> {err}");
> }
> }
> @@ -358,7 +345,7 @@ impl VerifyWorker {
> datastore: Arc<DataStore>,
> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> digest: [u8; 32],
> - errors: Arc<AtomicUsize>,
> + errors: &AtomicUsize,
> message: &str,
> ) {
> // Panic on poisoned mutex
> @@ -681,3 +668,12 @@ impl VerifyWorker {
> }
> }
> }
> +
> +struct IndexVerifyState {
> + read_bytes: AtomicU64,
> + decoded_bytes: AtomicU64,
> + errors: AtomicUsize,
> + datastore: Arc<DataStore>,
> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> + start_time: Instant,
> +}
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey
@ 2025-11-06 9:09 ` Christian Ebner
2025-11-06 9:23 ` Nicolas Frey
0 siblings, 1 reply; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 9:09 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Nicolas Frey
Please add a short commit message describing what the worker threads
cover, e.g. that this parameter controls the number of reader and chunk
verification threads.
What tripped me over just now:
Is this intentionally not increasing the number of chunk verification
threads? Or was that overlooked? From the name of the parameter I
suspected this to act on both, reading and verifying. If this is not the
case, maybe the parameter should get renamed to a more telling
`parallel-chunk-readers` instead?
further comment inline
On 11/5/25 4:51 PM, Nicolas Frey wrote:
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
> src/api2/admin/datastore.rs | 13 +++++++++++--
> src/api2/backup/environment.rs | 2 +-
> src/backup/verify.rs | 5 ++++-
> src/server/verify_job.rs | 3 ++-
> 4 files changed, 18 insertions(+), 5 deletions(-)
>
> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
> index d192ee39..69a09081 100644
> --- a/src/api2/admin/datastore.rs
> +++ b/src/api2/admin/datastore.rs
> @@ -677,6 +677,14 @@ pub async fn status(
> schema: NS_MAX_DEPTH_SCHEMA,
> optional: true,
> },
> + "worker-threads": {
> + description: "Set the number of worker threads to use for the job",
> + type: Integer,
> + optional: true,
> + minimum: 1,
> + maximum: 32,
> + default: 1,
> + },
As mentioned on the pbs-api-types patch, this should reuse the same
schema as (will be) defined there, so it does not need to be re-defined
and stays in sync.
> },
> },
> returns: {
> @@ -690,7 +698,7 @@ pub async fn status(
> )]
> /// Verify backups.
> ///
> -/// This function can verify a single backup snapshot, all backup from a backup group,
> +/// This function can verify a single backup snapshot, all backups from a backup group,
> /// or all backups in the datastore.
> #[allow(clippy::too_many_arguments)]
> pub fn verify(
> @@ -702,6 +710,7 @@ pub fn verify(
> ignore_verified: Option<bool>,
> outdated_after: Option<i64>,
> max_depth: Option<usize>,
> + worker_threads: Option<usize>,
this could be a plain `usize` already, so it does not need to be
unwrapped for each parallel worker instantiation. The unwrapping and
setting to default can already happen in the constructor.
> rpcenv: &mut dyn RpcEnvironment,
> ) -> Result<Value, Error> {
> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
> @@ -781,7 +790,7 @@ pub fn verify(
> auth_id.to_string(),
> to_stdout,
> move |worker| {
> - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> + let verify_worker = VerifyWorker::new(worker.clone(), datastore, worker_threads)?;
> let failed_dirs = if let Some(backup_dir) = backup_dir {
> let mut res = Vec::new();
> if !verify_worker.verify_backup_dir(
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index 0e8eab1b..5e6a73b9 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -812,7 +812,7 @@ impl BackupEnvironment {
> move |worker| {
> worker.log_message("Automatically verifying newly added snapshot");
>
> - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> + let verify_worker = VerifyWorker::new(worker.clone(), datastore, None)?;
> if !verify_worker.verify_backup_dir_with_lock(
> &backup_dir,
> worker.upid().clone(),
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index 7f91f38c..e11dba8e 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -32,6 +32,7 @@ pub struct VerifyWorker {
> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> backend: DatastoreBackend,
> + worker_threads: Option<usize>,
... plain `usize` here
> }
>
> impl VerifyWorker {
> @@ -39,6 +40,7 @@ impl VerifyWorker {
> pub fn new(
> worker: Arc<dyn WorkerTaskContext>,
> datastore: Arc<DataStore>,
> + worker_threads: Option<usize>,
> ) -> Result<Self, Error> {
> let backend = datastore.backend()?;
> Ok(Self {
> @@ -49,6 +51,7 @@ impl VerifyWorker {
> // start with 64 chunks since we assume there are few corrupt ones
> corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
> backend,
> + worker_threads,
unwrap_or(4) here... or even define a constant for the default value,
although if it is placed here, it will only occur once.
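A minimal, self-contained sketch of what I mean (the struct is trimmed
down to the one relevant field; the real constructor of course keeps its
other parameters, and the names are only placeholders):

struct VerifyWorker {
    worker_threads: usize,
}

impl VerifyWorker {
    /// default number of reader threads if the job config sets none
    const DEFAULT_WORKER_THREADS: usize = 4;

    fn new(worker_threads: Option<usize>) -> Self {
        Self {
            // apply the default exactly once, store a plain usize
            worker_threads: worker_threads.unwrap_or(Self::DEFAULT_WORKER_THREADS),
        }
    }
}

fn main() {
    assert_eq!(VerifyWorker::new(None).worker_threads, 4);
    assert_eq!(VerifyWorker::new(Some(8)).worker_threads, 8);
}

That way the `unwrap_or` does not have to be repeated at every pool
instantiation.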
> })
> }
>
> @@ -220,7 +223,7 @@ impl VerifyWorker {
> .datastore
> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>
> - let reader_pool = ParallelHandler::new("read chunks", 4, {
> + let reader_pool = ParallelHandler::new("read chunks", self.worker_threads.unwrap_or(4), {
> let decoder_pool = decoder_pool.channel();
> let datastore = Arc::clone(&self.datastore);
> let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
> index c8792174..9d790b07 100644
> --- a/src/server/verify_job.rs
> +++ b/src/server/verify_job.rs
> @@ -41,7 +41,8 @@ pub fn do_verification_job(
> None => Default::default(),
> };
>
> - let verify_worker = VerifyWorker::new(worker.clone(), datastore)?;
> + let verify_worker =
> + VerifyWorker::new(worker.clone(), datastore, verification_job.worker_threads)?;
> let result = verify_worker.verify_all_backups(
> worker.upid(),
> ns,
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey
@ 2025-11-06 9:13 ` Christian Ebner
0 siblings, 0 replies; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 9:13 UTC (permalink / raw)
To: pbs-devel
Again a short commit message would be nice, e.g.
Allows adding, updating or deleting the worker-threads property in the
verify job config via the API. This will control the number of workers
that read (and verify?) chunks in the verification job.
On 11/5/25 4:51 PM, Nicolas Frey wrote:
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
> src/api2/config/verify.rs | 8 ++++++++
> 1 file changed, 8 insertions(+)
>
> diff --git a/src/api2/config/verify.rs b/src/api2/config/verify.rs
> index e71e0c2e..2847c984 100644
> --- a/src/api2/config/verify.rs
> +++ b/src/api2/config/verify.rs
> @@ -149,6 +149,8 @@ pub enum DeletableProperty {
> Ns,
> /// Delete max-depth property, defaulting to full recursion again
> MaxDepth,
> + /// Delete worker-threads property
> + WorkerThreads,
> }
>
> #[api(
> @@ -229,6 +231,9 @@ pub fn update_verification_job(
> DeletableProperty::MaxDepth => {
> data.max_depth = None;
> }
> + DeletableProperty::WorkerThreads => {
> + data.worker_threads = None;
> + }
> }
> }
> }
> @@ -266,6 +271,9 @@ pub fn update_verification_job(
> data.max_depth = Some(max_depth);
> }
> }
> + if update.worker_threads.is_some() {
> + data.worker_threads = update.worker_threads;
> + }
>
> // check new store and NS
> user_info.check_privs(&auth_id, &data.acl_path(), PRIV_DATASTORE_VERIFY, true)?;
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey
@ 2025-11-06 9:22 ` Christian Ebner
2025-11-06 9:25 ` Nicolas Frey
0 siblings, 1 reply; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 9:22 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Nicolas Frey
Same comment with respect to commit title as for some of the other
patches, a short description on why this is done would be nice. E.g.
Exposes the worker-threads property in the verification job config
window and verify all window so the user can update the values accordingly.
one comment below
On 11/5/25 4:51 PM, Nicolas Frey wrote:
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
> ---
> www/window/VerifyAll.js | 12 ++++++++++++
> www/window/VerifyJobEdit.js | 13 +++++++++++++
> 2 files changed, 25 insertions(+)
>
> diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js
> index 01bcd63d..82f62aae 100644
> --- a/www/window/VerifyAll.js
> +++ b/www/window/VerifyAll.js
> @@ -80,6 +80,18 @@ Ext.define('PBS.window.VerifyAll', {
> },
> ],
> },
> +
> + {
> + xtype: 'proxmoxintegerfield',
> + name: 'worker-threads',
> + fieldLabel: gettext('# of Worker Threads'),
> + emptyText: '1',
> + minValue: 1,
> + maxValue: 32,
> + cbind: {
> + deleteEmpty: '{!isCreate}',
> + },
> + },
> ],
> },
> ],
> diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js
> index e87ca069..7b7a96c4 100644
> --- a/www/window/VerifyJobEdit.js
> +++ b/www/window/VerifyJobEdit.js
> @@ -166,5 +166,18 @@ Ext.define('PBS.window.VerifyJobEdit', {
> },
> },
> ],
> + advancedColumn2: [
> + {
> + xtype: 'proxmoxintegerfield',
> + name: 'worker-threads',
> + fieldLabel: gettext('# of Worker Threads'),
> + emptyText: '1',
> + minValue: 1,
> + maxValue: 32,
> + cbind: {
> + deleteEmpty: '{!isCreate}',
> + },
> + },
> + ]
it would be nice to also delete the value again if set to the default.
Further, just noticed: this now seemingly defaults to 1, which is also
the default for the API schema, but the parallel task instantiation gets
a default of 4 if the value is not set and therefore unwrapped? That
should be made consistent.
> },
> });
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
2025-11-06 9:09 ` Christian Ebner
@ 2025-11-06 9:23 ` Nicolas Frey
2025-11-06 9:32 ` Christian Ebner
0 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-06 9:23 UTC (permalink / raw)
To: Christian Ebner, Proxmox Backup Server development discussion
On 11/6/25 10:08 AM, Christian Ebner wrote:
> Please add a short commit message describing what the worker threads
> cover, e.g. that this parameter controls the number of reader and
> chunk verification threads.
>
> What tripped me over just now:
> Is this intentionally not increasing the number of chunk verification
> threads? Or was that overlooked? From the name of the parameter I
> suspected this to act on both, reading and verifying. If this is not
> the case, maybe the parameter should get renamed to a more telling
> `parallel-chunk-readers` instead?
I wasn't sure if the number of threads for verification should be
controlled via this as well, as the original patch only added a new
thread pool for reading, whereas the verification pool was already
implemented.
I pointed this out in the cover letter, though it might have been
better to put this here too:
The number of `worker-threads` only controls the thread pool for
reading, though if it makes sense to reuse this for the verification
pool, it could be adjusted to do so too.
I think it makes sense to use it to control the number of threads of
both. Thanks for the feedback, I'll adjust it along with the other
proposed changes in a v2!
>
> further comment inline
> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
>> ---
>> src/api2/admin/datastore.rs | 13 +++++++++++--
>> src/api2/backup/environment.rs | 2 +-
>> src/backup/verify.rs | 5 ++++-
>> src/server/verify_job.rs | 3 ++-
>> 4 files changed, 18 insertions(+), 5 deletions(-)
>>
>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>> index d192ee39..69a09081 100644
>> --- a/src/api2/admin/datastore.rs
>> +++ b/src/api2/admin/datastore.rs
>> @@ -677,6 +677,14 @@ pub async fn status(
>> schema: NS_MAX_DEPTH_SCHEMA,
>> optional: true,
>> },
>> + "worker-threads": {
>> + description: "Set the number of worker threads to
>> use for the job",
>> + type: Integer,
>> + optional: true,
>> + minimum: 1,
>> + maximum: 32,
>> + default: 1,
>> + },
>
> As mentioned on the pbs-api-types patch, this should reuse the same
> schema as (will be) defined there, so it does not need to be
> re-defined and stays in sync.
>
>> },
>> },
>> returns: {
>> @@ -690,7 +698,7 @@ pub async fn status(
>> )]
>> /// Verify backups.
>> ///
>> -/// This function can verify a single backup snapshot, all backup
>> from a backup group,
>> +/// This function can verify a single backup snapshot, all backups
>> from a backup group,
>> /// or all backups in the datastore.
>> #[allow(clippy::too_many_arguments)]
>> pub fn verify(
>> @@ -702,6 +710,7 @@ pub fn verify(
>> ignore_verified: Option<bool>,
>> outdated_after: Option<i64>,
>> max_depth: Option<usize>,
>> + worker_threads: Option<usize>,
>
> this could be a plain `usize` already, so it does not need to be
> unwrapped for each parallel worker instantiation. The unwrapping and
> setting to default can already happen in the constructor.
>
>> rpcenv: &mut dyn RpcEnvironment,
>> ) -> Result<Value, Error> {
>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>> @@ -781,7 +790,7 @@ pub fn verify(
>> auth_id.to_string(),
>> to_stdout,
>> move |worker| {
>> - let verify_worker = VerifyWorker::new(worker.clone(),
>> datastore)?;
>> + let verify_worker = VerifyWorker::new(worker.clone(),
>> datastore, worker_threads)?;
>> let failed_dirs = if let Some(backup_dir) = backup_dir {
>> let mut res = Vec::new();
>> if !verify_worker.verify_backup_dir(
>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/
>> environment.rs
>> index 0e8eab1b..5e6a73b9 100644
>> --- a/src/api2/backup/environment.rs
>> +++ b/src/api2/backup/environment.rs
>> @@ -812,7 +812,7 @@ impl BackupEnvironment {
>> move |worker| {
>> worker.log_message("Automatically verifying newly
>> added snapshot");
>> - let verify_worker =
>> VerifyWorker::new(worker.clone(), datastore)?;
>> + let verify_worker =
>> VerifyWorker::new(worker.clone(), datastore, None)?;
>> if !verify_worker.verify_backup_dir_with_lock(
>> &backup_dir,
>> worker.upid().clone(),
>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>> index 7f91f38c..e11dba8e 100644
>> --- a/src/backup/verify.rs
>> +++ b/src/backup/verify.rs
>> @@ -32,6 +32,7 @@ pub struct VerifyWorker {
>> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> backend: DatastoreBackend,
>> + worker_threads: Option<usize>,
>
> ... plain `usize` here
>
>> }
>> impl VerifyWorker {
>> @@ -39,6 +40,7 @@ impl VerifyWorker {
>> pub fn new(
>> worker: Arc<dyn WorkerTaskContext>,
>> datastore: Arc<DataStore>,
>> + worker_threads: Option<usize>,
>> ) -> Result<Self, Error> {
>> let backend = datastore.backend()?;
>> Ok(Self {
>> @@ -49,6 +51,7 @@ impl VerifyWorker {
>> // start with 64 chunks since we assume there are few
>> corrupt ones
>> corrupt_chunks:
>> Arc::new(Mutex::new(HashSet::with_capacity(64))),
>> backend,
>> + worker_threads,
>
> unwrap_or(4) here... or even define a constant for the default value,
> although if it is placed here, it will only occur once.
>
>> })
>> }
>> @@ -220,7 +223,7 @@ impl VerifyWorker {
>> .datastore
>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>> - let reader_pool = ParallelHandler::new("read chunks", 4, {
>> + let reader_pool = ParallelHandler::new("read chunks",
>> self.worker_threads.unwrap_or(4), {
>> let decoder_pool = decoder_pool.channel();
>> let datastore = Arc::clone(&self.datastore);
>> let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
>> index c8792174..9d790b07 100644
>> --- a/src/server/verify_job.rs
>> +++ b/src/server/verify_job.rs
>> @@ -41,7 +41,8 @@ pub fn do_verification_job(
>> None => Default::default(),
>> };
>> - let verify_worker = VerifyWorker::new(worker.clone(),
>> datastore)?;
>> + let verify_worker =
>> + VerifyWorker::new(worker.clone(), datastore,
>> verification_job.worker_threads)?;
>> let result = verify_worker.verify_all_backups(
>> worker.upid(),
>> ns,
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job
2025-11-06 9:22 ` Christian Ebner
@ 2025-11-06 9:25 ` Nicolas Frey
0 siblings, 0 replies; 19+ messages in thread
From: Nicolas Frey @ 2025-11-06 9:25 UTC (permalink / raw)
To: Christian Ebner, Proxmox Backup Server development discussion
On 11/6/25 10:21 AM, Christian Ebner wrote:
> Same comment with respect to commit title as for some of the other
> patches, a short description on why this is done would be nice. E.g.
>
> Exposes the worker-threads property in the verification job config
> window and verify all window so the user can update the values
> accordingly.
>
> one comment below
>
> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
>> ---
>> www/window/VerifyAll.js | 12 ++++++++++++
>> www/window/VerifyJobEdit.js | 13 +++++++++++++
>> 2 files changed, 25 insertions(+)
>>
>> diff --git a/www/window/VerifyAll.js b/www/window/VerifyAll.js
>> index 01bcd63d..82f62aae 100644
>> --- a/www/window/VerifyAll.js
>> +++ b/www/window/VerifyAll.js
>> @@ -80,6 +80,18 @@ Ext.define('PBS.window.VerifyAll', {
>> },
>> ],
>> },
>> +
>> + {
>> + xtype: 'proxmoxintegerfield',
>> + name: 'worker-threads',
>> + fieldLabel: gettext('# of Worker Threads'),
>> + emptyText: '1',
>> + minValue: 1,
>> + maxValue: 32,
>> + cbind: {
>> + deleteEmpty: '{!isCreate}',
>> + },
>> + },
>> ],
>> },
>> ],
>> diff --git a/www/window/VerifyJobEdit.js b/www/window/VerifyJobEdit.js
>> index e87ca069..7b7a96c4 100644
>> --- a/www/window/VerifyJobEdit.js
>> +++ b/www/window/VerifyJobEdit.js
>> @@ -166,5 +166,18 @@ Ext.define('PBS.window.VerifyJobEdit', {
>> },
>> },
>> ],
>> + advancedColumn2: [
>> + {
>> + xtype: 'proxmoxintegerfield',
>> + name: 'worker-threads',
>> + fieldLabel: gettext('# of Worker Threads'),
>> + emptyText: '1',
>> + minValue: 1,
>> + maxValue: 32,
>> + cbind: {
>> + deleteEmpty: '{!isCreate}',
>> + },
>> + },
>> + ]
>
> it would be nice to also delete the value again if set to the default.
>
> Further, just noticed: this now seemingly defaults to 1, which is also
> the default for the API schema, but the parallel task instantiation
> gets a default of 4 if the value is not set and therefore unwrapped?
> That should be made consistent.
>
Ah sorry, that was an oversight. This was a copy-paste error; I reused
this code from the tape backup.
>> },
>> });
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler
2025-11-06 9:04 ` Nicolas Frey
@ 2025-11-06 9:26 ` Christian Ebner
0 siblings, 0 replies; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 9:26 UTC (permalink / raw)
To: Nicolas Frey, Proxmox Backup Server development discussion
On 11/6/25 10:04 AM, Nicolas Frey wrote:
>
> On 11/6/25 9:54 AM, Christian Ebner wrote:
>> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>>> This way, the chunks will be loaded in parallel in addition to being
>>> checked in parallel.
>>>
>>> Depending on the underlying storage, this can speed up reading chunks
>>> from disk, especially when the underlying storage is IO depth
>>> dependent, and the CPU is faster than the storage.
>>>
>>> Originally-by: Dominik Csapak <d.csapak@proxmox.com>
>>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
>>> ---
>>> src/backup/verify.rs | 120 ++++++++++++++++++++++++++
>>> +----------------
>>> 1 file changed, 75 insertions(+), 45 deletions(-)
>>>
>>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>>> index e33fdf50..7f91f38c 100644
>>> --- a/src/backup/verify.rs
>>> +++ b/src/backup/verify.rs
>>> @@ -1,6 +1,6 @@
>>> use pbs_config::BackupLockGuard;
>>> use std::collections::HashSet;
>>> -use std::sync::atomic::{AtomicUsize, Ordering};
>>> +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
>>> use std::sync::{Arc, Mutex};
>>> use std::time::Instant;
>>> @@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo,
>>> IndexFile};
>>> use pbs_datastore::manifest::{BackupManifest, FileInfo};
>>> use pbs_datastore::{DataBlob, DataStore, DatastoreBackend,
>>> StoreProgress};
>>> -use crate::tools::parallel_handler::ParallelHandler;
>>> +use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
>>> use crate::backup::hierarchy::ListAccessibleBackupGroups;
>>> @@ -156,23 +156,20 @@ impl VerifyWorker {
>>> let start_time = Instant::now();
>>> - let mut read_bytes = 0;
>>> - let mut decoded_bytes = 0;
>>> + let read_bytes = Arc::new(AtomicU64::new(0));
>>> + let decoded_bytes = Arc::new(AtomicU64::new(0));
>>> - let datastore2 = Arc::clone(&self.datastore);
>>> - let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
>>> - let verified_chunks2 = Arc::clone(&self.verified_chunks);
>>> - let errors2 = Arc::clone(&errors);
>>> -
>>> - let decoder_pool = ParallelHandler::new(
>>> - "verify chunk decoder",
>>> - 4,
>>> + let decoder_pool = ParallelHandler::new("verify chunk
>>> decoder", 4, {
>>> + let datastore = Arc::clone(&self.datastore);
>>> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>>> + let verified_chunks = Arc::clone(&self.verified_chunks);
>>> + let errors = Arc::clone(&errors);
>>
>> Hmm, this really warrants an internal state struct for bundling theses
>> and only cloning the arc around that.
>>
>> E.g. I would propose to introduce a struct along the lines of:
>>
>> struct IndexVerifyState {
>> read_bytes: AtomicU64,
>> decoded_bytes: AtomicU64,
>> errors: AtomicUsize,
>> datastore: Arc<DataStore>,
>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> start_time: Instant,
>> }
>>
>> Which can then be wrapped in an Arc and also passed along as parameter
>> to the inner methods where required.
>>
>> See the diff below (untested), which could be used for further
>> refining this.
>>
>
> I thought this could be improved in the initial patch, but wasn't
> quite sure how I should approach it, so thanks for the Feedback!
> The diff below looks good to me, I'll send a v2 for this in addition
> to the things you pointed out in the other patches.
Great! But please note that the diff was just a rough suggestion from my
side on how to improve this; it can be refined a bit further.
E.g. we might also pass the state to `add_corrupt_chunk()`, which
would also benefit from a reduction in function parameters.
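For illustration, a trimmed-down, standalone sketch of that (only the two
fields needed here; the real struct, the error! logging and the
rename_corrupted_chunk call stay as in the diff above):

use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct IndexVerifyState {
    errors: AtomicUsize,
    corrupt_chunks: Mutex<HashSet<[u8; 32]>>,
}

// one shared-state parameter instead of datastore/corrupt_chunks/errors
fn add_corrupt_chunk(state: &IndexVerifyState, digest: [u8; 32], message: &str) {
    state.corrupt_chunks.lock().unwrap().insert(digest);
    eprintln!("{message}");
    state.errors.fetch_add(1, Ordering::SeqCst);
}

fn main() {
    let state = IndexVerifyState {
        errors: AtomicUsize::new(0),
        corrupt_chunks: Mutex::new(HashSet::new()),
    };
    add_corrupt_chunk(&state, [0u8; 32], "can't verify chunk, load failed");
    assert_eq!(state.errors.load(Ordering::SeqCst), 1);
    assert!(state.corrupt_chunks.lock().unwrap().contains(&[0u8; 32]));
}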
>>> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>>> let chunk_crypt_mode = match chunk.crypt_mode() {
>>> Err(err) => {
>>> -
>>> corrupt_chunks2.lock().unwrap().insert(digest);
>>> + corrupt_chunks.lock().unwrap().insert(digest);
>>> info!("can't verify chunk, unknown
>>> CryptMode - {err}");
>>> - errors2.fetch_add(1, Ordering::SeqCst);
>>> + errors.fetch_add(1, Ordering::SeqCst);
>>> return Ok(());
>>> }
>>> Ok(mode) => mode,
>>> @@ -182,21 +179,21 @@ impl VerifyWorker {
>>> info!(
>>> "chunk CryptMode {chunk_crypt_mode:?} does not
>>> match index CryptMode {crypt_mode:?}"
>>> );
>>> - errors2.fetch_add(1, Ordering::SeqCst);
>>> + errors.fetch_add(1, Ordering::SeqCst);
>>> }
>>> if let Err(err) = chunk.verify_unencrypted(size
>>> as usize, &digest) {
>>> - corrupt_chunks2.lock().unwrap().insert(digest);
>>> + corrupt_chunks.lock().unwrap().insert(digest);
>>> info!("{err}");
>>> - errors2.fetch_add(1, Ordering::SeqCst);
>>> -
>>> Self::rename_corrupted_chunk(datastore2.clone(), &digest);
>>> + errors.fetch_add(1, Ordering::SeqCst);
>>> + Self::rename_corrupted_chunk(datastore.clone(),
>>> &digest);
>>> } else {
>>> - verified_chunks2.lock().unwrap().insert(digest);
>>> + verified_chunks.lock().unwrap().insert(digest);
>>> }
>>> Ok(())
>>> - },
>>> - );
>>> + }
>>> + });
>>> let skip_chunk = |digest: &[u8; 32]| -> bool {
>>> if
>>> self.verified_chunks.lock().unwrap().contains(digest) {
>>> @@ -223,6 +220,29 @@ impl VerifyWorker {
>>> .datastore
>>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>>> + let reader_pool = ParallelHandler::new("read chunks", 4, {
>>> + let decoder_pool = decoder_pool.channel();
>>> + let datastore = Arc::clone(&self.datastore);
>>> + let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>>> + let read_bytes = Arc::clone(&read_bytes);
>>> + let decoded_bytes = Arc::clone(&decoded_bytes);
>>> + let errors = Arc::clone(&errors);
>>> + let backend = self.backend.clone();
>>> +
>>> + move |info: ChunkReadInfo| {
>>> + Self::verify_chunk_by_backend(
>>> + &backend,
>>> + Arc::clone(&datastore),
>>> + Arc::clone(&corrupt_chunks),
>>> + Arc::clone(&read_bytes),
>>> + Arc::clone(&decoded_bytes),
>>> + Arc::clone(&errors),
>>> + &decoder_pool,
>>> + &info,
>>> + )
>>> + }
>>> + });
>>> +
>>> for (pos, _) in chunk_list {
>>> self.worker.check_abort()?;
>>> self.worker.fail_on_shutdown()?;
>>> @@ -234,19 +254,16 @@ impl VerifyWorker {
>>> continue; // already verified or marked corrupt
>>> }
>>> - self.verify_chunk_by_backend(
>>> - &info,
>>> - &mut read_bytes,
>>> - &mut decoded_bytes,
>>> - Arc::clone(&errors),
>>> - &decoder_pool,
>>> - )?;
>>> + reader_pool.send(info)?;
>>> }
>>> - decoder_pool.complete()?;
>>> + reader_pool.complete()?;
>>> let elapsed = start_time.elapsed().as_secs_f64();
>>> + let read_bytes = read_bytes.load(Ordering::SeqCst);
>>> + let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
>>> +
>>> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>>> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 *
>>> 1024.0);
>>> @@ -266,26 +283,31 @@ impl VerifyWorker {
>>> Ok(())
>>> }
>>> + #[allow(clippy::too_many_arguments)]
>>> fn verify_chunk_by_backend(
>>> - &self,
>>> - info: &ChunkReadInfo,
>>> - read_bytes: &mut u64,
>>> - decoded_bytes: &mut u64,
>>> + backend: &DatastoreBackend,
>>> + datastore: Arc<DataStore>,
>>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>> + read_bytes: Arc<AtomicU64>,
>>> + decoded_bytes: Arc<AtomicU64>,
>>> errors: Arc<AtomicUsize>,
>>> - decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
>>> + decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>>> + info: &ChunkReadInfo,
>>> ) -> Result<(), Error> {
>>> - match &self.backend {
>>> - DatastoreBackend::Filesystem => match
>>> self.datastore.load_chunk(&info.digest) {
>>> - Err(err) => self.add_corrupt_chunk(
>>> + match backend {
>>> + DatastoreBackend::Filesystem => match
>>> datastore.load_chunk(&info.digest) {
>>> + Err(err) => Self::add_corrupt_chunk(
>>> + datastore,
>>> + corrupt_chunks,
>>> info.digest,
>>> errors,
>>> &format!("can't verify chunk, load failed -
>>> {err}"),
>>> ),
>>> Ok(chunk) => {
>>> let size = info.size();
>>> - *read_bytes += chunk.raw_size();
>>> + read_bytes.fetch_add(chunk.raw_size(),
>>> Ordering::SeqCst);
>>> decoder_pool.send((chunk, info.digest, size))?;
>>> - *decoded_bytes += size;
>>> + decoded_bytes.fetch_add(size, Ordering::SeqCst);
>>> }
>>> },
>>> DatastoreBackend::S3(s3_client) => {
>>> @@ -302,9 +324,9 @@ impl VerifyWorker {
>>> match chunk_result {
>>> Ok(chunk) => {
>>> let size = info.size();
>>> - *read_bytes += chunk.raw_size();
>>> +
>>> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>>> decoder_pool.send((chunk,
>>> info.digest, size))?;
>>> - *decoded_bytes += size;
>>> + decoded_bytes.fetch_add(size,
>>> Ordering::SeqCst);
>>> }
>>> Err(err) => {
>>> errors.fetch_add(1,
>>> Ordering::SeqCst);
>>> @@ -312,7 +334,9 @@ impl VerifyWorker {
>>> }
>>> }
>>> }
>>> - Ok(None) => self.add_corrupt_chunk(
>>> + Ok(None) => Self::add_corrupt_chunk(
>>> + datastore,
>>> + corrupt_chunks,
>>> info.digest,
>>> errors,
>>> &format!(
>>> @@ -330,13 +354,19 @@ impl VerifyWorker {
>>> Ok(())
>>> }
>>> - fn add_corrupt_chunk(&self, digest: [u8; 32], errors:
>>> Arc<AtomicUsize>, message: &str) {
>>> + fn add_corrupt_chunk(
>>> + datastore: Arc<DataStore>,
>>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>> + digest: [u8; 32],
>>> + errors: Arc<AtomicUsize>,
>>> + message: &str,
>>> + ) {
>>> // Panic on poisoned mutex
>>> - let mut corrupt_chunks = self.corrupt_chunks.lock().unwrap();
>>> + let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
>>> corrupt_chunks.insert(digest);
>>> error!(message);
>>> errors.fetch_add(1, Ordering::SeqCst);
>>> - Self::rename_corrupted_chunk(self.datastore.clone(), &digest);
>>> + Self::rename_corrupted_chunk(datastore.clone(), &digest);
>>> }
>>> fn verify_fixed_index(&self, backup_dir: &BackupDir, info:
>>> &FileInfo) -> Result<(), Error> {
>>
>>
>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>> index 7f91f38c9..e01405750 100644
>> --- a/src/backup/verify.rs
>> +++ b/src/backup/verify.rs
>> @@ -152,24 +152,24 @@ impl VerifyWorker {
>> index: Box<dyn IndexFile + Send>,
>> crypt_mode: CryptMode,
>> ) -> Result<(), Error> {
>> - let errors = Arc::new(AtomicUsize::new(0));
>> -
>> - let start_time = Instant::now();
>> -
>> - let read_bytes = Arc::new(AtomicU64::new(0));
>> - let decoded_bytes = Arc::new(AtomicU64::new(0));
>> + let index_verify_state = Arc::new(IndexVerifyState {
>> + read_bytes: AtomicU64::new(0),
>> + decoded_bytes: AtomicU64::new(0),
>> + errors: AtomicUsize::new(0),
>> + datastore: Arc::clone(&self.datastore),
>> + corrupt_chunks: Arc::clone(&self.corrupt_chunks),
>> + start_time: Instant::now(),
>> + });
>>
>> let decoder_pool = ParallelHandler::new("verify chunk
>> decoder", 4, {
>> - let datastore = Arc::clone(&self.datastore);
>> - let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>> + let state = Arc::clone(&index_verify_state);
>> let verified_chunks = Arc::clone(&self.verified_chunks);
>> - let errors = Arc::clone(&errors);
>> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>> let chunk_crypt_mode = match chunk.crypt_mode() {
>> Err(err) => {
>> - corrupt_chunks.lock().unwrap().insert(digest);
>> + state.corrupt_chunks.lock().unwrap().insert(digest);
>> info!("can't verify chunk, unknown CryptMode
>> - {err}");
>> - errors.fetch_add(1, Ordering::SeqCst);
>> + state.errors.fetch_add(1, Ordering::SeqCst);
>> return Ok(());
>> }
>> Ok(mode) => mode,
>> @@ -179,14 +179,14 @@ impl VerifyWorker {
>> info!(
>> "chunk CryptMode {chunk_crypt_mode:?} does not
>> match index CryptMode {crypt_mode:?}"
>> );
>> - errors.fetch_add(1, Ordering::SeqCst);
>> + state.errors.fetch_add(1, Ordering::SeqCst);
>> }
>>
>> if let Err(err) = chunk.verify_unencrypted(size as
>> usize, &digest) {
>> - corrupt_chunks.lock().unwrap().insert(digest);
>> + state.corrupt_chunks.lock().unwrap().insert(digest);
>> info!("{err}");
>> - errors.fetch_add(1, Ordering::SeqCst);
>> - Self::rename_corrupted_chunk(datastore.clone(),
>> &digest);
>> + state.errors.fetch_add(1, Ordering::SeqCst);
>> + Self::rename_corrupted_chunk(state.datastore.clone(), &digest);
>> } else {
>> verified_chunks.lock().unwrap().insert(digest);
>> }
>> @@ -201,7 +201,7 @@ impl VerifyWorker {
>> } else if
>> self.corrupt_chunks.lock().unwrap().contains(digest) {
>> let digest_str = hex::encode(digest);
>> info!("chunk {digest_str} was marked as corrupt");
>> - errors.fetch_add(1, Ordering::SeqCst);
>> + index_verify_state.errors.fetch_add(1,
>> Ordering::SeqCst);
>> true
>> } else {
>> false
>> @@ -222,24 +222,11 @@ impl VerifyWorker {
>>
>> let reader_pool = ParallelHandler::new("read chunks", 4, {
>> let decoder_pool = decoder_pool.channel();
>> - let datastore = Arc::clone(&self.datastore);
>> - let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>> - let read_bytes = Arc::clone(&read_bytes);
>> - let decoded_bytes = Arc::clone(&decoded_bytes);
>> - let errors = Arc::clone(&errors);
>> + let state = Arc::clone(&index_verify_state);
>> let backend = self.backend.clone();
>>
>> move |info: ChunkReadInfo| {
>> - Self::verify_chunk_by_backend(
>> - &backend,
>> - Arc::clone(&datastore),
>> - Arc::clone(&corrupt_chunks),
>> - Arc::clone(&read_bytes),
>> - Arc::clone(&decoded_bytes),
>> - Arc::clone(&errors),
>> - &decoder_pool,
>> - &info,
>> - )
>> + Self::verify_chunk_by_backend(&backend,
>> Arc::clone(&state), &decoder_pool, &info)
>> }
>> });
>>
>> @@ -259,10 +246,10 @@ impl VerifyWorker {
>>
>> reader_pool.complete()?;
>>
>> - let elapsed = start_time.elapsed().as_secs_f64();
>> + let elapsed =
>> index_verify_state.start_time.elapsed().as_secs_f64();
>>
>> - let read_bytes = read_bytes.load(Ordering::SeqCst);
>> - let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
>> + let read_bytes =
>> index_verify_state.read_bytes.load(Ordering::SeqCst);
>> + let decoded_bytes =
>> index_verify_state.decoded_bytes.load(Ordering::SeqCst);
>>
>> let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>> let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 *
>> 1024.0);
>> @@ -270,13 +257,13 @@ impl VerifyWorker {
>> let read_speed = read_bytes_mib / elapsed;
>> let decode_speed = decoded_bytes_mib / elapsed;
>>
>> - let error_count = errors.load(Ordering::SeqCst);
>> + let error_count =
>> index_verify_state.errors.load(Ordering::SeqCst);
>>
>> info!(
>> " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2}
>> MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2}
>> MiB/s ({error_count} errors)"
>> );
>>
>> - if errors.load(Ordering::SeqCst) > 0 {
>> + if index_verify_state.errors.load(Ordering::SeqCst) > 0 {
>> bail!("chunks could not be verified");
>> }
>>
>> @@ -286,28 +273,26 @@ impl VerifyWorker {
>> #[allow(clippy::too_many_arguments)]
>> fn verify_chunk_by_backend(
>> backend: &DatastoreBackend,
>> - datastore: Arc<DataStore>,
>> - corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> - read_bytes: Arc<AtomicU64>,
>> - decoded_bytes: Arc<AtomicU64>,
>> - errors: Arc<AtomicUsize>,
>> + state: Arc<IndexVerifyState>,
>> decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>> info: &ChunkReadInfo,
>> ) -> Result<(), Error> {
>> match backend {
>> - DatastoreBackend::Filesystem => match
>> datastore.load_chunk(&info.digest) {
>> + DatastoreBackend::Filesystem => match
>> state.datastore.load_chunk(&info.digest) {
>> Err(err) => Self::add_corrupt_chunk(
>> - datastore,
>> - corrupt_chunks,
>> + Arc::clone(&state.datastore),
>> + Arc::clone(&state.corrupt_chunks),
>> info.digest,
>> - errors,
>> + &state.errors,
>> &format!("can't verify chunk, load failed - {err}"),
>> ),
>> Ok(chunk) => {
>> let size = info.size();
>> - read_bytes.fetch_add(chunk.raw_size(),
>> Ordering::SeqCst);
>> + state
>> + .read_bytes
>> + .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>> decoder_pool.send((chunk, info.digest, size))?;
>> - decoded_bytes.fetch_add(size, Ordering::SeqCst);
>> + state.decoded_bytes.fetch_add(size,
>> Ordering::SeqCst);
>> }
>> },
>> DatastoreBackend::S3(s3_client) => {
>> @@ -324,28 +309,30 @@ impl VerifyWorker {
>> match chunk_result {
>> Ok(chunk) => {
>> let size = info.size();
>> -
>> read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
>> + state
>> + .read_bytes
>> + .fetch_add(chunk.raw_size(),
>> Ordering::SeqCst);
>> decoder_pool.send((chunk,
>> info.digest, size))?;
>> - decoded_bytes.fetch_add(size,
>> Ordering::SeqCst);
>> + state.decoded_bytes.fetch_add(size,
>> Ordering::SeqCst);
>> }
>> Err(err) => {
>> - errors.fetch_add(1, Ordering::SeqCst);
>> + state.errors.fetch_add(1,
>> Ordering::SeqCst);
>> error!("can't verify chunk, load
>> failed - {err}");
>> }
>> }
>> }
>> Ok(None) => Self::add_corrupt_chunk(
>> - datastore,
>> - corrupt_chunks,
>> + Arc::clone(&state.datastore),
>> + Arc::clone(&state.corrupt_chunks),
>> info.digest,
>> - errors,
>> + &state.errors,
>> &format!(
>> "can't verify missing chunk with digest {}",
>> hex::encode(info.digest)
>> ),
>> ),
>> Err(err) => {
>> - errors.fetch_add(1, Ordering::SeqCst);
>> + state.errors.fetch_add(1, Ordering::SeqCst);
>> error!("can't verify chunk, load failed -
>> {err}");
>> }
>> }
>> @@ -358,7 +345,7 @@ impl VerifyWorker {
>> datastore: Arc<DataStore>,
>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> digest: [u8; 32],
>> - errors: Arc<AtomicUsize>,
>> + errors: &AtomicUsize,
>> message: &str,
>> ) {
>> // Panic on poisoned mutex
>> @@ -681,3 +668,12 @@ impl VerifyWorker {
>> }
>> }
>> }
>> +
>> +struct IndexVerifyState {
>> + read_bytes: AtomicU64,
>> + decoded_bytes: AtomicU64,
>> + errors: AtomicUsize,
>> + datastore: Arc<DataStore>,
>> + corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>> + start_time: Instant,
>> +}
>>
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
2025-11-06 9:23 ` Nicolas Frey
@ 2025-11-06 9:32 ` Christian Ebner
2025-11-06 11:22 ` Nicolas Frey
0 siblings, 1 reply; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 9:32 UTC (permalink / raw)
To: Nicolas Frey, Proxmox Backup Server development discussion
On 11/6/25 10:22 AM, Nicolas Frey wrote:
> On 11/6/25 10:08 AM, Christian Ebner wrote:
>> Please add a short commit message describing what the worker threads
>> cover, e.g. that this parameter controls the number of reader and
>> chunk verification threads.
>>
>> What tripped me over just now:
>> Is this intentionally not increasing the number of chunk verification
>> threads? Or was that overlooked? From the name of the parameter I
>> suspected this to act on both, reading and verifying. If this is not
>> the case, maybe the parameter should get renamed to a more telling
>> `parallel-chunk-readers` instead?
>
> I wasn't sure if the number of threads for verification should be
> controlled via this as well, as the original patch only added a new
> thread pool for reading, whereas the verification pool was already
> implemented.
> I pointed this out in the cover letter, though it might have been
> better to put this here too:
>
> The number of `worker-threads` only controls the thread pool for
> reading, though if it makes sense to reuse this for the verification
> pool, it could be adjusted to do so too.
>
> I think it makes sense to use it to control the number of threads of
> both. Thanks for the feedback, I'll adjust it along with the other
> proposed changes in a v2!
Well, that was just an uninformed assumption from my side when reading
the parameter name (and I did not re-read the cover letter today after
having looked at this quickly yesterday, sorry for that).
But maybe you can also evaluate if it actually makes sense to control
both by the same parameter, or if it only makes sense to e.g. increase
the number of verification tasks (no point for that if the IO remains
the bottleneck), or if it would make sense to have either 2 parameters
or couple them by some proportionality constant.
> >
>> further comment inline
>> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
>>> ---
>>> src/api2/admin/datastore.rs | 13 +++++++++++--
>>> src/api2/backup/environment.rs | 2 +-
>>> src/backup/verify.rs | 5 ++++-
>>> src/server/verify_job.rs | 3 ++-
>>> 4 files changed, 18 insertions(+), 5 deletions(-)
>>>
>>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
>>> index d192ee39..69a09081 100644
>>> --- a/src/api2/admin/datastore.rs
>>> +++ b/src/api2/admin/datastore.rs
>>> @@ -677,6 +677,14 @@ pub async fn status(
>>> schema: NS_MAX_DEPTH_SCHEMA,
>>> optional: true,
>>> },
>>> + "worker-threads": {
>>> + description: "Set the number of worker threads to
>>> use for the job",
>>> + type: Integer,
>>> + optional: true,
>>> + minimum: 1,
>>> + maximum: 32,
>>> + default: 1,
>>> + },
>>
>> As mentioned on the pbs-api-types patch, this should reuse the same
>> schema as (will be) defined there, so it does not need to be
>> re-defined and stays in sync.
>>
>>> },
>>> },
>>> returns: {
>>> @@ -690,7 +698,7 @@ pub async fn status(
>>> )]
>>> /// Verify backups.
>>> ///
>>> -/// This function can verify a single backup snapshot, all backup
>>> from a backup group,
>>> +/// This function can verify a single backup snapshot, all backups
>>> from a backup group,
>>> /// or all backups in the datastore.
>>> #[allow(clippy::too_many_arguments)]
>>> pub fn verify(
>>> @@ -702,6 +710,7 @@ pub fn verify(
>>> ignore_verified: Option<bool>,
>>> outdated_after: Option<i64>,
>>> max_depth: Option<usize>,
>>> + worker_threads: Option<usize>,
>>
>> this could be a plain `usize` already, so it does not need to be
>> unwrapped for each parallel worker instantiation. The unwrapping and
>> setting to default can already happen in the constructor.
>>
>>> rpcenv: &mut dyn RpcEnvironment,
>>> ) -> Result<Value, Error> {
>>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>> @@ -781,7 +790,7 @@ pub fn verify(
>>> auth_id.to_string(),
>>> to_stdout,
>>> move |worker| {
>>> - let verify_worker = VerifyWorker::new(worker.clone(),
>>> datastore)?;
>>> + let verify_worker = VerifyWorker::new(worker.clone(),
>>> datastore, worker_threads)?;
>>> let failed_dirs = if let Some(backup_dir) = backup_dir {
>>> let mut res = Vec::new();
>>> if !verify_worker.verify_backup_dir(
>>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/
>>> environment.rs
>>> index 0e8eab1b..5e6a73b9 100644
>>> --- a/src/api2/backup/environment.rs
>>> +++ b/src/api2/backup/environment.rs
>>> @@ -812,7 +812,7 @@ impl BackupEnvironment {
>>> move |worker| {
>>> worker.log_message("Automatically verifying newly
>>> added snapshot");
>>> - let verify_worker =
>>> VerifyWorker::new(worker.clone(), datastore)?;
>>> + let verify_worker =
>>> VerifyWorker::new(worker.clone(), datastore, None)?;
>>> if !verify_worker.verify_backup_dir_with_lock(
>>> &backup_dir,
>>> worker.upid().clone(),
>>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>>> index 7f91f38c..e11dba8e 100644
>>> --- a/src/backup/verify.rs
>>> +++ b/src/backup/verify.rs
>>> @@ -32,6 +32,7 @@ pub struct VerifyWorker {
>>> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>> backend: DatastoreBackend,
>>> + worker_threads: Option<usize>,
>>
>> ... plain `usize` here
>>
>>> }
>>> impl VerifyWorker {
>>> @@ -39,6 +40,7 @@ impl VerifyWorker {
>>> pub fn new(
>>> worker: Arc<dyn WorkerTaskContext>,
>>> datastore: Arc<DataStore>,
>>> + worker_threads: Option<usize>,
>>> ) -> Result<Self, Error> {
>>> let backend = datastore.backend()?;
>>> Ok(Self {
>>> @@ -49,6 +51,7 @@ impl VerifyWorker {
>>> // start with 64 chunks since we assume there are few
>>> corrupt ones
>>> corrupt_chunks:
>>> Arc::new(Mutex::new(HashSet::with_capacity(64))),
>>> backend,
>>> + worker_threads,
>>
>> unwrap_or(4) here... or even define a constant for the default value,
>> although if it is placed here, it will only occur once.
>>
>>> })
>>> }
>>> @@ -220,7 +223,7 @@ impl VerifyWorker {
>>> .datastore
>>> .get_chunks_in_order(&*index, skip_chunk, check_abort)?;
>>> - let reader_pool = ParallelHandler::new("read chunks", 4, {
>>> + let reader_pool = ParallelHandler::new("read chunks",
>>> self.worker_threads.unwrap_or(4), {
>>> let decoder_pool = decoder_pool.channel();
>>> let datastore = Arc::clone(&self.datastore);
>>> let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
>>> index c8792174..9d790b07 100644
>>> --- a/src/server/verify_job.rs
>>> +++ b/src/server/verify_job.rs
>>> @@ -41,7 +41,8 @@ pub fn do_verification_job(
>>> None => Default::default(),
>>> };
>>> - let verify_worker = VerifyWorker::new(worker.clone(),
>>> datastore)?;
>>> + let verify_worker =
>>> + VerifyWorker::new(worker.clone(), datastore,
>>> verification_job.worker_threads)?;
>>> let result = verify_worker.verify_all_backups(
>>> worker.upid(),
>>> ns,
>>
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
2025-11-06 9:32 ` Christian Ebner
@ 2025-11-06 11:22 ` Nicolas Frey
2025-11-06 11:57 ` Christian Ebner
0 siblings, 1 reply; 19+ messages in thread
From: Nicolas Frey @ 2025-11-06 11:22 UTC (permalink / raw)
To: Christian Ebner, Proxmox Backup Server development discussion
On 11/6/25 10:32 AM, Christian Ebner wrote:
> On 11/6/25 10:22 AM, Nicolas Frey wrote:
>> On 11/6/25 10:08 AM, Christian Ebner wrote:
>>> Please add a short commit message describing what the worker threads
>>> cover, e.g. that this parameter controls the number of reader and
>>> chunk verification threads.
>>>
>>> What tripped me over just now:
>>> Is this intentionally not increasing the number of chunk verification
>>> threads? Or was that overlooked? From the name of the parameter I
>>> suspected this to act on both, reading and verifying. If this is not
>>> the case, maybe the parameter should get renamed to a more telling
>>> `parallel-chunk-readers` instead?
>>
>> I wasn't sure if the number of threads for verification should be
>> controlled via this as well, as the original patch only added a new
>> thread pool for reading, whereas the verification pool was already
>> implemented.
>> I pointed this out in the cover letter, though it might have been
>> better to put this here too:
>>
>> The number of `worker-threads` only controls the thread pool for
>> reading, though if it makes sense to reuse this for the verification
>> pool, it could be adjusted to do so too.
>>
>> I think it makes sense to use it to control the number of threads of
>> both. Thanks for the feedback, I'll adjust it along with the other
>> proposed changes in a v2!
>
> Well, that was just an uninformed assumption from my side when reading
> the parameter name (and I did not re-read the cover letter today after
> having looked at this quickly yesterday, sorry for that).
That makes sense; the parameter name does not accurately describe the
function it serves here anyway, so it should have been named a bit
better.
>
> But maybe you can also evaluate if it actually makes sense to control
> both by the same parameter, or if it only makes sense to e.g. increase
> the number of verification tasks (no point for that if the IO remains
> the bottleneck), or if it would make sense to have either 2 parameters
> or couple them by some proportionality constant.
>
I had an idea along the lines of:
self.worker_threads.mul(2).clamp(4, 32),
though the proportionality factor should be tested to determine what
would actually be sensible here and of course be documented accordingly.
I also thought a minimum of 4 threads for verification makes sense, as
when the default value of 1 thread is used, it has somewhat the same
behavior as before adding the read thread pool (i.e. 1 thread for
reading, 4 threads for verification) and would scale somewhat
accordingly. The thread count should also be clamped to a max of 32 to
respect the constraints of the schema, which also states 32 as the maximum.
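
For illustration, a self-contained sketch of that idea (the factor of 2
and the 4..=32 clamp are just the values from above, nothing measured;
this assumes worker_threads is already stored as a plain usize as
suggested earlier):

fn pool_sizes(worker_threads: usize) -> (usize, usize) {
    let reader_threads = worker_threads;
    // keep at least the previous default of 4 verify threads and respect
    // the schema maximum of 32
    let verify_threads = (worker_threads * 2).clamp(4, 32);
    (reader_threads, verify_threads)
}

fn main() {
    assert_eq!(pool_sizes(1), (1, 4));   // default behaves like before
    assert_eq!(pool_sizes(4), (4, 8));
    assert_eq!(pool_sizes(32), (32, 32));
}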
What do you think?
>> >
>>> further comment inline
>>> On 11/5/25 4:51 PM, Nicolas Frey wrote:
>>>> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
>>>> ---
>>>> src/api2/admin/datastore.rs | 13 +++++++++++--
>>>> src/api2/backup/environment.rs | 2 +-
>>>> src/backup/verify.rs | 5 ++++-
>>>> src/server/verify_job.rs | 3 ++-
>>>> 4 files changed, 18 insertions(+), 5 deletions(-)
>>>>
>>>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/
>>>> datastore.rs
>>>> index d192ee39..69a09081 100644
>>>> --- a/src/api2/admin/datastore.rs
>>>> +++ b/src/api2/admin/datastore.rs
>>>> @@ -677,6 +677,14 @@ pub async fn status(
>>>> schema: NS_MAX_DEPTH_SCHEMA,
>>>> optional: true,
>>>> },
>>>> + "worker-threads": {
>>>> + description: "Set the number of worker threads to
>>>> use for the job",
>>>> + type: Integer,
>>>> + optional: true,
>>>> + minimum: 1,
>>>> + maximum: 32,
>>>> + default: 1,
>>>> + },
>>>
>>> As mentioned on the pbs-api-types patch, this should reuse the same
>>> schema as (will be) defined there, so it does not need to be
>>> re-defined and stays in sync.
>>>
>>>> },
>>>> },
>>>> returns: {
>>>> @@ -690,7 +698,7 @@ pub async fn status(
>>>> )]
>>>> /// Verify backups.
>>>> ///
>>>> -/// This function can verify a single backup snapshot, all backup
>>>> from a backup group,
>>>> +/// This function can verify a single backup snapshot, all backups
>>>> from a backup group,
>>>> /// or all backups in the datastore.
>>>> #[allow(clippy::too_many_arguments)]
>>>> pub fn verify(
>>>> @@ -702,6 +710,7 @@ pub fn verify(
>>>> ignore_verified: Option<bool>,
>>>> outdated_after: Option<i64>,
>>>> max_depth: Option<usize>,
>>>> + worker_threads: Option<usize>,
>>>
>>> this could be a plain `usize` already, so it does not need to be
>>> unwrapped for each parallel worker instantiation. The unwrapping and
>>> setting to default can already happen in the constructor.
>>>
>>>> rpcenv: &mut dyn RpcEnvironment,
>>>> ) -> Result<Value, Error> {
>>>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
>>>> @@ -781,7 +790,7 @@ pub fn verify(
>>>> auth_id.to_string(),
>>>> to_stdout,
>>>> move |worker| {
>>>> - let verify_worker = VerifyWorker::new(worker.clone(),
>>>> datastore)?;
>>>> + let verify_worker = VerifyWorker::new(worker.clone(),
>>>> datastore, worker_threads)?;
>>>> let failed_dirs = if let Some(backup_dir) =
>>>> backup_dir {
>>>> let mut res = Vec::new();
>>>> if !verify_worker.verify_backup_dir(
>>>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/
>>>> environment.rs
>>>> index 0e8eab1b..5e6a73b9 100644
>>>> --- a/src/api2/backup/environment.rs
>>>> +++ b/src/api2/backup/environment.rs
>>>> @@ -812,7 +812,7 @@ impl BackupEnvironment {
>>>> move |worker| {
>>>> worker.log_message("Automatically verifying newly
>>>> added snapshot");
>>>> - let verify_worker =
>>>> VerifyWorker::new(worker.clone(), datastore)?;
>>>> + let verify_worker =
>>>> VerifyWorker::new(worker.clone(), datastore, None)?;
>>>> if !verify_worker.verify_backup_dir_with_lock(
>>>> &backup_dir,
>>>> worker.upid().clone(),
>>>> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
>>>> index 7f91f38c..e11dba8e 100644
>>>> --- a/src/backup/verify.rs
>>>> +++ b/src/backup/verify.rs
>>>> @@ -32,6 +32,7 @@ pub struct VerifyWorker {
>>>> verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>>> corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>>>> backend: DatastoreBackend,
>>>> + worker_threads: Option<usize>,
>>>
>>> ... plain `usize` here
>>>
>>>> }
>>>> impl VerifyWorker {
>>>> @@ -39,6 +40,7 @@ impl VerifyWorker {
>>>> pub fn new(
>>>> worker: Arc<dyn WorkerTaskContext>,
>>>> datastore: Arc<DataStore>,
>>>> + worker_threads: Option<usize>,
>>>> ) -> Result<Self, Error> {
>>>> let backend = datastore.backend()?;
>>>> Ok(Self {
>>>> @@ -49,6 +51,7 @@ impl VerifyWorker {
>>>> // start with 64 chunks since we assume there are few
>>>> corrupt ones
>>>> corrupt_chunks:
>>>> Arc::new(Mutex::new(HashSet::with_capacity(64))),
>>>> backend,
>>>> + worker_threads,
>>>
>>> unwrap_or(4) here... or even define a constant for the default value,
>>> although if it is placed here, it will only occur once.
>>>
>>>> })
>>>> }
>>>> @@ -220,7 +223,7 @@ impl VerifyWorker {
>>>> .datastore
>>>> .get_chunks_in_order(&*index, skip_chunk,
>>>> check_abort)?;
>>>> - let reader_pool = ParallelHandler::new("read chunks",
>>>> 4, {
>>>> + let reader_pool = ParallelHandler::new("read chunks",
>>>> self.worker_threads.unwrap_or(4), {
>>>> let decoder_pool = decoder_pool.channel();
>>>> let datastore = Arc::clone(&self.datastore);
>>>> let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
>>>> diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
>>>> index c8792174..9d790b07 100644
>>>> --- a/src/server/verify_job.rs
>>>> +++ b/src/server/verify_job.rs
>>>> @@ -41,7 +41,8 @@ pub fn do_verification_job(
>>>> None => Default::default(),
>>>> };
>>>> - let verify_worker = VerifyWorker::new(worker.clone(),
>>>> datastore)?;
>>>> + let verify_worker =
>>>> + VerifyWorker::new(worker.clone(), datastore,
>>>> verification_job.worker_threads)?;
>>>> let result = verify_worker.verify_all_backups(
>>>> worker.upid(),
>>>> ns,
>>>
>>
>
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use
2025-11-06 11:22 ` Nicolas Frey
@ 2025-11-06 11:57 ` Christian Ebner
0 siblings, 0 replies; 19+ messages in thread
From: Christian Ebner @ 2025-11-06 11:57 UTC (permalink / raw)
To: Nicolas Frey, Proxmox Backup Server development discussion
On 11/6/25 12:21 PM, Nicolas Frey wrote:
> On 11/6/25 10:32 AM, Christian Ebner wrote:
>> On 11/6/25 10:22 AM, Nicolas Frey wrote:
>>> On 11/6/25 10:08 AM, Christian Ebner wrote:
>>>> Please add a short commit message describing what the worker threads
>>>> cover, e.g. that this parameter controls the number of reader and
>>>> chunk verification threads.
>>>>
>>>> What tripped me over just now:
>>>> Is this intentionally not increasing the number of chunk verification
>>>> threads? Or was that overlooked? From the name of the parameter I
>>>> suspected this to act on both, reading and verifying. If this is not
>>>> the case, maybe the parameter should get renamed to a more telling
>>>> `parallel-chunk-readers` instead?
>>>
>>> I wasn't sure if the number of threads for verification should be
>>> controlled via this as well, as the original patch only added a new
>>> thread pool for reading, whereas the verification pool was already
>>> implemented.
>>> I pointed this out in the cover letter, though it might have been
>>> better to put this here too:
>>>
>>> The number of `worker-threads` only controls the thread pool for
>>> reading, though if it makes sense to reuse this for the verification
>>> pool, it could be adjusted to do so too.
>>>
>>> I think it makes sense to use it to control the number of threads for
>>> both. Thanks for the feedback, I'll adjust it along with the other
>>> proposed changes in a v2!
>>
>> Well, that was just an uninformed assumption from my side when reading
>> the parameter name (and I did not re-read the cover letter today after
>> having looked at this quickly yesterday, sorry for that).
>
> That makes sense; the parameter name does not accurately describe what
> it controls here anyway, so it should have been named a bit better.
>
>>
>> But maybe you can also evaluate if it actually makes sense to control
>> both by the same parameter, or if it only makes sense to e.g. increase
>> the number of verification tasks (no point for that if the IO remains
>> the bottleneck), or if it would make sense to have either 2 parameters
>> or couple them by some proportionality constant.
>>
>
> I had an idea along the lines of:
>
> self.worker_threads.mul(2).clamp(4, 32),
On second thought, this will most likely not cover most cases: one
system could be severely IO bound, another severely CPU bound...
> though the proportionality factor should be tested to determine what
> would actually be sensible here and of course be documented accordingly.
>
> I also thought a minimum of 4 threads for verification makes sense: when
> the default value of 1 thread is used, the behavior is roughly the same
> as before the read thread pool was added (i.e. 1 thread for reading,
> 4 threads for verification), and it would scale somewhat accordingly.
> The thread count should also be clamped to a maximum of 32 to respect
> the schema constraint, which likewise states 32 as a maximum.
>
> What do you think?
I think it would make sense to keep both decoupled for the time being,
especially since this might depend strongly on the backend: for S3-backed
datastores, for example, you might gain a lot by increasing the number of
readers, but not much by increasing the number of verify threads.
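For reference, the coupling floated above would look roughly like the sketch
below (the factor of 2 and the 4..=32 bounds are just the values under
discussion, not settled defaults):

    // Derive the verification thread count from the configured number of
    // reader threads by a proportionality factor, clamped to the limits of
    // the schema (1..=32, default 1).
    const MIN_VERIFY_THREADS: usize = 4;
    const MAX_VERIFY_THREADS: usize = 32;

    fn verify_thread_count(worker_threads: Option<usize>) -> usize {
        let readers = worker_threads.unwrap_or(1);
        (readers * 2).clamp(MIN_VERIFY_THREADS, MAX_VERIFY_THREADS)
    }

With the default of 1 reader thread this keeps the previous behavior of 4
verification threads; whether such a fixed factor holds up on both IO-bound
and CPU-bound systems is exactly the open question raised above.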
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 19+ messages in thread
end of thread, other threads:[~2025-11-06 11:57 UTC | newest]
Thread overview: 19+ messages
2025-11-05 15:51 [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Nicolas Frey
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox 1/1] pbs-api-types: jobs: verify: add worker-threads to VerificationJobConfig Nicolas Frey
2025-11-06 8:14 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 1/4] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-06 8:54 ` Christian Ebner
2025-11-06 9:04 ` Nicolas Frey
2025-11-06 9:26 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 2/4] api: verify: use worker-threads to determine the number of threads to use Nicolas Frey
2025-11-06 9:09 ` Christian Ebner
2025-11-06 9:23 ` Nicolas Frey
2025-11-06 9:32 ` Christian Ebner
2025-11-06 11:22 ` Nicolas Frey
2025-11-06 11:57 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 3/4] api: verify: add worker-threads to update endpoint Nicolas Frey
2025-11-06 9:13 ` Christian Ebner
2025-11-05 15:51 ` [pbs-devel] [PATCH proxmox-backup 4/4] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-06 9:22 ` Christian Ebner
2025-11-06 9:25 ` Nicolas Frey
2025-11-06 8:02 ` [pbs-devel] [PATCH proxmox{, -backup} 0/5] parallelize chunk reads in verification Christian Ebner