public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size
@ 2021-03-24 15:29 Dominik Csapak
  2021-03-24 15:44 ` Thomas Lamprecht
  0 siblings, 1 reply; 2+ messages in thread
From: Dominik Csapak @ 2021-03-24 15:29 UTC (permalink / raw)
  To: pbs-devel

The text 'had to upload [KMG]iB' implies that this is the size we
actually had to send to the server, while in reality it is the
raw data size before compression.

Count the size of the compressed chunks and print it separately.
Split the average speed into its own line so they do not get too long.

Introduce an 'UploadStats' struct instead of using a giant anonymous tuple
for the return values of 'upload_chunk_info_stream'.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
changes from v1:
* properly format log lines (archive in front, no dots at the end, etc.)
* introduce UploadStats struct and use that
* rustfmt relevant code
 src/client/backup_writer.rs | 141 ++++++++++++++++++++++++------------
 1 file changed, 96 insertions(+), 45 deletions(-)

diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index cef7edef..3dcd7f57 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
@@ -1,6 +1,6 @@
 use std::collections::HashSet;
 use std::os::unix::fs::OpenOptionsExt;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
 use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
@@ -48,6 +48,16 @@ pub struct UploadOptions {
     pub fixed_size: Option<u64>,
 }
 
+struct UploadStats {
+    chunk_count: usize,
+    chunk_reused: usize,
+    size: usize,
+    size_reused: usize,
+    size_compressed: usize,
+    duration: std::time::Duration,
+    csum: [u8; 32],
+}
+
 type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
 type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
 
@@ -256,55 +266,81 @@ impl BackupWriter {
 
         let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
 
-        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
-            Self::upload_chunk_info_stream(
-                self.h2.clone(),
-                wid,
-                stream,
-                &prefix,
-                known_chunks.clone(),
-                if options.encrypt { self.crypt_config.clone() } else { None },
-                options.compress,
-                self.verbose,
-            )
-            .await?;
-
-        let uploaded = size - size_reused;
-        let vsize_h: HumanByte = size.into();
+        let upload_stats = Self::upload_chunk_info_stream(
+            self.h2.clone(),
+            wid,
+            stream,
+            &prefix,
+            known_chunks.clone(),
+            if options.encrypt {
+                self.crypt_config.clone()
+            } else {
+                None
+            },
+            options.compress,
+            self.verbose,
+        )
+        .await?;
+
+        let size_dirty = upload_stats.size - upload_stats.size_reused;
+        let size: HumanByte = upload_stats.size.into();
         let archive = if self.verbose {
             archive_name.to_string()
         } else {
             crate::tools::format::strip_server_file_extension(archive_name)
         };
         if archive_name != CATALOG_NAME {
-            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
-            let uploaded: HumanByte = uploaded.into();
-            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
+            let speed: HumanByte = ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
+            let size_dirty: HumanByte = size_dirty.into();
+            let size_compressed: HumanByte = upload_stats.size_compressed.into();
+            println!(
+                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
+                archive,
+                size_dirty,
+                size,
+                size_compressed,
+                upload_stats.duration.as_secs_f64()
+            );
+            println!("{}: average backup speed: {}/s", archive, speed);
         } else {
-            println!("Uploaded backup catalog ({})", vsize_h);
+            println!("Uploaded backup catalog ({})", size);
         }
 
-        if size_reused > 0 && size > 1024*1024 {
-            let reused_percent = size_reused as f64 * 100. / size as f64;
-            let reused: HumanByte = size_reused.into();
-            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
+        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
+            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
+            let reused: HumanByte = upload_stats.size_reused.into();
+            println!(
+                "{}: backup was done incrementally, reused {} ({:.1}%)",
+                archive, reused, reused_percent
+            );
         }
-        if self.verbose && chunk_count > 0 {
-            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
-            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
-            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
+        if self.verbose && upload_stats.chunk_count > 0 {
+            println!(
+                "{}: Reused {} from {} chunks.",
+                archive, upload_stats.chunk_reused, upload_stats.chunk_count
+            );
+            println!(
+                "{}: Average chunk size was {}.",
+                archive,
+                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
+            );
+            println!(
+                "{}: Average time per request: {} microseconds.",
+                archive,
+                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
+            );
         }
 
         let param = json!({
             "wid": wid ,
-            "chunk-count": chunk_count,
-            "size": size,
-            "csum": proxmox::tools::digest_to_hex(&csum),
+            "chunk-count": upload_stats.chunk_count,
+            "size": upload_stats.size,
+            "csum": proxmox::tools::digest_to_hex(&upload_stats.csum),
         });
         let _value = self.h2.post(&close_path, Some(param)).await?;
         Ok(BackupStats {
-            size: size as u64,
-            csum,
+            size: upload_stats.size as u64,
+            csum: upload_stats.csum,
         })
     }
 
@@ -521,7 +557,7 @@ impl BackupWriter {
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
         verbose: bool,
-    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
+    ) -> impl Future<Output = Result<UploadStats, Error>> {
 
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -530,6 +566,8 @@ impl BackupWriter {
 
         let stream_len = Arc::new(AtomicUsize::new(0));
         let stream_len2 = stream_len.clone();
+        let compressed_stream_len = Arc::new(AtomicU64::new(0));
+        let compressed_stream_len2 = compressed_stream_len.clone();
         let reused_len = Arc::new(AtomicUsize::new(0));
         let reused_len2 = reused_len.clone();
 
@@ -572,6 +610,7 @@ impl BackupWriter {
                 csum.update(digest);
 
                 let chunk_is_known = known_chunks.contains(digest);
+                let compressed_stream_len2 = compressed_stream_len.clone();
                 if chunk_is_known {
                     known_chunk_count.fetch_add(1, Ordering::SeqCst);
                     reused_len.fetch_add(chunk_len, Ordering::SeqCst);
@@ -580,12 +619,15 @@ impl BackupWriter {
                     known_chunks.insert(*digest);
                     future::ready(chunk_builder
                         .build()
-                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
-                            chunk,
-                            digest,
-                            chunk_len: chunk_len as u64,
-                            offset,
-                        }))
+                        .map(move |(chunk, digest)| {
+                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+                            MergedChunkInfo::New(ChunkInfo {
+                                chunk,
+                                digest,
+                                chunk_len: chunk_len as u64,
+                                offset,
+                            })
+                        })
                     )
                 }
             })
@@ -642,15 +684,24 @@ impl BackupWriter {
             }.boxed())
             .and_then(move |_| {
                 let duration = start_time.elapsed();
-                let total_chunks = total_chunks2.load(Ordering::SeqCst);
-                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
-                let stream_len = stream_len2.load(Ordering::SeqCst);
-                let reused_len = reused_len2.load(Ordering::SeqCst);
+                let chunk_count = total_chunks2.load(Ordering::SeqCst);
+                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
+                let size = stream_len2.load(Ordering::SeqCst);
+                let size_reused = reused_len2.load(Ordering::SeqCst);
+                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
 
                 let mut guard = index_csum_2.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
-                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
+                futures::future::ok(UploadStats{
+                    chunk_count,
+                    chunk_reused,
+                    size,
+                    size_reused,
+                    size_compressed,
+                    duration,
+                    csum,
+                })
             })
     }
 
-- 
2.20.1





^ permalink raw reply	[flat|nested] 2+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size
  2021-03-24 15:29 [pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size Dominik Csapak
@ 2021-03-24 15:44 ` Thomas Lamprecht
  0 siblings, 0 replies; 2+ messages in thread
