From: Christian Ebner <c.ebner@proxmox.com>
To: Proxmox Backup Server development discussion
<pbs-devel@lists.proxmox.com>, Nicolas Frey <n.frey@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct
Date: Tue, 11 Nov 2025 11:22:19 +0100
Message-ID: <bfd7bfac-2ffa-458d-a908-1533e54d2af4@proxmox.com>
In-Reply-To: <20251110084417.173290-6-n.frey@proxmox.com>

As mentioned in my comment on the previous patch, IMO this one should be
introduced before that one, so that the parallel handler is only added on
top of the already-bundled state.

On 11/10/25 9:44 AM, Nicolas Frey wrote:
> Introduces a new state struct `IndexVerifyState` so that we only need
> to pass around and clone one `Arc`.
>
> Suggested-by: Christian Ebner <c.ebner@proxmox.com>
> Signed-off-by: Nicolas Frey <n.frey@proxmox.com>
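
For reference, the pattern boils down to the following minimal,
self-contained sketch (simplified names and fields, not the actual
verify.rs code): all shared counters and chunk sets live in one struct,
and each worker clones a single Arc instead of one Arc per field.

    use std::collections::HashSet;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::thread;

    // All shared per-verification state behind a single Arc.
    struct VerifyState {
        read_bytes: AtomicU64,
        errors: AtomicUsize,
        corrupt_chunks: Mutex<HashSet<[u8; 32]>>,
    }

    fn main() {
        let state = Arc::new(VerifyState {
            read_bytes: AtomicU64::new(0),
            errors: AtomicUsize::new(0),
            corrupt_chunks: Mutex::new(HashSet::new()),
        });

        let workers: Vec<_> = (0..4)
            .map(|i| {
                // one clone per worker instead of one clone per shared field
                let state = Arc::clone(&state);
                thread::spawn(move || {
                    state.read_bytes.fetch_add(4096, Ordering::SeqCst);
                    if i == 0 {
                        // pretend one worker hit a corrupt chunk
                        state.corrupt_chunks.lock().unwrap().insert([0u8; 32]);
                        state.errors.fetch_add(1, Ordering::SeqCst);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        println!(
            "read {} bytes, {} errors",
            state.read_bytes.load(Ordering::SeqCst),
            state.errors.load(Ordering::SeqCst)
        );
    }
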
> ---
> src/backup/verify.rs | 130 ++++++++++++++++++++++---------------------
> 1 file changed, 67 insertions(+), 63 deletions(-)
>
> diff --git a/src/backup/verify.rs b/src/backup/verify.rs
> index c0ff15d4..9a20c8e1 100644
> --- a/src/backup/verify.rs
> +++ b/src/backup/verify.rs
> @@ -34,6 +34,34 @@ pub struct VerifyWorker {
>      backend: DatastoreBackend,
>  }
>
> +struct IndexVerifyState {
> +    read_bytes: AtomicU64,
> +    decoded_bytes: AtomicU64,
> +    errors: AtomicUsize,
> +    datastore: Arc<DataStore>,
> +    corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> +    start_time: Instant,
> +}
> +
> +impl IndexVerifyState {
> +    fn new(
> +        datastore: &Arc<DataStore>,
> +        corrupt_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +        verified_chunks: &Arc<Mutex<HashSet<[u8; 32]>>>,
> +    ) -> Self {
> +        Self {
> +            read_bytes: AtomicU64::new(0),
> +            decoded_bytes: AtomicU64::new(0),
> +            errors: AtomicUsize::new(0),
> +            datastore: Arc::clone(datastore),
> +            corrupt_chunks: Arc::clone(corrupt_chunks),
> +            verified_chunks: Arc::clone(verified_chunks),
> +            start_time: Instant::now(),
> +        }
> +    }
> +}
> +
>  impl VerifyWorker {
>      /// Creates a new VerifyWorker for a given task worker and datastore.
>      pub fn new(
> @@ -81,24 +109,20 @@ impl VerifyWorker {
>          index: Box<dyn IndexFile + Send>,
>          crypt_mode: CryptMode,
>      ) -> Result<(), Error> {
> -        let errors = Arc::new(AtomicUsize::new(0));
> -
> -        let start_time = Instant::now();
> -
> -        let read_bytes = Arc::new(AtomicU64::new(0));
> -        let decoded_bytes = Arc::new(AtomicU64::new(0));
> +        let verify_state = Arc::new(IndexVerifyState::new(
> +            &self.datastore,
> +            &self.corrupt_chunks,
> +            &self.verified_chunks,
> +        ));
>
>          let decoder_pool = ParallelHandler::new("verify chunk decoder", 4, {
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let verified_chunks = Arc::clone(&self.verified_chunks);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>              move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>                  let chunk_crypt_mode = match chunk.crypt_mode() {
>                      Err(err) => {
> -                        corrupt_chunks.lock().unwrap().insert(digest);
> +                        verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                          info!("can't verify chunk, unknown CryptMode - {err}");
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                          return Ok(());
>                      }
>                      Ok(mode) => mode,
> @@ -108,20 +132,20 @@ impl VerifyWorker {
>                      info!(
>                          "chunk CryptMode {chunk_crypt_mode:?} does not match index CryptMode {crypt_mode:?}"
>                      );
> -                    errors.fetch_add(1, Ordering::SeqCst);
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  }
>
>                  if let Err(err) = chunk.verify_unencrypted(size as usize, &digest) {
> -                    corrupt_chunks.lock().unwrap().insert(digest);
> +                    verify_state.corrupt_chunks.lock().unwrap().insert(digest);
>                      info!("{err}");
> -                    errors.fetch_add(1, Ordering::SeqCst);
> -                    match datastore.rename_corrupt_chunk(&digest) {
> +                    verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +                    match verify_state.datastore.rename_corrupt_chunk(&digest) {
>                          Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>                          Err(err) => info!("{err}"),
>                          _ => (),
>                      }
>                  } else {
> -                    verified_chunks.lock().unwrap().insert(digest);
> +                    verify_state.verified_chunks.lock().unwrap().insert(digest);
>                  }
>
>                  Ok(())
> @@ -134,7 +158,7 @@ impl VerifyWorker {
>              } else if self.corrupt_chunks.lock().unwrap().contains(digest) {
>                  let digest_str = hex::encode(digest);
>                  info!("chunk {digest_str} was marked as corrupt");
> -                errors.fetch_add(1, Ordering::SeqCst);
> +                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                  true
>              } else {
>                  false
> @@ -155,27 +179,18 @@ impl VerifyWorker {
>
>          let reader_pool = ParallelHandler::new("read chunks", 4, {
>              let decoder_pool = decoder_pool.channel();
> -            let datastore = Arc::clone(&self.datastore);
> -            let corrupt_chunks = Arc::clone(&self.corrupt_chunks);
> -            let read_bytes = Arc::clone(&read_bytes);
> -            let decoded_bytes = Arc::clone(&decoded_bytes);
> -            let errors = Arc::clone(&errors);
> +            let verify_state = Arc::clone(&verify_state);
>              let backend = self.backend.clone();
>
>              move |info: ChunkReadInfo| {
>                  Self::verify_chunk_by_backend(
>                      &backend,
> -                    Arc::clone(&datastore),
> -                    Arc::clone(&corrupt_chunks),
> -                    Arc::clone(&read_bytes),
> -                    Arc::clone(&decoded_bytes),
> -                    Arc::clone(&errors),
> +                    Arc::clone(&verify_state),
>                      &decoder_pool,
>                      &info,
>                  )
>              }
>          });
> -
>          for (pos, _) in chunk_list {
>              self.worker.check_abort()?;
>              self.worker.fail_on_shutdown()?;
> @@ -192,10 +207,10 @@ impl VerifyWorker {
>
>          reader_pool.complete()?;
>
> -        let elapsed = start_time.elapsed().as_secs_f64();
> +        let elapsed = verify_state.start_time.elapsed().as_secs_f64();
>
> -        let read_bytes = read_bytes.load(Ordering::SeqCst);
> -        let decoded_bytes = decoded_bytes.load(Ordering::SeqCst);
> +        let read_bytes = verify_state.read_bytes.load(Ordering::SeqCst);
> +        let decoded_bytes = verify_state.decoded_bytes.load(Ordering::SeqCst);
>
>          let read_bytes_mib = (read_bytes as f64) / (1024.0 * 1024.0);
>          let decoded_bytes_mib = (decoded_bytes as f64) / (1024.0 * 1024.0);
> @@ -203,44 +218,39 @@ impl VerifyWorker {
>          let read_speed = read_bytes_mib / elapsed;
>          let decode_speed = decoded_bytes_mib / elapsed;
>
> -        let error_count = errors.load(Ordering::SeqCst);
> +        let error_count = verify_state.errors.load(Ordering::SeqCst);
>
>          info!(
>              " verified {read_bytes_mib:.2}/{decoded_bytes_mib:.2} MiB in {elapsed:.2} seconds, speed {read_speed:.2}/{decode_speed:.2} MiB/s ({error_count} errors)"
>          );
>
> -        if errors.load(Ordering::SeqCst) > 0 {
> +        if verify_state.errors.load(Ordering::SeqCst) > 0 {
>              bail!("chunks could not be verified");
>          }
>
>          Ok(())
>      }
>
> -    #[allow(clippy::too_many_arguments)]
>      fn verify_chunk_by_backend(
>          backend: &DatastoreBackend,
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        read_bytes: Arc<AtomicU64>,
> -        decoded_bytes: Arc<AtomicU64>,
> -        errors: Arc<AtomicUsize>,
> +        verify_state: Arc<IndexVerifyState>,
>          decoder_pool: &SendHandle<(DataBlob, [u8; 32], u64)>,
>          info: &ChunkReadInfo,
>      ) -> Result<(), Error> {
>          match backend {
> -            DatastoreBackend::Filesystem => match datastore.load_chunk(&info.digest) {
> +            DatastoreBackend::Filesystem => match verify_state.datastore.load_chunk(&info.digest) {
>                  Err(err) => Self::add_corrupt_chunk(
> -                    datastore,
> -                    corrupt_chunks,
> +                    verify_state,
>                      info.digest,
> -                    errors,
>                      &format!("can't verify chunk, load failed - {err}"),
>                  ),
>                  Ok(chunk) => {
>                      let size = info.size();
> -                    read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                    verify_state
> +                        .read_bytes
> +                        .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                      decoder_pool.send((chunk, info.digest, size))?;
> -                    decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                    verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                  }
>              },
>              DatastoreBackend::S3(s3_client) => {
> @@ -257,28 +267,28 @@ impl VerifyWorker {
>                          match chunk_result {
>                              Ok(chunk) => {
>                                  let size = info.size();
> -                                read_bytes.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                                verify_state
> +                                    .read_bytes
> +                                    .fetch_add(chunk.raw_size(), Ordering::SeqCst);
>                                  decoder_pool.send((chunk, info.digest, size))?;
> -                                decoded_bytes.fetch_add(size, Ordering::SeqCst);
> +                                verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst);
>                              }
>                              Err(err) => {
> -                                errors.fetch_add(1, Ordering::SeqCst);
> +                                verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                                  error!("can't verify chunk, load failed - {err}");
>                              }
>                          }
>                      }
>                      Ok(None) => Self::add_corrupt_chunk(
> -                        datastore,
> -                        corrupt_chunks,
> +                        verify_state,
>                          info.digest,
> -                        errors,
>                          &format!(
>                              "can't verify missing chunk with digest {}",
>                              hex::encode(info.digest)
>                          ),
>                      ),
>                      Err(err) => {
> -                        errors.fetch_add(1, Ordering::SeqCst);
> +                        verify_state.errors.fetch_add(1, Ordering::SeqCst);
>                          error!("can't verify chunk, load failed - {err}");
>                      }
>                  }
> @@ -287,19 +297,13 @@ impl VerifyWorker {
>          Ok(())
>      }
>
> -    fn add_corrupt_chunk(
> -        datastore: Arc<DataStore>,
> -        corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> -        digest: [u8; 32],
> -        errors: Arc<AtomicUsize>,
> -        message: &str,
> -    ) {
> +    fn add_corrupt_chunk(verify_state: Arc<IndexVerifyState>, digest: [u8; 32], message: &str) {
>          // Panic on poisoned mutex
> -        let mut corrupt_chunks = corrupt_chunks.lock().unwrap();
> +        let mut corrupt_chunks = verify_state.corrupt_chunks.lock().unwrap();
>          corrupt_chunks.insert(digest);
>          error!(message);
> -        errors.fetch_add(1, Ordering::SeqCst);
> -        match datastore.rename_corrupt_chunk(&digest) {
> +        verify_state.errors.fetch_add(1, Ordering::SeqCst);
> +        match verify_state.datastore.rename_corrupt_chunk(&digest) {
>              Ok(Some(new_path)) => info!("corrupt chunk renamed to {new_path:?}"),
>              Err(err) => info!("{err}"),
>              _ => (),

Thread overview: 22+ messages
2025-11-10 8:44 [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Nicolas Frey
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox v3 1/3] pbs-api-types: add schema for {worker, read, verify}-threads Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox v3 2/3] pbs-api-types: jobs: add {read, verify}-threads to VerificationJobConfig Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox v3 3/3] pbs-api-types: use worker-threads schema for TapeBackupJobSetup Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api: verify: move chunk loading into parallel handler Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] api: verify: bundle parameters into new struct Nicolas Frey
2025-11-11 10:22 ` Christian Ebner [this message]
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] api: verify: determine the number of threads to use with {read, verify}-threads Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] api: verify: correct typo in comment Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] api: verify: add {read, verify}-threads to update endpoint Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-10 8:44 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] ui: verify: add option to set number of threads for job Nicolas Frey
2025-11-11 10:22 ` Christian Ebner
2025-11-11 10:21 ` [pbs-devel] [PATCH proxmox{, -backup} v3 0/9] parallelize chunk reads in verification Christian Ebner
2025-11-11 10:35 ` Nicolas Frey
2025-11-13 9:33 ` [pbs-devel] superseded: " Nicolas Frey