* [pve-devel] [PATCH PBS restore 1/1] Make retrieval of chunks concurrent when restoring a backup, add more statistics on chunk fetching and writing to storage. Allow configuring the number of threads fetching chunks using the PBS_RESTORE_CONCURRENCY environment variable.

From: Adam Kalisz <adam.kalisz@notnullmakers.com>
To: pve-devel@lists.proxmox.com
Cc: "Daniel Škarda" <daniel.skarda@notnullmakers.com>,
    "Matt Neuforth" <mneuforth@milbankworks.com>,
    "Václav Svátek" <svatek@cmis.cz>
Date: Fri, 27 Jun 2025 21:24:26 +0200
Message-ID: <20250627192426.582978-2-adam.kalisz@notnullmakers.com>

Co-authored-by: Daniel Škarda <daniel.skarda@notnullmakers.com>
Tested-by: Matt Neuforth <mneuforth@milbankworks.com>
Financed-by: Václav Svátek <svatek@cmis.cz>
---
 src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 17 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 5a5a398..c75bf0d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,7 +1,11 @@
 use std::convert::TryInto;
-use std::sync::{Arc, Mutex};
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc, Mutex,
+};
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::StreamExt;
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
 
@@ -69,7 +73,7 @@ impl RestoreTask {
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new_multi_thread();
             builder.enable_all();
-            builder.max_blocking_threads(2);
+            builder.max_blocking_threads(32);
             builder.worker_threads(4);
             builder.thread_name("proxmox-restore-worker");
             builder
@@ -149,9 +153,7 @@ impl RestoreTask {
         )?;
 
         let most_used = index.find_most_used_chunks(8);
-
         let file_info = manifest.lookup_file_info(&archive_name)?;
-
         let chunk_reader = RemoteChunkReader::new(
             Arc::clone(&client),
             self.crypt_config.clone(),
@@ -162,13 +164,43 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
-
+        let mut storage_nonzero_write_time = std::time::Duration::new(0, 0);
+        let mut storage_nonzero_writes: u64 = 0;
+        let mut chunk_fetch_time = std::time::Duration::new(0, 0);
+        let chunks_fetched = Arc::new(AtomicU64::new(0));
         let start_time = std::time::Instant::now();
+        // Should be lower than max_blocking_threads in BackupSetup
+        let mut concurrent_requests: usize = 4;
+
+        if let Ok(val_str) = std::env::var("PBS_RESTORE_CONCURRENCY") {
+            match val_str.parse::<usize>() {
+                Ok(num_threads) if num_threads > 0 => {
+                    if verbose {
+                        eprintln!(
+                            "Using custom concurrency level from environment ({} threads)",
+                            num_threads
+                        );
+                    }
+                    concurrent_requests = num_threads;
+                }
+                _ => {
+                    if verbose {
+                        eprintln!(
+                            "Using default concurrency level ({} threads)",
+                            concurrent_requests
+                        );
+                    }
+                }
+            }
+        }
+
+        let mut chunk_futures = Vec::new();
 
         for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
+            let digest = index.index_digest(pos).unwrap().clone();
             let offset = (pos * index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
+
+            if digest == zero_chunk_digest {
                 let res = write_zero_callback(offset, index.chunk_size as u64);
                 if res < 0 {
                     bail!("write_zero_callback failed ({})", res);
@@ -176,22 +208,54 @@ impl RestoreTask {
                 bytes += index.chunk_size;
                 zeroes += index.chunk_size;
             } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+                let chunk_reader = chunk_reader.clone();
+                let chunks_fetched_clone = Arc::clone(&chunks_fetched);
+
+                let future = async move {
+                    tokio::task::spawn_blocking(move || {
+                        let start_chunk_fetch_time = std::time::Instant::now();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let fetch_elapsed = start_chunk_fetch_time.elapsed();
+                        chunks_fetched_clone.fetch_add(1, Ordering::Relaxed);
+                        Ok::<_, Error>((offset, raw_data, fetch_elapsed))
+                    })
+                    .await
+                    .unwrap()
+                };
+                chunk_futures.push(future);
+            }
+        }
+
+        let mut stream = futures::stream::iter(chunk_futures).buffer_unordered(concurrent_requests);
+
+        while let Some(result) = stream.next().await {
+            let (offset, raw_data, fetch_elapsed_time) = result?;
+            let start_storage_write_time = std::time::Instant::now();
+            let res = write_data_callback(offset, &raw_data);
+            let storage_write_elapsed = start_storage_write_time.elapsed();
+            if res < 0 {
+                bail!("write_data_callback failed ({})", res);
             }
+            storage_nonzero_write_time = storage_nonzero_write_time
+                .checked_add(storage_write_elapsed)
+                .unwrap_or_default();
+            storage_nonzero_writes += 1;
+            chunk_fetch_time += fetch_elapsed_time;
+            let chunk_len = raw_data.len();
+            bytes += chunk_len;
+
             if verbose {
-                let next_per = ((pos + 1) * 100) / index.index_count();
+                let next_per = (bytes * 100) / (index.index_count() * index.chunk_size);
                 if per != next_per {
                     eprintln!(
-                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), \
+                        nonzero writes = {}, chunks fetched = {}, duration {} sec)",
                         next_per,
                         bytes,
                         zeroes * 100 / bytes,
                         zeroes,
+                        storage_nonzero_writes,
+                        chunks_fetched.load(Ordering::Relaxed),
                         start_time.elapsed().as_secs()
                     );
                     per = next_per;
@@ -202,12 +266,15 @@ impl RestoreTask {
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
         eprintln!(
-            "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+            "restore image complete (bytes={}, avg fetch time={:.4}ms, avg time per nonzero write={:.4}ms, \
+            storage nonzero total write time={:.3}s, duration={:.2}s, speed={:.2}MB/s)",
             bytes,
+            chunk_fetch_time.as_nanos() as f64 / (chunks_fetched.load(Ordering::Relaxed) as f64 * 1_000_000.0),
+            storage_nonzero_write_time.as_nanos() as f64 / (storage_nonzero_writes as f64 * 1_000_000.0),
+            storage_nonzero_write_time.as_secs_f64(),
             elapsed.as_secs_f64(),
             bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
         );
-
         Ok(())
     }
-- 
2.47.2
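The core of the patch is the switch from a strictly sequential fetch-then-write loop to a bounded pipeline: each chunk fetch runs as a blocking task on tokio's blocking pool, futures::stream::iter(..).buffer_unordered(n) keeps at most n fetches in flight, and a single consumer loop performs the writes, so write_data_callback is never called concurrently. The following is a minimal, self-contained sketch of that pattern, not the patch itself: it assumes the tokio (with the rt-multi-thread and macros features), futures and anyhow crates, and fetch_chunk() together with the fixed 4 KiB chunk size are hypothetical stand-ins for RemoteChunkReader::read_chunk() and the real chunk data.

use futures::stream::{self, StreamExt};

// Hypothetical stand-in for ReadChunk::read_chunk(): returns the write
// offset and the chunk payload (here just a zeroed 4 KiB buffer).
fn fetch_chunk(pos: usize) -> Result<(u64, Vec<u8>), anyhow::Error> {
    Ok(((pos * 4096) as u64, vec![0u8; 4096]))
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Same parsing rule as the patch: a positive integer wins, anything
    // else falls back to the default of 4 concurrent requests.
    let concurrency: usize = std::env::var("PBS_RESTORE_CONCURRENCY")
        .ok()
        .and_then(|v| v.parse().ok())
        .filter(|&n| n > 0)
        .unwrap_or(4);

    // One future per chunk; they are inert until polled by the stream.
    let fetches = (0..100).map(|pos| async move {
        // spawn_blocking moves the synchronous fetch onto the blocking
        // pool; the outer `?` surfaces a panicked task as an error
        // instead of unwrapping the JoinHandle result.
        tokio::task::spawn_blocking(move || fetch_chunk(pos)).await?
    });

    // buffer_unordered(n) caps in-flight fetches at n and yields results
    // in completion order, which is why each result carries its offset.
    let mut stream = stream::iter(fetches).buffer_unordered(concurrency);
    let mut bytes = 0usize;
    while let Some(result) = stream.next().await {
        let (_offset, data) = result?;
        bytes += data.len(); // a real consumer would write `data` at `_offset`
    }
    println!("fetched {bytes} bytes");
    Ok(())
}

One consequence of buffer_unordered is visible in the sketch: results arrive in completion order, not submission order, which is why the patch carries the offset through each future rather than deriving it from a loop counter. The chosen concurrency should also stay below the max_blocking_threads(32) set on the runtime, as the patch's comment notes, since every in-flight fetch occupies one blocking-pool thread.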
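The averages in the final summary line are computed by hand from the accumulated Durations: total nanoseconds divided by count times 1,000,000 gives milliseconds per event. A tiny helper, shown only to make that arithmetic explicit; avg_ms() is not part of the patch:

use std::time::Duration;

// Average per-event time in milliseconds, mirroring the patch's
// `as_nanos() as f64 / (count as f64 * 1_000_000.0)` expression.
fn avg_ms(total: Duration, count: u64) -> f64 {
    if count == 0 {
        return 0.0; // guards the all-zero-image case, where 0.0/0.0 yields NaN
    }
    total.as_nanos() as f64 / (count as f64 * 1_000_000.0)
}

fn main() {
    let total = Duration::from_millis(1500);
    assert_eq!(avg_ms(total, 3), 500.0);
    println!("avg = {:.4} ms", avg_ms(total, 3));
}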
* Re: [pve-devel] [PATCH PBS restore 1/1] Sorry about the first submission with the "cover letter"

From: Adam Kalisz <adam.kalisz@notnullmakers.com>
To: pve-devel <pve-devel@lists.proxmox.com>
Date: Fri, 27 Jun 2025 21:34:46 +0200
Message-ID: <8528b64efc5044def7f41bfaa0401d6567b8fe29.camel@notnullmakers.com>

On Fri, 2025-06-27 at 21:24 +0200, Adam Kalisz wrote:
> Co-authored-by: Daniel Škarda <daniel.skarda@notnullmakers.com>
> Tested-by: Matt Neuforth <mneuforth@milbankworks.com>
> Financed-by: Václav Svátek <svatek@cmis.cz>
> ---
>  src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
>  1 file changed, 84 insertions(+), 17 deletions(-)
> [...]