From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC PATCH proxmox-backup 3/3] verify: use separate read pool for reading chunks
Date: Mon, 7 Jul 2025 15:27:06 +0200
Message-ID: <20250707132706.2854973-4-d.csapak@proxmox.com>
In-Reply-To: <20250707132706.2854973-1-d.csapak@proxmox.com>

Instead of having each 'worker thread' read and then verify its chunk,
use a separate 'reader pool' that reads chunks in parallel but
independently of verification.

While this does introduce 4 new threads, they should be mostly busy with
reading from disk and not doing anything CPU intensive.

The advantage over the current system is that the threads can start to
read the next chunks while the previous ones are still being verified.

Due to the nature of the ParallelHandler, the channel is bounded to the
number of threads, so there won't be more than 4 chunks read in advance.
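
To illustrate the idea outside of the PBS code base, here is a minimal,
self-contained sketch of such a two-stage pipeline. It is not how
ParallelHandler is implemented: it uses std::sync::mpsc::sync_channel as a
stand-in for the bounded channel, and load_chunk, verify_chunk and
run_pipeline are hypothetical names made up for this example only.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the IO-bound stage (reading a chunk from disk).
/// Every digest ending in 9 is treated as unreadable.
fn load_chunk(digest: u8) -> Result<Vec<u8>, String> {
    if digest % 10 == 9 {
        Err(format!("chunk {digest} is unreadable"))
    } else {
        Ok(vec![digest; 16])
    }
}

/// Hypothetical stand-in for the CPU-bound stage (verifying chunk content).
fn verify_chunk(digest: u8, data: &[u8]) -> bool {
    data.iter().all(|&b| b == digest)
}

/// Feeds `digests` through a reader pool into a verifier pool.
/// Returns (number of verified chunks, number of load errors).
fn run_pipeline(digests: Vec<u8>, readers: usize, verifiers: usize) -> (usize, usize) {
    assert!(readers > 0 && verifiers > 0);

    // Both channels are bounded by the size of the pool that drains them,
    // so only a limited number of items can be queued ahead of each stage.
    let (digest_tx, digest_rx) = sync_channel::<u8>(readers);
    let (chunk_tx, chunk_rx) = sync_channel::<(u8, Vec<u8>)>(verifiers);
    let digest_rx = Arc::new(Mutex::new(digest_rx));
    let chunk_rx = Arc::new(Mutex::new(chunk_rx));
    let verified = Arc::new(AtomicUsize::new(0));
    let load_errors = Arc::new(AtomicUsize::new(0));

    let reader_handles: Vec<_> = (0..readers)
        .map(|_| {
            let digest_rx = Arc::clone(&digest_rx);
            let chunk_tx = chunk_tx.clone();
            let load_errors = Arc::clone(&load_errors);
            thread::spawn(move || loop {
                // The lock guard is dropped at the end of this statement.
                let next = digest_rx.lock().unwrap().recv();
                let Ok(digest) = next else { break };
                match load_chunk(digest) {
                    // Blocks while the verifier queue is full.
                    Ok(data) => {
                        if chunk_tx.send((digest, data)).is_err() {
                            break;
                        }
                    }
                    Err(_) => {
                        load_errors.fetch_add(1, Ordering::SeqCst);
                    }
                }
            })
        })
        .collect();
    // Only the readers hold senders now, so the verifiers stop once all
    // readers have finished.
    drop(chunk_tx);

    let verifier_handles: Vec<_> = (0..verifiers)
        .map(|_| {
            let chunk_rx = Arc::clone(&chunk_rx);
            let verified = Arc::clone(&verified);
            thread::spawn(move || loop {
                let next = chunk_rx.lock().unwrap().recv();
                let Ok((digest, data)) = next else { break };
                if verify_chunk(digest, &data) {
                    verified.fetch_add(1, Ordering::SeqCst);
                }
            })
        })
        .collect();

    for digest in digests {
        digest_tx.send(digest).expect("reader pool stopped early");
    }
    // Complete the reader stage first, then the verifier stage.
    drop(digest_tx);
    for handle in reader_handles {
        handle.join().unwrap();
    }
    for handle in verifier_handles {
        handle.join().unwrap();
    }

    (verified.load(Ordering::SeqCst), load_errors.load(Ordering::SeqCst))
}
```

Calling run_pipeline((0u8..20).collect(), 4, 4) pushes 20 digests through
4 readers and 4 verifiers. As in the patch, the reader stage is completed
before the verifier stage, so no chunk that was already read gets dropped.
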

In my local tests I measured the following speed difference when
verifying a single snapshot of ~64 GiB (4x the RAM size) on a machine
with 12 cores:

current:                                      ~550 MiB/s
previous patch (moving loading into threads): ~950 MiB/s
this patch:                                   ~1150 MiB/s

As expected, the IO and CPU load increased in line with the throughput.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/backup/verify.rs | 49 +++++++++++++++++++++++++++-----------------
 1 file changed, 30 insertions(+), 19 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 83dd0d9a3..b139819a6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -17,7 +17,7 @@ use pbs_api_types::{
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
-use pbs_datastore::{DataStore, StoreProgress};
+use pbs_datastore::{DataBlob, DataStore, StoreProgress};

 use crate::tools::parallel_handler::ParallelHandler;

@@ -114,24 +114,8 @@ fn verify_index_chunks(
         let corrupt_chunks = Arc::clone(&verify_worker.corrupt_chunks);
         let verified_chunks = Arc::clone(&verify_worker.verified_chunks);
         let errors = Arc::clone(&errors);
-        let read_bytes = Arc::clone(&read_bytes);
-        let decoded_bytes = Arc::clone(&decoded_bytes);

-        move |(digest, size): ([u8; 32], u64)| {
-            let chunk = match datastore.load_chunk(&digest) {
-                Err(err) => {
-                    corrupt_chunks.lock().unwrap().insert(digest);
-                    error!("can't verify chunk, load failed - {err}");
-                    errors.fetch_add(1, Ordering::SeqCst);
-                    rename_corrupted_chunk(datastore.clone(), &digest);
-                    return Ok(());
-                }
-                Ok(chunk) => {
-                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
-                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
-                    chunk
-                }
-            };
+        move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             let chunk_crypt_mode = match chunk.crypt_mode() {
                 Err(err) => {
                     corrupt_chunks.lock().unwrap().insert(digest);
@@ -162,6 +146,32 @@ fn verify_index_chunks(
         }
     });

+    let reader_pool = ParallelHandler::new("read chunks", 4, {
+        let datastore = Arc::clone(&verify_worker.datastore);
+        let corrupt_chunks = Arc::clone(&verify_worker.corrupt_chunks);
+        let errors = Arc::clone(&errors);
+        let read_bytes = Arc::clone(&read_bytes);
+        let decoded_bytes = Arc::clone(&decoded_bytes);
+        let decoder_pool = decoder_pool.channel();
+
+        move |(digest, size): ([u8; 32], u64)| {
+            match datastore.load_chunk(&digest) {
+                Err(err) => {
+                    corrupt_chunks.lock().unwrap().insert(digest);
+                    error!("can't verify chunk, load failed - {err}");
+                    errors.fetch_add(1, Ordering::SeqCst);
+                    rename_corrupted_chunk(datastore.clone(), &digest);
+                }
+                Ok(chunk) => {
+                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
+                    decoder_pool.send((chunk, digest, size))?;
+                }
+            }
+            Ok(())
+        }
+    });
+
     let skip_chunk = |digest: &[u8; 32]| -> bool {
         if verify_worker
             .verified_chunks
@@ -209,9 +219,10 @@ fn verify_index_chunks(
             continue; // already verified or marked corrupt
         }

-        decoder_pool.send((info.digest, info.size()))?;
+        reader_pool.send((info.digest, info.size()))?;
     }

+    reader_pool.complete()?;
     decoder_pool.complete()?;

     let elapsed = start_time.elapsed().as_secs_f64();
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel