From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output
Date: Thu, 25 Jul 2024 12:19:22 +0200
Message-ID: <20240725101922.231053-5-c.ebner@proxmox.com>
In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com>

To keep the log messages in a meaningful order when backup groups are
synced over parallel connections, buffer them in the sync stats and
only display them once the corresponding task has finished.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 134 insertions(+), 31 deletions(-)
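
Note (illustration only, not part of the patch): the buffering boils down
to an either/or helper - with more than one group sync task, each message
is pushed onto the task-local log buffer carried in PullStats and replayed
once the task's stats are merged in pull_ns(); otherwise it is logged
right away, as before. A minimal standalone sketch of that pattern, using
println! in place of the tracing info! macro from pull.rs and illustrative
helper names:

    /// Push `msg` onto `buffer` when output must stay grouped per task,
    /// otherwise emit it immediately.
    fn log_or_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
        if buffer_logs {
            buffer.push(msg); // parallel case: defer until the task finished
        } else {
            println!("{msg}"); // sequential case: unchanged behaviour
        }
    }

    /// Replay and drop the buffered messages once a task has finished.
    fn flush_logs(buffer: &mut Vec<String>) {
        for line in buffer.drain(..) {
            println!("{line}");
        }
    }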

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0a54217d4..109cd3d1c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -89,6 +89,7 @@ pub(crate) struct PullStats {
     pub(crate) bytes: usize,
     pub(crate) elapsed: Duration,
     pub(crate) removed: Option<RemovedVanishedStats>,
+    pub(crate) log_buffer: Vec<String>,
 }
 
 impl From<RemovedVanishedStats> for PullStats {
@@ -101,10 +102,11 @@ impl From<RemovedVanishedStats> for PullStats {
 }
 
 impl PullStats {
-    fn add(&mut self, rhs: PullStats) {
+    fn add(&mut self, mut rhs: PullStats) {
         self.chunk_count += rhs.chunk_count;
         self.bytes += rhs.bytes;
         self.elapsed += rhs.elapsed;
+        self.log_buffer.append(&mut rhs.log_buffer);
 
         if let Some(rhs_removed) = rhs.removed {
             if let Some(ref mut removed) = self.removed {
@@ -443,7 +445,6 @@ impl PullReader for RemoteReader {
             if let Err(err) = std::fs::rename(&tmp_path, to_path) {
                 bail!("Atomic rename file {:?} failed - {}", to_path, err);
             }
-            info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
         }
 
         Ok(())
@@ -577,6 +578,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -658,17 +660,20 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = bytes.load(Ordering::SeqCst);
     let chunk_count = chunk_count.load(Ordering::SeqCst);
 
-    info!(
+    let mut log_buffer = Vec::new();
+    let msg = format!(
         "downloaded {} ({}/s)",
         HumanByte::from(bytes),
         HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
     );
+    log_info_buffer(msg, buffer_logs, &mut log_buffer);
 
     Ok(PullStats {
         chunk_count,
         bytes,
         elapsed,
         removed: None,
+        log_buffer,
     })
 }
 
@@ -702,6 +707,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>(
 
     let mut pull_stats = PullStats::default();
 
-    info!("sync archive {archive_name}");
+    log_info_buffer(
+        format!("sync archive {archive_name}"),
+        buffer_logs,
+        &mut pull_stats.log_buffer,
+    );
 
     reader.load_file_into(archive_name, &tmp_path).await?;
 
@@ -727,13 +737,18 @@ async fn pull_single_archive<'a>(
             verify_archive(archive_info, &csum, size)?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_info_buffer(
+                    "skipping chunk sync for same datastore".to_string(),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    buffer_logs,
                 )
                 .await?;
                 pull_stats.add(stats);
@@ -747,13 +762,18 @@ async fn pull_single_archive<'a>(
             verify_archive(archive_info, &csum, size)?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_info_buffer(
+                    "skipping chunk sync for same datastore".to_string(),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             } else {
                 let stats = pull_index_chunks(
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
+                    buffer_logs,
                 )
                 .await?;
                 pull_stats.add(stats);
@@ -784,6 +804,7 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let mut pull_stats = PullStats::default();
     let mut manifest_name = snapshot.full_path();
@@ -820,8 +841,17 @@ async fn pull_snapshot<'a>(
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
                 reader.try_download_client_log(&client_log_name).await?;
+                log_info_buffer(
+                    format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             };
-            info!("no data changes");
+            log_info_buffer(
+                "no data changes".to_string(),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(pull_stats); // nothing changed
         }
@@ -841,7 +871,11 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
@@ -851,7 +885,11 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
@@ -861,15 +899,25 @@ async fn pull_snapshot<'a>(
                     match manifest.verify_file(&item.filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_info_buffer(
+                                format!("detected changed file {path:?} - {err}"),
+                                buffer_logs,
+                                &mut pull_stats.log_buffer,
+                            );
                         }
                     }
                 }
             }
         }
 
-        let stats =
-            pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+        let stats = pull_single_archive(
+            reader.clone(),
+            snapshot,
+            item,
+            downloaded_chunks.clone(),
+            buffer_logs,
+        )
+        .await?;
         pull_stats.add(stats);
     }
 
@@ -879,6 +927,11 @@ async fn pull_snapshot<'a>(
 
     if !client_log_name.exists() {
         reader.try_download_client_log(&client_log_name).await?;
+        log_info_buffer(
+            format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
     };
     snapshot
         .cleanup_unreferenced_files(&manifest)
@@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+    buffer_logs: bool,
 ) -> Result<PullStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    let pull_stats = if is_new {
-        info!("sync snapshot {}", snapshot.dir());
+    let mut pull_stats = PullStats::default();
+    if is_new {
+        log_info_buffer(
+            format!("sync snapshot {}", snapshot.dir()),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
 
-        match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+        match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await {
             Err(err) => {
                 if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
                     snapshot.backup_ns(),
@@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>(
                 }
                 return Err(err);
             }
-            Ok(pull_stats) => {
-                info!("sync snapshot {} done", snapshot.dir());
-                pull_stats
+            Ok(stats) => {
+                pull_stats.add(stats);
+                log_info_buffer(
+                    format!("sync snapshot {} done", snapshot.dir()),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
             }
         }
     } else {
-        info!("re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(reader, snapshot, downloaded_chunks).await?
+        log_info_buffer(
+            format!("re-sync snapshot {}", snapshot.dir()),
+            buffer_logs,
+            &mut pull_stats.log_buffer,
+        );
+        let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?;
+        pull_stats.add(stats);
     };
 
     Ok(pull_stats)
@@ -1054,6 +1122,8 @@ async fn pull_group(
         .last_successful_backup(&target_ns, group)?
         .unwrap_or(i64::MIN);
 
+    let mut pull_stats = PullStats::default();
+    let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1);
     let list: Vec<BackupDir> = raw_list
         .into_iter()
         .enumerate()
@@ -1063,7 +1133,11 @@ async fn pull_group(
                 already_synced_skip_info.update(dir.time);
                 return false;
             } else if already_synced_skip_info.count > 0 {
-                info!("{already_synced_skip_info}");
+                log_info_buffer(
+                    format!("{already_synced_skip_info}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
                 already_synced_skip_info.reset();
                 return true;
             }
@@ -1072,7 +1146,11 @@ async fn pull_group(
                 transfer_last_skip_info.update(dir.time);
                 return false;
             } else if transfer_last_skip_info.count > 0 {
-                info!("{transfer_last_skip_info}");
+                log_info_buffer(
+                    format!("{transfer_last_skip_info}"),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
+                );
                 transfer_last_skip_info.reset();
             }
             true
@@ -1088,8 +1166,6 @@ async fn pull_group(
         progress.group_snapshots = list.len() as u64;
     }
 
-    let mut pull_stats = PullStats::default();
-
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
             .target
@@ -1100,12 +1176,17 @@ async fn pull_group(
             .source
             .reader(source_namespace, &from_snapshot)
             .await?;
-        let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+        let result =
+            pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await;
 
         {
             let mut progress = progress.lock().unwrap();
             progress.done_snapshots = pos as u64 + 1;
-            info!("percentage done: {progress}");
+            log_info_buffer(
+                format!("percentage done: {progress}"),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
         }
 
         let stats = result?; // stop on error
@@ -1124,13 +1205,21 @@ async fn pull_group(
                 continue;
             }
             if snapshot.is_protected() {
-                info!(
-                    "don't delete vanished snapshot {} (protected)",
-                    snapshot.dir()
+                log_info_buffer(
+                    format!(
+                        "don't delete vanished snapshot {} (protected)",
+                        snapshot.dir()
+                    ),
+                    buffer_logs,
+                    &mut pull_stats.log_buffer,
                 );
                 continue;
             }
-            info!("delete vanished snapshot {}", snapshot.dir());
+            log_info_buffer(
+                format!("delete vanished snapshot {}", snapshot.dir()),
+                buffer_logs,
+                &mut pull_stats.log_buffer,
+            );
             params
                 .target
                 .store
@@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns(
     let mut pull_stats = PullStats::default();
     // poll to initiate tasks, queue another remaining tasks for each finished one
     while let Some(result) = pull_group_tasks.next().await {
-        let (progress, stats, has_errors) = result?;
+        let (progress, mut stats, has_errors) = result?;
         errors |= has_errors;
+        // Generate log output
+        for log_line in stats.log_buffer.iter() {
+            info!("{log_line}");
+        }
+        // clear the log buffer before merging, the messages are no longer needed
+        stats.log_buffer.clear();
         pull_stats.add(stats);
         store_progress.done_groups += progress.done_groups;
         store_progress.done_snapshots += progress.done_snapshots;
@@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns(
 
     Ok((store_progress, pull_stats, errors))
 }
+
+fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
+    if buffer_logs {
+        buffer.push(msg);
+    } else {
+        info!("{msg}");
+    }
+}
-- 
2.39.2





Thread overview: 9+ messages
2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Christian Ebner
2024-07-30 15:56   ` Gabriel Goller
2024-07-31  7:38     ` Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
2024-07-30 15:54   ` Gabriel Goller
2024-07-31  7:35     ` Christian Ebner
2024-07-25 10:19 ` Christian Ebner [this message]