From: Thomas Lamprecht @ 2021-03-24 15:44 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Dominik Csapak

On 24.03.21 16:29, Dominik Csapak wrote:
> The text 'had to upload [KMG]iB' implies that this is the size we
> actually had to send to the server, while in reality it is the
> raw data size before compression.
> 
> Count the size of the compressed chunks and print it separately.
> Split the average speed into its own line so they do not get too long.
> 
> Introduce an 'UploadStats' struct instead of using a giant anonymous tuple
> for the return values of 'upload_chunk_info_stream'.
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> changes from v1:
> * properly format log lines (archive in front, no dots at the end, etc.)
> * introduce UploadStats struct and use that
> * rustfmt relevant code


can we please stop squashing multiple changes into one patch?

This needs to be at least two separate patches:

1. Non-semantic change to struct
2. clarify backup/upload size

rustfmt may well be its own patch too (first or last, I do not care too much
about that)

>  src/client/backup_writer.rs | 141 ++++++++++++++++++++++++------------
>  1 file changed, 96 insertions(+), 45 deletions(-)
> 
> diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
> index cef7edef..3dcd7f57 100644
> --- a/src/client/backup_writer.rs
> +++ b/src/client/backup_writer.rs
> @@ -1,6 +1,6 @@
>  use std::collections::HashSet;
>  use std::os::unix::fs::OpenOptionsExt;
> -use std::sync::atomic::{AtomicUsize, Ordering};
> +use std::sync::atomic::{AtomicUsize, AtomicU64, Ordering};
>  use std::sync::{Arc, Mutex};
>  
>  use anyhow::{bail, format_err, Error};
> @@ -48,6 +48,16 @@ pub struct UploadOptions {
>      pub fixed_size: Option<u64>,
>  }
>  
> +struct UploadStats {
> +    chunk_count: usize,
> +    chunk_reused: usize,
> +    size: usize,
> +    size_reused: usize,
> +    size_compressed: usize,
> +    duration: std::time::Duration,
> +    csum: [u8; 32],
> +}
> +
>  type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
>  type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
>  
> @@ -256,55 +266,81 @@ impl BackupWriter {
>  
>          let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
>  
> -        let (chunk_count, chunk_reused, size, size_reused, duration, csum) =
> -            Self::upload_chunk_info_stream(
> -                self.h2.clone(),
> -                wid,
> -                stream,
> -                &prefix,
> -                known_chunks.clone(),
> -                if options.encrypt { self.crypt_config.clone() } else { None },
> -                options.compress,
> -                self.verbose,
> -            )
> -            .await?;
> -
> -        let uploaded = size - size_reused;
> -        let vsize_h: HumanByte = size.into();
> +        let upload_stats = Self::upload_chunk_info_stream(
> +            self.h2.clone(),
> +            wid,
> +            stream,
> +            &prefix,
> +            known_chunks.clone(),
> +            if options.encrypt {
> +                self.crypt_config.clone()
> +            } else {
> +                None
> +            },
> +            options.compress,
> +            self.verbose,
> +        )
> +        .await?;
> +
> +        let size_dirty = upload_stats.size - upload_stats.size_reused;
> +        let size: HumanByte = upload_stats.size.into();
>          let archive = if self.verbose {
>              archive_name.to_string()
>          } else {
>              crate::tools::format::strip_server_file_extension(archive_name)
>          };
>          if archive_name != CATALOG_NAME {
> -            let speed: HumanByte = ((uploaded * 1_000_000) / (duration.as_micros() as usize)).into();
> -            let uploaded: HumanByte = uploaded.into();
> -            println!("{}: had to upload {} of {} in {:.2}s, average speed {}/s).", archive, uploaded, vsize_h, duration.as_secs_f64(), speed);
> +            let speed: HumanByte = ((size_dirty * 1_000_000) / (upload_stats.duration.as_micros() as usize)).into();
> +            let size_dirty: HumanByte = size_dirty.into();
> +            let size_compressed: HumanByte = upload_stats.size_compressed.into();
> +            println!(
> +                "{}: had to backup {} of {} (compressed {}) in {:.2}s",
> +                archive,
> +                size_dirty,
> +                size,
> +                size_compressed,
> +                upload_stats.duration.as_secs_f64()
> +            );
> +            println!("{}: average backup speed: {}/s", archive, speed);
>          } else {
> -            println!("Uploaded backup catalog ({})", vsize_h);
> +            println!("Uploaded backup catalog ({})", size);
>          }
>  
> -        if size_reused > 0 && size > 1024*1024 {
> -            let reused_percent = size_reused as f64 * 100. / size as f64;
> -            let reused: HumanByte = size_reused.into();
> -            println!("{}: backup was done incrementally, reused {} ({:.1}%)", archive, reused, reused_percent);
> +        if upload_stats.size_reused > 0 && upload_stats.size > 1024 * 1024 {
> +            let reused_percent = upload_stats.size_reused as f64 * 100. / upload_stats.size as f64;
> +            let reused: HumanByte = upload_stats.size_reused.into();
> +            println!(
> +                "{}: backup was done incrementally, reused {} ({:.1}%)",
> +                archive, reused, reused_percent
> +            );
>          }
> -        if self.verbose && chunk_count > 0 {
> -            println!("{}: Reused {} from {} chunks.", archive, chunk_reused, chunk_count);
> -            println!("{}: Average chunk size was {}.", archive, HumanByte::from(size/chunk_count));
> -            println!("{}: Average time per request: {} microseconds.", archive, (duration.as_micros())/(chunk_count as u128));
> +        if self.verbose && upload_stats.chunk_count > 0 {
> +            println!(
> +                "{}: Reused {} from {} chunks.",
> +                archive, upload_stats.chunk_reused, upload_stats.chunk_count
> +            );
> +            println!(
> +                "{}: Average chunk size was {}.",
> +                archive,
> +                HumanByte::from(upload_stats.size / upload_stats.chunk_count)
> +            );
> +            println!(
> +                "{}: Average time per request: {} microseconds.",
> +                archive,
> +                (upload_stats.duration.as_micros()) / (upload_stats.chunk_count as u128)
> +            );
>          }
>  
>          let param = json!({
>              "wid": wid ,
> -            "chunk-count": chunk_count,
> -            "size": size,
> -            "csum": proxmox::tools::digest_to_hex(&csum),
> +            "chunk-count": upload_stats.chunk_count,
> +            "size": upload_stats.size,
> +            "csum": proxmox::tools::digest_to_hex(&upload_stats.csum),
>          });
>          let _value = self.h2.post(&close_path, Some(param)).await?;
>          Ok(BackupStats {
> -            size: size as u64,
> -            csum,
> +            size: upload_stats.size as u64,
> +            csum: upload_stats.csum,
>          })
>      }
>  
> @@ -521,7 +557,7 @@ impl BackupWriter {
>          crypt_config: Option<Arc<CryptConfig>>,
>          compress: bool,
>          verbose: bool,
> -    ) -> impl Future<Output = Result<(usize, usize, usize, usize, std::time::Duration, [u8; 32]), Error>> {
> +    ) -> impl Future<Output = Result<UploadStats, Error>> {
>  
>          let total_chunks = Arc::new(AtomicUsize::new(0));
>          let total_chunks2 = total_chunks.clone();
> @@ -530,6 +566,8 @@ impl BackupWriter {
>  
>          let stream_len = Arc::new(AtomicUsize::new(0));
>          let stream_len2 = stream_len.clone();
> +        let compressed_stream_len = Arc::new(AtomicU64::new(0));
> +        let compressed_stream_len2 = compressed_stream_len.clone();
>          let reused_len = Arc::new(AtomicUsize::new(0));
>          let reused_len2 = reused_len.clone();
>  
> @@ -572,6 +610,7 @@ impl BackupWriter {
>                  csum.update(digest);
>  
>                  let chunk_is_known = known_chunks.contains(digest);
> +                let compressed_stream_len2 = compressed_stream_len.clone();
>                  if chunk_is_known {
>                      known_chunk_count.fetch_add(1, Ordering::SeqCst);
>                      reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> @@ -580,12 +619,15 @@ impl BackupWriter {
>                      known_chunks.insert(*digest);
>                      future::ready(chunk_builder
>                          .build()
> -                        .map(move |(chunk, digest)| MergedChunkInfo::New(ChunkInfo {
> -                            chunk,
> -                            digest,
> -                            chunk_len: chunk_len as u64,
> -                            offset,
> -                        }))
> +                        .map(move |(chunk, digest)| {
> +                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +                            MergedChunkInfo::New(ChunkInfo {
> +                                chunk,
> +                                digest,
> +                                chunk_len: chunk_len as u64,
> +                                offset,
> +                            })
> +                        })
>                      )
>                  }
>              })
> @@ -642,15 +684,24 @@ impl BackupWriter {
>              }.boxed())
>              .and_then(move |_| {
>                  let duration = start_time.elapsed();
> -                let total_chunks = total_chunks2.load(Ordering::SeqCst);
> -                let known_chunk_count = known_chunk_count2.load(Ordering::SeqCst);
> -                let stream_len = stream_len2.load(Ordering::SeqCst);
> -                let reused_len = reused_len2.load(Ordering::SeqCst);
> +                let chunk_count = total_chunks2.load(Ordering::SeqCst);
> +                let chunk_reused = known_chunk_count2.load(Ordering::SeqCst);
> +                let size = stream_len2.load(Ordering::SeqCst);
> +                let size_reused = reused_len2.load(Ordering::SeqCst);
> +                let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
>  
>                  let mut guard = index_csum_2.lock().unwrap();
>                  let csum = guard.take().unwrap().finish();
>  
> -                futures::future::ok((total_chunks, known_chunk_count, stream_len, reused_len, duration, csum))
> +                futures::future::ok(UploadStats{
> +                    chunk_count,
> +                    chunk_reused,
> +                    size,
> +                    size_reused,
> +                    size_compressed,
> +                    duration,
> +                    csum,
> +                })
>              })
>      }
>  
> 





^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2021-03-24 15:44 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-03-24 15:29 [pbs-devel] [PATCH proxmox-backup v2] client/backup_writer: clarify backup and upload size Dominik Csapak
2021-03-24 15:44 ` Thomas Lamprecht

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal