From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging
Date: Fri, 17 Apr 2026 11:26:20 +0200
Message-ID: <20260417092621.455374-15-c.ebner@proxmox.com>
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>

Pushing groups, and therefore also snapshots, in parallel leads to
unordered log output, making it mostly impossible to relate a log
message to a backup snapshot or group.

Therefore, prefix push job log messages with the corresponding group
or snapshot and use the buffered logger implementation to buffer up
to 5 subsequent lines with a timeout of 1 second. This reduces the
interleaving of log messages stemming from different groups.

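The core buffering behavior can be sketched roughly as follows. This
is a minimal standalone sketch on top of tokio; the names and
structure are assumptions for illustration, not the actual
pbs_tools::buffered_logger API introduced in patch 03/15:

    use std::time::Duration;

    use tokio::sync::mpsc;
    use tokio::time::timeout;

    // Print buffered lines, each prefixed with the label of its sender.
    fn flush(buffer: &mut Vec<(String, String)>) {
        for (label, line) in buffer.drain(..) {
            println!("{label}: {line}");
        }
    }

    // Collect labeled log lines and flush them in batches of up to 5
    // lines, or after 1 second without input, whichever comes first.
    async fn collect(mut rx: mpsc::Receiver<(String, String)>) {
        let mut buffer = Vec::new();
        loop {
            match timeout(Duration::from_secs(1), rx.recv()).await {
                Ok(Some(line)) => {
                    buffer.push(line);
                    if buffer.len() >= 5 {
                        flush(&mut buffer);
                    }
                }
                // channel closed: all senders were dropped
                Ok(None) => {
                    flush(&mut buffer);
                    break;
                }
                // timeout elapsed: flush whatever accumulated so far
                Err(_) => flush(&mut buffer),
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(16);
        let collector = tokio::spawn(collect(rx));
        for i in 0..7 {
            tx.send(("vm/100".to_string(), format!("line {i}")))
                .await
                .unwrap();
        }
        drop(tx); // close the channel so the collector terminates
        collector.await.unwrap();
    }
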
Also, make push syncs more verbose by adding log output for the
groups, snapshots and archives being pushed.

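As an illustration, a successful snapshot push now logs along these
lines (the group label and the exact formatting are a sketch, since
they depend on how the BufferedLogger joins label and message):

    vm/100: 2026-04-17T07:15:32Z: start sync
    vm/100: 2026-04-17T07:15:32Z: sync archive drive-scsi0.img.fidx
    vm/100: 2026-04-17T07:15:32Z: drive-scsi0.img.fidx: uploaded 4.254 GiB (156.943 MiB/s)
    vm/100: 2026-04-17T07:15:32Z: sync done
    vm/100: Percentage done: group vm/100: ...
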
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BufferedLogger implementation, refactored accordingly
- improve log line prefixes
- add missing error contexts

 src/server/push.rs | 245 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 199 insertions(+), 46 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 9b7fb4522..520bdd250 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -2,12 +2,13 @@
 
 use std::collections::HashSet;
 use std::sync::{Arc, Mutex};
+use std::time::Duration;
 
 use anyhow::{bail, format_err, Context, Error};
 use futures::stream::{self, StreamExt, TryStreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
-use tracing::{info, warn};
+use tracing::{info, warn, Level};
 
 use pbs_api_types::{
     print_store_and_ns, ApiVersion, ApiVersionInfo, ArchiveType, Authid, BackupArchiveName,
@@ -28,6 +29,9 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{DataStore, StoreProgress};
 use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
+
+use proxmox_human_byte::HumanByte;
 
 use super::sync::{
     check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -564,6 +568,10 @@ pub(crate) async fn push_namespace(
     let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
     let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
 
+    let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
+    // runs until sender_builder and all senders built from it have been dropped
+    buffered_logger.run_log_collection();
+
     let mut process_results = |results| {
         for result in results {
             match result {
@@ -571,7 +579,7 @@ pub(crate) async fn push_namespace(
                     stats.add(sync_stats);
                     progress.done_groups = shared_group_progress.increment_done();
                 }
-                Err(()) => errors = true,
+                Err(_err) => errors = true,
             }
         }
     };
@@ -582,17 +590,21 @@ pub(crate) async fn push_namespace(
         let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
         let synced_groups = Arc::clone(&synced_groups);
         let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let log_sender = Arc::new(sender_builder.sender_with_label(group.to_string()));
         let results = group_workers
             .spawn_task(async move {
-                push_group_do(
+                let result = push_group_do(
                     params,
                     &namespace,
                     &group,
                     group_progress_cloned,
                     synced_groups,
                     not_owned_target_groups,
+                    Arc::clone(&log_sender),
                 )
-                .await
+                .await;
+                let _ = log_sender.flush().await;
+                result
             })
             .await
             .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -694,23 +706,46 @@ async fn push_group_do(
     shared_group_progress: Arc<SharedGroupProgress>,
     synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
     not_owned_target_groups: Arc<HashSet<BackupGroup>>,
-) -> Result<SyncStats, ()> {
+    log_sender: Arc<LogLineSender>,
+) -> Result<SyncStats, Error> {
     if not_owned_target_groups.contains(group) {
-        warn!(
-            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-            params.target.remote_user(),
-        );
+        log_sender
+            .log(
+                Level::WARN,
+                format!(
+                    "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+                    params.target.remote_user(),
+                ),
+            )
+            .await?;
         shared_group_progress.increment_done();
         return Ok(SyncStats::default());
     }
 
     synced_groups.lock().unwrap().insert(group.clone());
-    push_group(params, namespace, group, Arc::clone(&shared_group_progress))
-        .await
-        .map_err(|err| {
-            warn!("Group {group}: Encountered errors: {err:#}");
-            warn!("Failed to push group {group} to remote!");
-        })
+    match push_group(
+        params,
+        namespace,
+        group,
+        Arc::clone(&shared_group_progress),
+        Arc::clone(&log_sender),
+    )
+    .await
+    {
+        Ok(res) => Ok(res),
+        Err(err) => {
+            log_sender
+                .log(Level::WARN, format!("Encountered errors: {err:#}"))
+                .await?;
+            log_sender
+                .log(
+                    Level::WARN,
+                    format!("Failed to push group {group} to remote!"),
+                )
+                .await?;
+            Err(err)
+        }
+    }
 }
 
 /// Push group including all snapshots to target
@@ -727,6 +762,7 @@ pub(crate) async fn push_group(
     namespace: &BackupNamespace,
     group: &BackupGroup,
     shared_group_progress: Arc<SharedGroupProgress>,
+    log_sender: Arc<LogLineSender>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -738,7 +774,12 @@ pub(crate) async fn push_group(
     snapshots.sort_unstable_by_key(|a| a.backup.time);
 
     if snapshots.is_empty() {
-        info!("Group '{group}' contains no snapshots to sync to remote");
+        log_sender
+            .log(
+                Level::INFO,
+                format!("Group '{group}' contains no snapshots to sync to remote"),
+            )
+            .await?;
     }
 
     let target_namespace = params.map_to_target(namespace)?;
@@ -786,11 +827,15 @@ pub(crate) async fn push_group(
         .collect();
 
     if already_synced_skip_info.count > 0 {
-        info!("{already_synced_skip_info}");
+        log_sender
+            .log(Level::INFO, already_synced_skip_info.to_string())
+            .await?;
         already_synced_skip_info.reset();
     }
     if transfer_last_skip_info.count > 0 {
-        info!("{transfer_last_skip_info}");
+        log_sender
+            .log(Level::INFO, transfer_last_skip_info.to_string())
+            .await?;
         transfer_last_skip_info.reset();
     }
 
@@ -800,11 +845,18 @@ pub(crate) async fn push_group(
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
     for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+        let prefix = proxmox_time::epoch_to_rfc3339_utc(source_snapshot.time)
+            .context("invalid timestamp")?;
+        log_sender
+            .log(Level::INFO, format!("{prefix}: start sync"))
+            .await?;
         let result = push_snapshot(
             &params,
             namespace,
             &source_snapshot,
             fetch_previous_manifest,
+            Arc::clone(&log_sender),
+            &prefix,
         )
         .await;
         fetch_previous_manifest = true;
@@ -812,10 +864,18 @@ pub(crate) async fn push_group(
         // Update done groups progress by other parallel running pushes
         local_progress.done_groups = shared_group_progress.load_done();
         local_progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: group {group}: {local_progress}");
 
         // stop on error
         let sync_stats = result?;
+        log_sender
+            .log(Level::INFO, format!("{prefix}: sync done"))
+            .await?;
+        log_sender
+            .log(
+                Level::INFO,
+                format!("Percentage done: group {group}: {local_progress}"),
+            )
+            .await?;
         stats.add(sync_stats);
     }
 
@@ -825,25 +885,42 @@ pub(crate) async fn push_group(
                 continue;
             }
             if snapshot.protected {
-                info!(
-                    "Kept protected snapshot {name} on remote",
-                    name = snapshot.backup
-                );
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!(
+                            "Kept protected snapshot {name} on remote",
+                            name = snapshot.backup
+                        ),
+                    )
+                    .await?;
                 continue;
             }
             match forget_target_snapshot(&params, &target_namespace, &snapshot.backup).await {
                 Ok(()) => {
-                    info!(
-                        "Removed vanished snapshot {name} from remote",
-                        name = snapshot.backup
-                    );
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "Removed vanished snapshot {name} from remote",
+                                name = snapshot.backup
+                            ),
+                        )
+                        .await?;
                 }
                 Err(err) => {
-                    warn!("Encountered errors: {err:#}");
-                    warn!(
-                        "Failed to remove vanished snapshot {name} from remote!",
-                        name = snapshot.backup
-                    );
+                    log_sender
+                        .log(Level::WARN, format!("Encountered errors: {err:#}"))
+                        .await?;
+                    log_sender
+                        .log(
+                            Level::WARN,
+                            format!(
+                                "Failed to remove vanished snapshot {name} from remote!",
+                                name = snapshot.backup
+                            ),
+                        )
+                        .await?;
                 }
             }
             stats.add(SyncStats::from(RemovedVanishedStats {
@@ -868,24 +945,40 @@ pub(crate) async fn push_snapshot(
     namespace: &BackupNamespace,
     snapshot: &BackupDir,
     fetch_previous_manifest: bool,
+    log_sender: Arc<LogLineSender>,
+    prefix: &String,
 ) -> Result<SyncStats, Error> {
     let mut stats = SyncStats::default();
-    let target_ns = params.map_to_target(namespace)?;
+    let target_ns = params
+        .map_to_target(namespace)
+        .with_context(|| prefix.clone())?;
     let backup_dir = params
         .source
         .store
-        .backup_dir(namespace.clone(), snapshot.clone())?;
+        .backup_dir(namespace.clone(), snapshot.clone())
+        .with_context(|| prefix.clone())?;
 
     // Reader locks the snapshot
-    let reader = params.source.reader(namespace, snapshot).await?;
+    let reader = params
+        .source
+        .reader(namespace, snapshot)
+        .await
+        .with_context(|| prefix.clone())?;
 
     // Does not lock the manifest, but the reader already assures a locked snapshot
     let source_manifest = match backup_dir.load_manifest() {
         Ok((manifest, _raw_size)) => manifest,
         Err(err) => {
             // No manifest in snapshot or failed to read, warn and skip
-            log::warn!("Encountered errors: {err:#}");
-            log::warn!("Failed to load manifest for '{snapshot}'!");
+            log_sender
+                .log(
+                    Level::WARN,
+                    format!("{prefix}: Encountered errors: {err:#}"),
+                )
+                .await?;
+            log_sender
+                .log(Level::WARN, format!("{prefix}: Failed to load manifest!"))
+                .await?;
             return Ok(stats);
         }
     };
@@ -912,14 +1005,22 @@ pub(crate) async fn push_snapshot(
             no_cache: false,
         },
     )
-    .await?;
+    .await
+    .with_context(|| prefix.clone())?;
 
     let mut previous_manifest = None;
     // Use manifest of previous snapshots in group on target for chunk upload deduplication
     if fetch_previous_manifest {
         match backup_writer.download_previous_manifest().await {
             Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
-            Err(err) => log::info!("Could not download previous manifest - {err}"),
+            Err(err) => {
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!("{prefix}: Could not download previous manifest - {err}"),
+                    )
+                    .await?
+            }
         }
     };
 
@@ -948,12 +1049,32 @@ pub(crate) async fn push_snapshot(
         path.push(&entry.filename);
         if path.try_exists()? {
             let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+            log_sender
+                .log(
+                    Level::INFO,
+                    format!("{prefix}: sync archive {archive_name}"),
+                )
+                .await?;
+            let archive_prefix = format!("{prefix}: {archive_name}");
             match archive_name.archive_type() {
                 ArchiveType::Blob => {
                     let file = std::fs::File::open(&path)?;
                     let backup_stats = backup_writer
                         .upload_blob(file, archive_name.as_ref())
                         .await?;
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "{archive_prefix}: uploaded {} ({}/s)",
+                                HumanByte::from(backup_stats.size),
+                                HumanByte::new_binary(
+                                    backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+                                ),
+                            ),
+                        )
+                        .await
+                        .with_context(|| archive_prefix.clone())?;
                     stats.add(SyncStats {
                         chunk_count: backup_stats.chunk_count as usize,
                         bytes: backup_stats.size as usize,
@@ -972,7 +1093,7 @@ pub(crate) async fn push_snapshot(
                             )
                             .await;
                     }
-                    let index = DynamicIndexReader::open(&path)?;
+                    let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
                     let chunk_reader = reader
                         .chunk_reader(entry.chunk_crypt_mode())
                         .context("failed to get chunk reader")?;
@@ -984,7 +1105,20 @@ pub(crate) async fn push_snapshot(
                         IndexType::Dynamic,
                         known_chunks.clone(),
                     )
-                    .await?;
+                    .await
+                    .with_context(|| archive_prefix.clone())?;
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "{archive_prefix}: uploaded {} ({}/s)",
+                                HumanByte::from(sync_stats.bytes),
+                                HumanByte::new_binary(
+                                    sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+                                ),
+                            ),
+                        )
+                        .await?;
                     stats.add(sync_stats);
                 }
                 ArchiveType::FixedIndex => {
@@ -1001,7 +1135,8 @@ pub(crate) async fn push_snapshot(
                     let index = FixedIndexReader::open(&path)?;
                     let chunk_reader = reader
                         .chunk_reader(entry.chunk_crypt_mode())
-                        .context("failed to get chunk reader")?;
+                        .context("failed to get chunk reader")
+                        .with_context(|| archive_prefix.clone())?;
                     let size = index.index_bytes();
                     let sync_stats = push_index(
                         &archive_name,
@@ -1011,7 +1146,20 @@ pub(crate) async fn push_snapshot(
                         IndexType::Fixed(Some(size)),
                         known_chunks.clone(),
                     )
-                    .await?;
+                    .await
+                    .with_context(|| archive_prefix.clone())?;
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "{archive_prefix}: uploaded {} ({}/s)",
+                                HumanByte::from(sync_stats.bytes),
+                                HumanByte::new_binary(
+                                    sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+                                ),
+                            ),
+                        )
+                        .await?;
                     stats.add(sync_stats);
                 }
             }
@@ -1032,7 +1180,8 @@ pub(crate) async fn push_snapshot(
                 client_log_name.as_ref(),
                 upload_options.clone(),
             )
-            .await?;
+            .await
+            .with_context(|| prefix.clone())?;
     }
 
     // Rewrite manifest for pushed snapshot, recreating manifest from source on target
@@ -1044,8 +1193,12 @@ pub(crate) async fn push_snapshot(
             MANIFEST_BLOB_NAME.as_ref(),
             upload_options,
         )
-        .await?;
-    backup_writer.finish().await?;
+        .await
+        .with_context(|| prefix.clone())?;
+    backup_writer
+        .finish()
+        .await
+        .with_context(|| prefix.clone())?;
 
     stats.add(SyncStats {
         chunk_count: backup_stats.chunk_count as usize,
-- 
2.47.3


Thread overview: 16+ messages
2026-04-17  9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner [this message]
2026-04-17  9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner
