From: Nicolas Frey <n.frey@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v4 3/9] verify: refactor into associated functions to use new state struct
Date: Thu, 13 Nov 2025 10:31:12 +0100 [thread overview]
Message-ID: <20251113093118.195229-7-n.frey@proxmox.com> (raw)
In-Reply-To: <20251113093118.195229-1-n.frey@proxmox.com>
This bundles the verification state nicely into one struct to pass around,
in preparation for parallel chunk loading.
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
src/backup/verify.rs | 114 +++++++++++++++++++++----------------------
1 file changed, 57 insertions(+), 57 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index e8ea49b2..e1eda039 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -20,7 +20,7 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
-use crate::tools::parallel_handler::ParallelHandler;
+use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
use crate::backup::hierarchy::ListAccessibleBackupGroups;
@@ -109,27 +109,20 @@ impl VerifyWorker {
index: Box<dyn IndexFile + Send>,
crypt_mode: CryptMode,
) -> Result<(), Error> {
- let errors = Arc::new(AtomicUsize::new(0));
-
- let start_time = Instant::now();
-
- let mut read_bytes = 0;
- let mut decoded_bytes = 0;
-
- let datastore2 = Arc::clone(&self.datastore);
- let corrupt_chunks2 = Arc::clone(&self.corrupt_chunks);
- let verified_chunks2 = Arc::clone(&self.verified_chunks);
- let errors2 = Arc::clone(&errors);
-
- let decoder_pool = ParallelHandler::new(
- "verify chunk decoder",
- 4,
+ let verify_state = Arc::new(IndexVerifyState::new(
+ &self.datastore,
+ &self.corrupt_chunks,
+ &self.verified_chunks,
+ ));
+
+ let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
+ let verify_state = Arc::clone(&verify_state);
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
- corrupt_chunks2.lock().unwrap().insert(digest);
+ verify_state.corrupt_chunks.lock().unwrap().insert(digest);
info!("can't verify chunk, unknown CryptMode - {err}");
- errors2.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
Ok(mode) => mode,
@@ -139,25 +132,25 @@ impl VerifyWorker {
info!(
"chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
);
- errors2.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
- corrupt_chunks2.lock().unwrap().insert(digest);
+ verify_state.corrupt_chunks.lock().unwrap().insert(digest);
info!("{err}");
- errors2.fetch_add(1, Ordering::SeqCst);
- match datastore2.rename_corrupt_chunk(&digest) {
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
+ match verify_state.datastore.rename_corrupt_chunk(&digest) {
Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
Err(err) => info!("{err}"),
_ => (),
}
} else {
- verified_chunks2.lock().unwrap().insert(digest);
+ verify_state.verified_chunks.lock().unwrap().insert(digest);
}
Ok(())
- },
- );
+ }
+ });
let skip_chunk = |digest: &[u8; 32]| -> bool {
if self.verified_chunks.lock().unwrap().contains(digest) {
@@ -165,7 +158,7 @@ impl VerifyWorker {
} else if self.corrupt_chunks.lock().unwrap().contains(digest) {
let digest_str = hex::encode(digest);
info!("chunk {digest_str} was marked as corrupt");
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
true
} else {
false
@@ -195,18 +188,20 @@ impl VerifyWorker {
continue; // already verified or marked corrupt
}
- self.verify_chunk_by_backend(
+ Self::verify_chunk_by_backend(
+ &self.backend,
+ Arc::clone(&verify_state),
+ &decoder_pool.channel(),
&info,
- &mut read_bytes,
- &mut decoded_bytes,
- Arc::clone(&errors),
- &decoder_pool,
)?;
}
decoder_pool.complete()?;
- let elapsed = start_time.elapsed().as_secs_f64();
+ let elapsed = verify_state.start_time.elapsed().as_secs_f64();
+
+ let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
+ let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -214,13 +209,13 @@ impl VerifyWorker {
let read_speed = read_bytes_mib / elapsed;
let decode_speed = decoded_bytes_mib / elapsed;
- let error_count = errors.load(Ordering::SeqCst);
+ let error_count = verify_state.errors.load(Ordering::SeqCst);
info!(
" verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
);
- if errors.load(Ordering::SeqCst) > 0 {
+ if verify_state.errors.load(Ordering::SeqCst) > 0 {
bail!("chunks could not be verified");
}
@@ -228,25 +223,25 @@ impl VerifyWorker {
}
fn verify_chunk_by_backend(
- &self,
+ backend: &DatastoreBackend,
+ verify_state: Arc<IndexVerifyState>,
+ decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
info: &ChunkReadInfo,
- read_bytes: &mut u64,
- decoded_bytes: &mut u64,
- errors: Arc<AtomicUsize>,
- decoder_pool: &ParallelHandler<(DataBlob, [u8; 32], u64)>,
) -> Result<(), Error> {
- match &self.backend {
- DatastoreBackend::Filesystem => match self.datastore.load_chunk(&info.digest) {
- Err(err) => self.add_corrupt_chunk(
+ match backend {
+ DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
+ Err(err) => Self::add_corrupt_chunk(
+ verify_state,
info.digest,
- errors,
&format!("can't verify chunk, load failed - {err}"),
),
Ok(chunk) => {
let size = info.size();
- *read_bytes += chunk.raw_size();
+ verify_state
+ .read_bytes
+ .fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- *decoded_bytes += size;
+ verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
},
DatastoreBackend::S3(s3_client) => {
@@ -258,13 +253,17 @@ impl VerifyWorker {
match DataBlob::from_raw(raw_chunk.to_bytes().to_vec()) {
Ok(chunk) => {
let size = info.size();
- *read_bytes += chunk.raw_size();
+ verify_state
+ .read_bytes
+ .fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- *decoded_bytes += size;
+ verify_state
+ .decoded_bytes
+ .fetch_add(size, Ordering::SeqCst);
}
- Err(err) => self.add_corrupt_chunk(
+ Err(err) => Self::add_corrupt_chunk(
+ verify_state,
info.digest,
- errors,
&format!(
"can't verify chunk with digest {} - {err}",
hex::encode(info.digest)
@@ -273,21 +272,21 @@ impl VerifyWorker {
}
}
Err(err) => {
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
error!("can't verify chunk, load failed - {err}");
}
}
}
- Ok(None) => self.add_corrupt_chunk(
+ Ok(None) => Self::add_corrupt_chunk(
+ verify_state,
info.digest,
- errors,
&format!(
"can't verify missing chunk with digest {}",
hex::encode(info.digest)
),
),
Err(err) => {
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
error!("can't verify chunk, load failed - {err}");
}
}
@@ -296,12 +295,13 @@ impl VerifyWorker {
Ok(())
}
- fn add_corrupt_chunk(&self, digest: [u8; 32], errors: Arc<AtomicUsize>, message: &str) {
+ fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
// Panic on poisoned mutex
- self.corrupt_chunks.lock().unwrap().insert(digest);
+ let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
+ corrupt_chunks.insert(digest);
error!(message);
- errors.fetch_add(1, Ordering::SeqCst);
- match self.datastore.rename_corrupt_chunk(&digest) {
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
+ match verify_state.datastore.rename_corrupt_chunk(&digest) {
Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
Err(err) => info!("{err}"),
_ => (),
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-11-13 9:31 UTC|newest]
Thread overview: 18+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-11-13 9:31 [pbs-devel] [PATCH proxmox{, -backup} v4 00/12] parallelize chunk reads in verification Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox v4 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox v4 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox v4 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 1/9] verify: correct typo in comment Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 2/9] verify: introduce new state struct Nicolas Frey
2025-11-13 9:31 ` Nicolas Frey [this message]
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 4/9] verify: move chunk loading into parallel handler Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 5/9] verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 6/9] verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 7/9] verify: add {read, verify}-threads to api schema in backup manager Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 8/9] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-13 9:31 ` [pbs-devel] [PATCH proxmox-backup v4 9/9] docs: verify: document {read, verify}-threads and update screenshot Nicolas Frey
2025-11-14 10:41 ` Christian Ebner
2025-11-13 12:53 ` [pbs-devel] [PATCH proxmox{, -backup} v4 00/12] parallelize chunk reads in verification Fabian Grünbichler
2025-11-14 10:46 ` Christian Ebner
2025-11-14 21:36 ` [pbs-devel] applied: " Thomas Lamprecht
2025-11-14 22:17 ` Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20251113093118.195229-7-n.frey@proxmox.com \
--to=n.frey@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox