From: Nicolas Frey <n.frey@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct
Date: Mon, 10 Nov 2025 09:44:13 +0100 [thread overview]
Message-ID: <20251110084417.173290-6-n.frey@proxmox.com> (raw)
In-Reply-To: <20251110084417.173290-1-n.frey@proxmox.com>
Introduce a new state struct `IndexVerifyState` that bundles the
per-index verification counters (read/decoded bytes, error count, start
time) together with the datastore and the shared corrupt/verified chunk
sets, so that only a single `Arc` needs to be passed around and cloned
instead of five separate ones.
Suggested-by: Christian Ebner <c.ebner@proxmox.com>
Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
---
src/backup/verify.rs | 130 ++++++++++++++++++++++---------------------
1 file changed, 67 insertions(+), 63 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index c0ff15d4..9a20c8e1 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -34,6 +34,34 @@ pub struct VerifyWorker {
backend: DatastoreBackend,
}
+struct IndexVerifyState {
+ read_bytes: AtomicU64,
+ decoded_bytes: AtomicU64,
+ errors: AtomicUsize,
+ datastore: Arc<DataStore>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ start_time: Instant,
+}
+
+impl IndexVerifyState {
+ fn new(
+ datastore: &Arc<DataStore>,
+ corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+ verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
+ ) -> Self {
+ Self {
+ read_bytes: AtomicU64::new(0),
+ decoded_bytes: AtomicU64::new(0),
+ errors: AtomicUsize::new(0),
+ datastore: Arc::clone(datastore),
+ corrupt_chunks: Arc::clone(corrupt_chunks),
+ verified_chunks: Arc::clone(verified_chunks),
+ start_time: Instant::now(),
+ }
+ }
+}
+
impl VerifyWorker {
/// Creates a new VerifyWorker for a given task worker and datastore.
pub fn new(
@@ -81,24 +109,20 @@ impl VerifyWorker {
index: Box<dyn IndexFile + Send>,
crypt_mode: CryptMode,
) -> Result<(), Error> {
- let errors = Arc::new(AtomicUsize::new(0));
-
- let start_time = Instant::now();
-
- let read_bytes = Arc::new(AtomicU64::new(0));
- let decoded_bytes = Arc::new(AtomicU64::new(0));
+ let verify_state = Arc::new(IndexVerifyState::new(
+ &self.datastore,
+ &self.corrupt_chunks,
+ &self.verified_chunks,
+ ));
let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
- let datastore = Arc::clone(&self.datastore);
- let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
- let verified_chunks = Arc::clone(&self.verified_chunks);
- let errors = Arc::clone(&errors);
+ let verify_state = Arc::clone(&verify_state);
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
let chunk_crypt_mode = match chunk.crypt_mode() {
Err(err) => {
- corrupt_chunks.lock().unwrap().insert(digest);
+ verify_state.corrupt_chunks.lock().unwrap().insert(digest);
info!("can't verify chunk, unknown CryptMode - {err}");
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
return Ok(());
}
Ok(mode) => mode,
@@ -108,20 +132,20 @@ impl VerifyWorker {
info!(
"chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
);
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
}
if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
- corrupt_chunks.lock().unwrap().insert(digest);
+ verify_state.corrupt_chunks.lock().unwrap().insert(digest);
info!("{err}");
- errors.fetch_add(1, Ordering::SeqCst);
- match datastore.rename_corrupt_chunk(&digest) {
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
+ match verify_state.datastore.rename_corrupt_chunk(&digest) {
Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
Err(err) => info!("{err}"),
_ => (),
}
} else {
- verified_chunks.lock().unwrap().insert(digest);
+ verify_state.verified_chunks.lock().unwrap().insert(digest);
}
Ok(())
@@ -134,7 +158,7 @@ impl VerifyWorker {
} else if self.corrupt_chunks.lock().unwrap().contains(digest) {
let digest_str = hex::encode(digest);
info!("chunk {digest_str} was marked as corrupt");
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
true
} else {
false
@@ -155,27 +179,18 @@ impl VerifyWorker {
let reader_pool = ParallelHandler::new("read chunks", 4, {
let decoder_pool = decoder_pool.channel();
- let datastore = Arc::clone(&self.datastore);
- let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
- let read_bytes = Arc::clone(&read_bytes);
- let decoded_bytes = Arc::clone(&decoded_bytes);
- let errors = Arc::clone(&errors);
+ let verify_state = Arc::clone(&verify_state);
let backend = self.backend.clone();
move |info: ChunkReadInfo| {
Self::verify_chunk_by_backend(
&backend,
- Arc::clone(&datastore),
- Arc::clone(&corrupt_chunks),
- Arc::clone(&read_bytes),
- Arc::clone(&decoded_bytes),
- Arc::clone(&errors),
+ Arc::clone(&verify_state),
&decoder_pool,
&info,
)
}
});
-
for (pos, _) in chunk_list {
self.worker.check_abort()?;
self.worker.fail_on_shutdown()?;
@@ -192,10 +207,10 @@ impl VerifyWorker {
reader_pool.complete()?;
- let elapsed = start_time.elapsed().as_secs_f64();
+ let elapsed = verify_state.start_time.elapsed().as_secs_f64();
- let read_bytes = read_bytes.load(Ordering::SeqCst);
- let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
+ let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
+ let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
@@ -203,44 +218,39 @@ impl VerifyWorker {
let read_speed = read_bytes_mib / elapsed;
let decode_speed = decoded_bytes_mib / elapsed;
- let error_count = errors.load(Ordering::SeqCst);
+ let error_count = verify_state.errors.load(Ordering::SeqCst);
info!(
" verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
);
- if errors.load(Ordering::SeqCst) > 0 {
+ if verify_state.errors.load(Ordering::SeqCst) > 0 {
bail!("chunks could not be verified");
}
Ok(())
}
- #[allow(clippy::too_many_arguments)]
fn verify_chunk_by_backend(
backend: &DatastoreBackend,
- datastore: Arc<DataStore>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
- read_bytes: Arc<AtomicU64>,
- decoded_bytes: Arc<AtomicU64>,
- errors: Arc<AtomicUsize>,
+ verify_state: Arc<IndexVerifyState>,
decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
info: &ChunkReadInfo,
) -> Result<(), Error> {
match backend {
- DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
+ DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
Err(err) => Self::add_corrupt_chunk(
- datastore,
- corrupt_chunks,
+ verify_state,
info.digest,
- errors,
&format!("can't verify chunk, load failed - {err}"),
),
Ok(chunk) => {
let size = info.size();
- read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+ verify_state
+ .read_bytes
+ .fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- decoded_bytes.fetch_add(size, Ordering::SeqCst);
+ verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
},
DatastoreBackend::S3(s3_client) => {
@@ -257,28 +267,28 @@ impl VerifyWorker {
match chunk_result {
Ok(chunk) => {
let size = info.size();
- read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+ verify_state
+ .read_bytes
+ .fetch_add(chunk.raw_size(), Ordering::SeqCst);
decoder_pool.send((chunk, info.digest, size))?;
- decoded_bytes.fetch_add(size, Ordering::SeqCst);
+ verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
}
Err(err) => {
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
error!("can't verify chunk, load failed - {err}");
}
}
}
Ok(None) => Self::add_corrupt_chunk(
- datastore,
- corrupt_chunks,
+ verify_state,
info.digest,
- errors,
&format!(
"can't verify missing chunk with digest {}",
hex::encode(info.digest)
),
),
Err(err) => {
- errors.fetch_add(1, Ordering::SeqCst);
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
error!("can't verify chunk, load failed - {err}");
}
}
@@ -287,19 +297,13 @@ impl VerifyWorker {
Ok(())
}
- fn add_corrupt_chunk(
- datastore: Arc<DataStore>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
- digest: [u8; 32],
- errors: Arc<AtomicUsize>,
- message: &str,
- ) {
+ fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
// Panic on poisoned mutex
- let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
+ let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
corrupt_chunks.insert(digest);
error!(message);
- errors.fetch_add(1, Ordering::SeqCst);
- match datastore.rename_corrupt_chunk(&digest) {
+ verify_state.errors.fetch_add(1, Ordering::SeqCst);
+ match verify_state.datastore.rename_corrupt_chunk(&digest) {
Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
Err(err) => info!("{err}"),
_ => (),
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-11-10 8:43 UTC|newest]
Thread overview: 22+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-11-10 8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` Nicolas Frey [this message]
2025-11-11 10:22 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
2025-11-11 10:35 ` Nicolas Frey
2025-11-13 9:33 ` [pbs-devel] superseded: " Nicolas Frey
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20251110084417.173290-6-n.frey@proxmox.com \
--to=n.frey@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox