public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup] part fix 5560: client: periodically show backup progress
Date: Wed, 25 Sep 2024 14:22:33 +0200
Message-ID: <20240925122233.398359-1-c.ebner@proxmox.com> (raw)

Spawn a new tokio task which, about once a minute, displays the
cumulative progress of the backup for all upload streams.

Add accounting for uploaded chunks, to distinguish them from chunks
that are queued for upload but not actually uploaded yet.

Since streams run concurrently (e.g. catalog upload and pxar upload)
and no output order is guaranteed, prefix each log line with the
archive name.
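The approach can be sketched independently of the pbs-client types: a
set of shared atomic counters is updated by the upload path and read
by a periodically woken reporter task. This sketch uses plain std
threads and a short interval instead of tokio's 60 s sleep so it runs
standalone; the function and variable names are illustrative, not the
ones from the patch.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;
use std::time::{Duration, Instant};

/// Periodically log cumulative progress from shared counters.
/// The patch spawns a tokio task with a 60 s sleep; this sketch uses
/// a plain thread, a short interval and a bounded number of rounds.
fn spawn_progress_reporter(
    archive: String,
    stream_len: Arc<AtomicUsize>,
    reused_len: Arc<AtomicUsize>,
    uploaded_len: Arc<AtomicUsize>,
    interval: Duration,
    rounds: usize,
) -> thread::JoinHandle<()> {
    let start = Instant::now();
    thread::spawn(move || {
        for _ in 0..rounds {
            thread::sleep(interval);
            let size = stream_len.load(Ordering::SeqCst);
            let size_reused = reused_len.load(Ordering::SeqCst);
            let size_uploaded = uploaded_len.load(Ordering::SeqCst);
            let size_new = size - size_reused;
            let elapsed = start.elapsed();
            // Average upload speed in bytes/s, same integer
            // arithmetic as in the patch.
            let speed = size_uploaded * 1_000_000 / elapsed.as_micros() as usize;
            println!(
                " {archive}: elapsed {:.2} s, new: {size_new} B, reused: {size_reused} B, \
                 total: {size} B, uploaded: {size_uploaded} B (average: {speed} B/s)",
                elapsed.as_secs_f64(),
            );
        }
    })
}

fn main() {
    let stream_len = Arc::new(AtomicUsize::new(0));
    let reused_len = Arc::new(AtomicUsize::new(0));
    let uploaded_len = Arc::new(AtomicUsize::new(0));

    let handle = spawn_progress_reporter(
        "root.pxar".to_string(),
        stream_len.clone(),
        reused_len.clone(),
        uploaded_len.clone(),
        Duration::from_millis(50),
        3,
    );

    // Simulate upload progress from the main thread.
    for _ in 0..10 {
        stream_len.fetch_add(1024, Ordering::SeqCst);
        uploaded_len.fetch_add(512, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(20));
    }
    handle.join().unwrap();
}
```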

Example output in the backup task log:
```
...
INFO:  root.pxar: elapsed 60.00 s, new: 191.446 MiB, reused: 0 B, total: 191.446 MiB, uploaded: 13.021 MiB (compressed 5.327 MiB, average: 222.221 KiB/s)
INFO:  catalog.pcat1: elapsed 60.00 s, new: 0 B, reused: 0 B, total: 0 B, uploaded: 0 B (compressed 0 B, average: 0 B/s)
INFO:  catalog.pcat1: elapsed 120.00 s, new: 0 B, reused: 0 B, total: 0 B, uploaded: 0 B (compressed 0 B, average: 0 B/s)
INFO:  root.pxar: elapsed 120.00 s, new: 191.446 MiB, reused: 0 B, total: 191.446 MiB, uploaded: 27.068 MiB (compressed 11.583 MiB, average: 230.977 KiB/s)
INFO:  root.pxar: elapsed 180.00 s, new: 191.446 MiB, reused: 0 B, total: 191.446 MiB, uploaded: 36.138 MiB (compressed 14.987 MiB, average: 205.58 KiB/s)
INFO:  catalog.pcat1: elapsed 180.00 s, new: 0 B, reused: 0 B, total: 0 B, uploaded: 0 B (compressed 0 B, average: 0 B/s)
...
```
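For reference, the average in the first log line above can be roughly
reproduced with the patch's speed computation. The byte count below is
back-derived from the rounded "13.021 MiB" in the log, so the result
is only approximate:

```rust
fn main() {
    // ~13.021 MiB uploaded after 60 s (values approximated from the
    // rounded log line, so the result will not match it exactly).
    let uploaded: u64 = 13_653_508; // bytes, ≈ 13.021 MiB
    let elapsed_micros: u64 = 60_000_000;

    // Same integer arithmetic as the speed computation in the patch.
    let speed = uploaded * 1_000_000 / elapsed_micros; // bytes per second
    println!("{:.3} KiB/s", speed as f64 / 1024.0); // close to 222.221 KiB/s
}
```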

This partially fixes issue 5560:
https://bugzilla.proxmox.com/show_bug.cgi?id=5560

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-client/src/backup_writer.rs | 87 ++++++++++++++++++++++++++++-----
 1 file changed, 75 insertions(+), 12 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index d63c09b5a..9080ce4af 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -65,7 +65,13 @@ struct UploadStats {
     csum: [u8; 32],
 }
 
-type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
+struct ChunkUploadResponse {
+    future: h2::client::ResponseFuture,
+    size: usize,
+    size_encoded: usize,
+}
+
+type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<ChunkUploadResponse>)>;
 type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
 
 impl BackupWriter {
@@ -332,6 +338,12 @@ impl BackupWriter {
             .as_u64()
             .unwrap();
 
+        let archive = if log::log_enabled!(log::Level::Debug) {
+            archive_name
+        } else {
+            pbs_tools::format::strip_server_file_extension(archive_name)
+        };
+
         let upload_stats = Self::upload_chunk_info_stream(
             self.h2.clone(),
             wid,
@@ -345,16 +357,12 @@ impl BackupWriter {
             },
             options.compress,
             injections,
+            archive,
         )
         .await?;
 
         let size_dirty = upload_stats.size - upload_stats.size_reused;
         let size: HumanByte = upload_stats.size.into();
-        let archive = if log::log_enabled!(log::Level::Debug) {
-            archive_name
-        } else {
-            pbs_tools::format::strip_server_file_extension(archive_name)
-        };
 
         if upload_stats.chunk_injected > 0 {
             log::info!(
@@ -462,6 +470,8 @@ impl BackupWriter {
         h2: H2Client,
         wid: u64,
         path: String,
+        uploaded: Arc<AtomicUsize>,
+        uploaded_compressed: Arc<AtomicUsize>,
     ) -> (UploadQueueSender, UploadResultReceiver) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
         let (verify_result_tx, verify_result_rx) = oneshot::channel();
@@ -470,15 +480,23 @@ impl BackupWriter {
         tokio::spawn(
             ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
-                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
+                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<ChunkUploadResponse>)| {
                     match (response, merged_chunk_info) {
                         (Some(response), MergedChunkInfo::Known(list)) => {
                             Either::Left(
                                 response
+                                    .future
                                     .map_err(Error::from)
                                     .and_then(H2Client::h2api_response)
-                                    .and_then(move |_result| {
-                                        future::ok(MergedChunkInfo::Known(list))
+                                    .and_then({
+                                        let uploaded = uploaded.clone();
+                                        let uploaded_compressed = uploaded_compressed.clone();
+                                        move |_result| {
+                                            // account for uploaded bytes for progress output
+                                            uploaded.fetch_add(response.size, Ordering::SeqCst);
+                                            uploaded_compressed.fetch_add(response.size_encoded, Ordering::SeqCst);
+                                            future::ok(MergedChunkInfo::Known(list))
+                                        }
                                     })
                             )
                         }
@@ -636,6 +654,7 @@ impl BackupWriter {
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
         injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
+        archive: &str,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -646,25 +665,60 @@ impl BackupWriter {
 
         let stream_len = Arc::new(AtomicUsize::new(0));
         let stream_len2 = stream_len.clone();
+        let stream_len3 = stream_len.clone();
         let compressed_stream_len = Arc::new(AtomicU64::new(0));
         let compressed_stream_len2 = compressed_stream_len.clone();
         let reused_len = Arc::new(AtomicUsize::new(0));
         let reused_len2 = reused_len.clone();
+        let reused_len3 = reused_len.clone();
         let injected_len = Arc::new(AtomicUsize::new(0));
         let injected_len2 = injected_len.clone();
+        let uploaded_len = Arc::new(AtomicUsize::new(0));
+        let uploaded_len_compressed = Arc::new(AtomicUsize::new(0));
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+        let (upload_queue, upload_result) = Self::append_chunk_queue(
+            h2.clone(),
+            wid,
+            append_chunk_path,
+            uploaded_len.clone(),
+            uploaded_len_compressed.clone(),
+        );
 
         let start_time = std::time::Instant::now();
 
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
+        let archive = archive.to_string();
+        tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+                let size = stream_len3.load(Ordering::SeqCst);
+                let size_uploaded = uploaded_len.load(Ordering::SeqCst);
+                let size_reused = reused_len3.load(Ordering::SeqCst);
+                let size_compressed = uploaded_len_compressed.load(Ordering::SeqCst);
+                let size_new = size - size_reused;
+                let elapsed = start_time.elapsed();
+                let speed = size_uploaded * 1_000_000 / elapsed.as_micros() as usize;
+
+                log::info!(
+                    " {archive}: elapsed {:.2} s, new: {}, reused: {}, total: {}, uploaded: {} (compressed {}, average: {}/s)",
+                    elapsed.as_secs_f64(),
+                    HumanByte::from(size_new),
+                    HumanByte::from(size_reused),
+                    HumanByte::from(size),
+                    HumanByte::from(size_uploaded),
+                    HumanByte::from(size_compressed),
+                    HumanByte::from(speed),
+                );
+            }
+        });
+
         stream
             .inject_reused_chunks(injections, stream_len.clone())
             .and_then(move |chunk_info| match chunk_info {
@@ -753,6 +807,8 @@ impl BackupWriter {
                     );
 
                     let chunk_data = chunk_info.chunk.into_inner();
+                    let size = chunk_info.chunk_len as usize;
+                    let size_encoded = chunk_data.len();
                     let param = json!({
                         "wid": wid,
                         "digest": digest_str,
@@ -776,7 +832,14 @@ impl BackupWriter {
                     Either::Left(h2.send_request(request, upload_data).and_then(
                         move |response| async move {
                             upload_queue
-                                .send((new_info, Some(response)))
+                                .send((
+                                    new_info,
+                                    Some(ChunkUploadResponse {
+                                        future: response,
+                                        size,
+                                        size_encoded,
+                                    }),
+                                ))
                                 .await
                                 .map_err(|err| {
                                     format_err!("failed to send to upload queue: {}", err)
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


Thread overview: 4+ messages
2024-09-25 12:22 Christian Ebner [this message]
2024-10-08 10:11 ` Gabriel Goller
2024-10-08 10:28   ` Christian Ebner
2024-10-09  9:22 ` Christian Ebner
