public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v7 6/9] server: pull: prefix log messages and add error context
Date: Tue, 21 Apr 2026 12:26:51 +0200	[thread overview]
Message-ID: <20260421102654.610007-7-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260421102654.610007-1-c.ebner@proxmox.com>

Pulling groups, and therefore also snapshots, in parallel leads to
unordered log output, making it nearly impossible to relate a log
message to a backup snapshot or group.

Therefore, prefix pull job log messages with the corresponding group or
snapshot and set the error context accordingly.

Also, reword some messages, inline variables in format strings, and
start log lines with a capital letter for consistent output.

Use the buffered logger implementation to buffer up to 5 lines with a
timeout of 1 second, so that subsequent log lines arriving in quick
succession are kept together, reducing the interleaving of lines.

Example output for a sequential pull job:
```
...
[ct/100]: 2025-11-17T10:11:42Z: start sync
[ct/100]: 2025-11-17T10:11:42Z/pct.conf.blob: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: downloaded 16.785 MiB (373.063 MiB/s)
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: downloaded 65.703 KiB (27.536 MiB/s)
[ct/100]: 2025-11-17T10:11:42Z: sync done
[ct/100]: percentage done: 9.09% (1/11 groups)
[ct/101]: 2026-03-31T12:20:16Z: start sync
[ct/101]: 2026-03-31T12:20:16Z/pct.conf.blob: sync archive
[ct/101]: 2026-03-31T12:20:16Z/root.pxar.didx: sync archive
[ct/101]: 2026-03-31T12:20:16Z/root.pxar.didx: downloaded 199.806 MiB (346.31 MiB/s)
[ct/101]: 2026-03-31T12:20:16Z/catalog.pcat1.didx: sync archive
[ct/101]: 2026-03-31T12:20:16Z/catalog.pcat1.didx: downloaded 180.379 KiB (26.354 MiB/s)
[ct/101]: 2026-03-31T12:20:16Z: sync done
...
```

Example output for a parallel pull job:
```
...
[ct/100]: 2025-11-17T10:11:42Z: start sync
[ct/101]: 2026-03-31T12:20:16Z: start sync
[ct/107]: 2025-07-16T09:14:01Z: start sync
[ct/100]: 2025-11-17T10:11:42Z/pct.conf.blob: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: sync archive
[ct/101]: 2026-03-31T12:20:16Z/pct.conf.blob: sync archive
[ct/101]: 2026-03-31T12:20:16Z/root.pxar.didx: sync archive
[ct/106]: 2025-11-17T10:20:32Z: start sync
[ct/106]: 2025-11-17T10:20:32Z/pct.conf.blob: sync archive
[ct/106]: 2025-11-17T10:20:32Z/root.pxar.didx: sync archive
[ct/107]: 2025-07-16T09:14:01Z/pct.conf.blob: sync archive
[ct/107]: 2025-07-16T09:14:01Z/root.ppxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.ppxar.didx: downloaded 16.785 MiB (12.032 MiB/s)
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: sync archive
[ct/100]: 2025-11-17T10:11:42Z/root.mpxar.didx: downloaded 65.703 KiB (1021.071 KiB/s)
[ct/100]: 2025-11-17T10:11:42Z: sync done
[ct/100]: snapshot 1/1 within ct/100 is done, 0/11 groups done
[ct/100]: group sync done: percentage done: 9.09% (1/11 groups)
...
```

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 347 ++++++++++++++++++++++++++++++++++-----------
 src/server/sync.rs |   7 +-
 2 files changed, 270 insertions(+), 84 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 714151918..cee7354f3 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet};
 use std::io::Seek;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Context, Error};
 use proxmox_human_byte::HumanByte;
-use tracing::{info, warn};
+use tracing::{info, Level};
 
 use pbs_api_types::{
     print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
@@ -27,6 +27,7 @@ use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
 use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogSender, UnbufferedLogLineSender};
 use pbs_tools::sha::sha256;
 
 use super::sync::{
@@ -153,6 +154,8 @@ async fn pull_index_chunks<I: IndexFile>(
     index: I,
     encountered_chunks: Arc<Mutex<EncounteredChunks>>,
     backend: &DatastoreBackend,
+    archive_prefix: &str,
+    log_sender: Arc<dyn LogSender>,
 ) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -247,11 +250,16 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = bytes.load(Ordering::SeqCst);
     let chunk_count = chunk_count.load(Ordering::SeqCst);
 
-    info!(
-        "downloaded {} ({}/s)",
-        HumanByte::from(bytes),
-        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
-    );
+    log_sender
+        .log(
+            Level::INFO,
+            format!(
+                "{archive_prefix}: downloaded {} ({}/s)",
+                HumanByte::from(bytes),
+                HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
+            ),
+        )
+        .await?;
 
     Ok(SyncStats {
         chunk_count,
@@ -292,6 +300,7 @@ async fn pull_single_archive<'a>(
     archive_info: &'a FileInfo,
     encountered_chunks: Arc<Mutex<EncounteredChunks>>,
     backend: &DatastoreBackend,
+    log_sender: Arc<dyn LogSender>,
 ) -> Result<SyncStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
@@ -302,72 +311,104 @@ async fn pull_single_archive<'a>(
 
     let mut sync_stats = SyncStats::default();
 
-    info!("sync archive {archive_name}");
+    let archive_prefix = format!("{}/{archive_name}", snapshot.backup_time_string());
 
-    reader.load_file_into(archive_name, &tmp_path).await?;
+    log_sender
+        .log(Level::INFO, format!("{archive_prefix}: sync archive"))
+        .await?;
+
+    reader
+        .load_file_into(archive_name, &tmp_path)
+        .await
+        .with_context(|| archive_prefix.clone())?;
 
-    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
+    let mut tmpfile = std::fs::OpenOptions::new()
+        .read(true)
+        .open(&tmp_path)
+        .with_context(|| archive_prefix.clone())?;
 
     match ArchiveType::from_path(archive_name)? {
         ArchiveType::DynamicIndex => {
             let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
-                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+                format_err!("{archive_prefix}: unable to read dynamic index {tmp_path:?} - {err}")
             })?;
             let (csum, size) = index.compute_csum();
-            verify_archive(archive_info, &csum, size)?;
+            verify_archive(archive_info, &csum, size).with_context(|| archive_prefix.clone())?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!("{archive_prefix}: skipping chunk sync for same datastore"),
+                    )
+                    .await?;
             } else {
                 let stats = pull_index_chunks(
                     reader
                         .chunk_reader(archive_info.crypt_mode)
-                        .context("failed to get chunk reader")?,
+                        .context("failed to get chunk reader")
+                        .with_context(|| archive_prefix.clone())?,
                     snapshot.datastore().clone(),
                     index,
                     encountered_chunks,
                     backend,
+                    &archive_prefix,
+                    Arc::clone(&log_sender),
                 )
-                .await?;
+                .await
+                .with_context(|| archive_prefix.clone())?;
                 sync_stats.add(stats);
             }
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
-                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+                format_err!("{archive_prefix}: unable to read fixed index '{tmp_path:?}' - {err}")
             })?;
             let (csum, size) = index.compute_csum();
-            verify_archive(archive_info, &csum, size)?;
+            verify_archive(archive_info, &csum, size).with_context(|| archive_prefix.clone())?;
 
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
-                info!("skipping chunk sync for same datastore");
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!("{archive_prefix}: skipping chunk sync for same datastore"),
+                    )
+                    .await?;
             } else {
                 let stats = pull_index_chunks(
                     reader
                         .chunk_reader(archive_info.crypt_mode)
-                        .context("failed to get chunk reader")?,
+                        .context("failed to get chunk reader")
+                        .with_context(|| archive_prefix.clone())?,
                     snapshot.datastore().clone(),
                     index,
                     encountered_chunks,
                     backend,
+                    &archive_prefix,
+                    Arc::clone(&log_sender),
                 )
-                .await?;
+                .await
+                .with_context(|| archive_prefix.clone())?;
                 sync_stats.add(stats);
             }
         }
         ArchiveType::Blob => {
-            tmpfile.rewind()?;
-            let (csum, size) = sha256(&mut tmpfile)?;
-            verify_archive(archive_info, &csum, size)?;
+            proxmox_lang::try_block!({
+                tmpfile.rewind()?;
+                let (csum, size) = sha256(&mut tmpfile)?;
+                verify_archive(archive_info, &csum, size)
+            })
+            .with_context(|| archive_prefix.clone())?;
         }
     }
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
-        bail!("Atomic rename file {:?} failed - {}", path, err);
+        bail!("{archive_prefix}: Atomic rename file {path:?} failed - {err}");
     }
 
     backend
         .upload_index_to_backend(snapshot, archive_name)
-        .await?;
+        .await
+        .with_context(|| archive_prefix.clone())?;
 
     Ok(sync_stats)
 }
@@ -388,13 +429,24 @@ async fn pull_snapshot<'a>(
     encountered_chunks: Arc<Mutex<EncounteredChunks>>,
     corrupt: bool,
     is_new: bool,
+    log_sender: Arc<dyn LogSender>,
 ) -> Result<SyncStats, Error> {
+    let prefix = snapshot.backup_time_string().to_owned();
     if is_new {
-        info!("sync snapshot {}", snapshot.dir());
+        log_sender
+            .log(Level::INFO, format!("{prefix}: start sync"))
+            .await?;
     } else if corrupt {
-        info!("re-sync snapshot {} due to corruption", snapshot.dir());
+        log_sender
+            .log(
+                Level::INFO,
+                format!("re-sync snapshot {prefix} due to corruption"),
+            )
+            .await?;
     } else {
-        info!("re-sync snapshot {}", snapshot.dir());
+        log_sender
+            .log(Level::INFO, format!("re-sync snapshot {prefix}"))
+            .await?;
     }
 
     let mut sync_stats = SyncStats::default();
@@ -409,7 +461,8 @@ async fn pull_snapshot<'a>(
     let tmp_manifest_blob;
     if let Some(data) = reader
         .load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name)
-        .await?
+        .await
+        .with_context(|| prefix.clone())?
     {
         tmp_manifest_blob = data;
     } else {
@@ -419,28 +472,34 @@ async fn pull_snapshot<'a>(
     if manifest_name.exists() && !corrupt {
         let manifest_blob = proxmox_lang::try_block!({
             let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
-                format_err!("unable to open local manifest {manifest_name:?} - {err}")
+                format_err!("{prefix}: unable to open local manifest {manifest_name:?} - {err}")
             })?;
 
-            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
+            let manifest_blob =
+                DataBlob::load_from_reader(&mut manifest_file).with_context(|| prefix.clone())?;
             Ok(manifest_blob)
         })
         .map_err(|err: Error| {
-            format_err!("unable to read local manifest {manifest_name:?} - {err}")
+            format_err!("{prefix}: unable to read local manifest {manifest_name:?} - {err}")
         })?;
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
-                reader.try_download_client_log(&client_log_name).await?;
+                reader
+                    .try_download_client_log(&client_log_name)
+                    .await
+                    .with_context(|| prefix.clone())?;
             };
-            info!("no data changes");
+            log_sender
+                .log(Level::INFO, format!("{prefix}: no data changes"))
+                .await?;
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(sync_stats); // nothing changed
         }
     }
 
     let manifest_data = tmp_manifest_blob.raw_data().to_vec();
-    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
+    let manifest = BackupManifest::try_from(tmp_manifest_blob).with_context(|| prefix.clone())?;
 
     if ignore_not_verified_or_encrypted(
         &manifest,
@@ -464,35 +523,54 @@ async fn pull_snapshot<'a>(
         path.push(&item.filename);
 
         if !corrupt && path.exists() {
-            let filename: BackupArchiveName = item.filename.as_str().try_into()?;
+            let filename: BackupArchiveName = item
+                .filename
+                .as_str()
+                .try_into()
+                .with_context(|| prefix.clone())?;
             match filename.archive_type() {
                 ArchiveType::DynamicIndex => {
-                    let index = DynamicIndexReader::open(&path)?;
+                    let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
                     let (csum, size) = index.compute_csum();
                     match manifest.verify_file(&filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_sender
+                                .log(
+                                    Level::INFO,
+                                    format!("{prefix}: detected changed file {path:?} - {err}"),
+                                )
+                                .await?;
                         }
                     }
                 }
                 ArchiveType::FixedIndex => {
-                    let index = FixedIndexReader::open(&path)?;
+                    let index = FixedIndexReader::open(&path).with_context(|| prefix.clone())?;
                     let (csum, size) = index.compute_csum();
                     match manifest.verify_file(&filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_sender
+                                .log(
+                                    Level::INFO,
+                                    format!("{prefix}: detected changed file {path:?} - {err}"),
+                                )
+                                .await?;
                         }
                     }
                 }
                 ArchiveType::Blob => {
-                    let mut tmpfile = std::fs::File::open(&path)?;
-                    let (csum, size) = sha256(&mut tmpfile)?;
+                    let mut tmpfile = std::fs::File::open(&path).with_context(|| prefix.clone())?;
+                    let (csum, size) = sha256(&mut tmpfile).with_context(|| prefix.clone())?;
                     match manifest.verify_file(&filename, &csum, size) {
                         Ok(_) => continue,
                         Err(err) => {
-                            info!("detected changed file {path:?} - {err}");
+                            log_sender
+                                .log(
+                                    Level::INFO,
+                                    format!("{prefix}: detected changed file {path:?} - {err}"),
+                                )
+                                .await?;
                         }
                     }
                 }
@@ -505,13 +583,14 @@ async fn pull_snapshot<'a>(
             item,
             encountered_chunks.clone(),
             backend,
+            Arc::clone(&log_sender),
         )
         .await?;
         sync_stats.add(stats);
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
-        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
+        bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}");
     }
     if let DatastoreBackend::S3(s3_client) = backend {
         let object_key = pbs_datastore::s3::object_key_from_path(
@@ -524,33 +603,40 @@ async fn pull_snapshot<'a>(
         let _is_duplicate = s3_client
             .upload_replace_with_retry(object_key, data)
             .await
-            .context("failed to upload manifest to s3 backend")?;
+            .context("failed to upload manifest to s3 backend")
+            .with_context(|| prefix.clone())?;
     }
 
     if !client_log_name.exists() {
-        reader.try_download_client_log(&client_log_name).await?;
+        reader
+            .try_download_client_log(&client_log_name)
+            .await
+            .with_context(|| prefix.clone())?;
         if client_log_name.exists() {
             if let DatastoreBackend::S3(s3_client) = backend {
                 let object_key = pbs_datastore::s3::object_key_from_path(
                     &snapshot.relative_path(),
                     CLIENT_LOG_BLOB_NAME.as_ref(),
                 )
-                .context("invalid archive object key")?;
+                .context("invalid archive object key")
+                .with_context(|| prefix.clone())?;
 
                 let data = tokio::fs::read(&client_log_name)
                     .await
-                    .context("failed to read log file contents")?;
+                    .context("failed to read log file contents")
+                    .with_context(|| prefix.clone())?;
                 let contents = hyper::body::Bytes::from(data);
                 let _is_duplicate = s3_client
                     .upload_replace_with_retry(object_key, contents)
                     .await
-                    .context("failed to upload client log to s3 backend")?;
+                    .context("failed to upload client log to s3 backend")
+                    .with_context(|| prefix.clone())?;
             }
         }
     };
     snapshot
         .cleanup_unreferenced_files(&manifest)
-        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
+        .map_err(|err| format_err!("{prefix}: failed to cleanup unreferenced files - {err}"))?;
 
     Ok(sync_stats)
 }
@@ -565,10 +651,14 @@ async fn pull_snapshot_from<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     encountered_chunks: Arc<Mutex<EncounteredChunks>>,
     corrupt: bool,
+    log_sender: Arc<dyn LogSender>,
 ) -> Result<SyncStats, Error> {
+    let prefix = snapshot.backup_time_string().to_string();
+
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
-        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())
+        .context(prefix.clone())?;
 
     let result = pull_snapshot(
         params,
@@ -577,6 +667,7 @@ async fn pull_snapshot_from<'a>(
         encountered_chunks,
         corrupt,
         is_new,
+        Arc::clone(&log_sender),
     )
     .await;
 
@@ -589,11 +680,20 @@ async fn pull_snapshot_from<'a>(
                     snapshot.as_ref(),
                     true,
                 ) {
-                    info!("cleanup error - {cleanup_err}");
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!("{prefix}: cleanup error - {cleanup_err}"),
+                        )
+                        .await?;
                 }
                 return Err(err);
             }
-            Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
+            Ok(_) => {
+                log_sender
+                    .log(Level::INFO, format!("{prefix}: sync done"))
+                    .await?
+            }
         }
     }
 
@@ -622,7 +722,9 @@ async fn pull_group(
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
     shared_group_progress: Arc<SharedGroupProgress>,
+    log_sender: Arc<dyn LogSender>,
 ) -> Result<SyncStats, Error> {
+    let prefix = format!("{group}");
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
@@ -714,11 +816,15 @@ async fn pull_group(
         .collect();
 
     if already_synced_skip_info.count > 0 {
-        info!("{already_synced_skip_info}");
+        log_sender
+            .log(Level::INFO, format!("{prefix}: {already_synced_skip_info}"))
+            .await?;
         already_synced_skip_info.reset();
     }
     if transfer_last_skip_info.count > 0 {
-        info!("{transfer_last_skip_info}");
+        log_sender
+            .log(Level::INFO, format!("{prefix}: {transfer_last_skip_info}"))
+            .await?;
         transfer_last_skip_info.reset();
     }
 
@@ -730,8 +836,8 @@ async fn pull_group(
         .store
         .backup_group(target_ns.clone(), group.clone());
     if let Some(info) = backup_group.last_backup(true).unwrap_or(None) {
-        let mut reusable_chunks = encountered_chunks.lock().unwrap();
         if let Err(err) = proxmox_lang::try_block!({
+            let mut reusable_chunks = encountered_chunks.lock().unwrap();
             let _snapshot_guard = info
                 .backup_dir
                 .lock_shared()
@@ -780,7 +886,12 @@ async fn pull_group(
             }
             Ok::<(), Error>(())
         }) {
-            warn!("Failed to collect reusable chunk from last backup: {err:#?}");
+            log_sender
+                .log(
+                    Level::WARN,
+                    format!("Failed to collect reusable chunk from last backup: {err:#?}"),
+                )
+                .await?;
         }
     }
 
@@ -805,13 +916,31 @@ async fn pull_group(
             &to_snapshot,
             encountered_chunks.clone(),
             corrupt,
+            Arc::clone(&log_sender),
         )
         .await;
 
         // Update done groups progress by other parallel running pulls
         local_progress.done_groups = shared_group_progress.load_done();
         local_progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: group {group}: {local_progress}");
+        if params.worker_threads.unwrap_or(1) == 1 {
+            log_sender
+                .log(Level::INFO, format!("percentage done: {local_progress}"))
+                .await?;
+        } else {
+            log_sender
+                .log(
+                    Level::INFO,
+                    format!(
+                        "snapshot {}/{} within {group} is done, {}/{} groups done",
+                        local_progress.done_snapshots,
+                        local_progress.group_snapshots,
+                        local_progress.done_groups,
+                        local_progress.total_groups,
+                    ),
+                )
+                .await?;
+        }
 
         let stats = result?; // stop on error
         sync_stats.add(stats);
@@ -829,13 +958,23 @@ async fn pull_group(
                 continue;
             }
             if snapshot.is_protected() {
-                info!(
-                    "don't delete vanished snapshot {} (protected)",
-                    snapshot.dir()
-                );
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!(
+                            "{prefix}: don't delete vanished snapshot {} (protected)",
+                            snapshot.dir(),
+                        ),
+                    )
+                    .await?;
                 continue;
             }
-            info!("delete vanished snapshot {}", snapshot.dir());
+            log_sender
+                .log(
+                    Level::INFO,
+                    format!("delete vanished snapshot {}", snapshot.dir()),
+                )
+                .await?;
             params
                 .target
                 .store
@@ -850,6 +989,15 @@ async fn pull_group(
 
     shared_group_progress.increment_done();
 
+    if params.worker_threads.unwrap_or(1) > 1 {
+        log_sender
+            .log(
+                Level::INFO,
+                format!("group sync done: percentage done: {local_progress}"),
+            )
+            .await?;
+    }
+
     Ok(sync_stats)
 }
 
@@ -1037,10 +1185,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
             }
             Err(err) => {
                 errors = true;
-                info!(
-                    "Encountered errors while syncing namespace {} - {err}",
-                    &namespace,
-                );
+                info!("Encountered errors while syncing namespace {namespace} - {err}");
             }
         };
     }
@@ -1066,6 +1211,7 @@ async fn lock_and_pull_group(
     namespace: &BackupNamespace,
     target_namespace: &BackupNamespace,
     shared_group_progress: Arc<SharedGroupProgress>,
+    log_sender: Arc<dyn LogSender>,
 ) -> Result<SyncStats, Error> {
     let (owner, _lock_guard) =
         match params
@@ -1075,25 +1221,47 @@ async fn lock_and_pull_group(
         {
             Ok(res) => res,
             Err(err) => {
-                info!("sync group {group} failed - group lock failed: {err}");
-                info!("create_locked_backup_group failed");
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!("sync group {group} failed - group lock failed: {err}"),
+                    )
+                    .await?;
+                log_sender
+                    .log(Level::INFO, "create_locked_backup_group failed".to_string())
+                    .await?;
                 return Err(err);
             }
         };
 
     if params.owner != owner {
         // only the owner is allowed to create additional snapshots
-        info!(
-            "sync group {group} failed - owner check failed ({} != {owner})",
-            params.owner
-        );
+        log_sender
+            .log(
+                Level::INFO,
+                format!(
+                    "sync group {group} failed - owner check failed ({} != {owner})",
+                    params.owner,
+                ),
+            )
+            .await?;
         return Err(format_err!("owner check failed"));
     }
 
-    match pull_group(params, namespace, group, shared_group_progress).await {
+    match pull_group(
+        params,
+        namespace,
+        group,
+        shared_group_progress,
+        Arc::clone(&log_sender),
+    )
+    .await
+    {
         Ok(stats) => Ok(stats),
         Err(err) => {
-            info!("sync group {group} failed - {err:#}");
+            log_sender
+                .log(Level::INFO, format!("sync group {group} failed - {err:#}"))
+                .await?;
             Err(err)
         }
     }
@@ -1126,7 +1294,7 @@ async fn pull_ns(
     list.sort_unstable();
 
     info!(
-        "found {} groups to sync (out of {unfiltered_count} total)",
+        "Found {} groups to sync (out of {unfiltered_count} total)",
         list.len()
     );
 
@@ -1145,6 +1313,15 @@ async fn pull_ns(
     let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
     let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
 
+    let sender_builder = if params.worker_threads.unwrap_or(1) == 1 {
+        let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
+        // runs until sender_builder and all senders build from it are being dropped
+        buffered_logger.run_log_collection();
+        Some(sender_builder)
+    } else {
+        None
+    };
+
     let mut process_results = |results| {
         for result in results {
             match result {
@@ -1162,16 +1339,24 @@ async fn pull_ns(
         let target_ns = target_ns.clone();
         let params = Arc::clone(&params);
         let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let log_sender: Arc<dyn LogSender> = if let Some(sender_builder) = &sender_builder {
+            Arc::new(sender_builder.sender_with_label(group.to_string()))
+        } else {
+            Arc::new(UnbufferedLogLineSender::new(group.to_string()))
+        };
         let results = group_workers
             .spawn_task(async move {
-                lock_and_pull_group(
+                let result = lock_and_pull_group(
                     Arc::clone(&params),
                     &group,
                     &namespace,
                     &target_ns,
                     group_progress_cloned,
+                    Arc::clone(&log_sender),
                 )
-                .await
+                .await;
+                let _ = log_sender.flush().await;
+                result
             })
             .await
             .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -1199,7 +1384,7 @@ async fn pull_ns(
                 if !local_group.apply_filters(&params.group_filter) {
                     continue;
                 }
-                info!("delete vanished group '{local_group}'");
+                info!("Delete vanished group '{local_group}'");
                 let delete_stats_result = params
                     .target
                     .store
@@ -1208,7 +1393,7 @@ async fn pull_ns(
                 match delete_stats_result {
                     Ok(stats) => {
                         if !stats.all_removed() {
-                            info!("kept some protected snapshots of group '{local_group}'");
+                            info!("Kept some protected snapshots of group '{local_group}'");
                             sync_stats.add(SyncStats::from(RemovedVanishedStats {
                                 snapshots: stats.removed_snapshots(),
                                 groups: 0,
@@ -1231,7 +1416,7 @@ async fn pull_ns(
             Ok(())
         });
         if let Err(err) = result {
-            info!("error during cleanup: {err}");
+            info!("Error during cleanup: {err}");
             errors = true;
         };
     }
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 78c232bf9..17ed4839f 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -135,13 +135,13 @@ impl SyncSourceReader for RemoteSourceReader {
                 Some(HttpError { code, message }) => match *code {
                     StatusCode::NOT_FOUND => {
                         info!(
-                            "skipping snapshot {} - vanished since start of sync",
+                            "Snapshot {}: skipped because vanished since start of sync",
                             &self.dir
                         );
                         return Ok(None);
                     }
                     _ => {
-                        bail!("HTTP error {code} - {message}");
+                        bail!("Snapshot {}: HTTP error {code} - {message}", &self.dir);
                     }
                 },
                 None => {
@@ -175,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader {
                 bail!("Atomic rename file {to_path:?} failed - {err}");
             }
             info!(
-                "got backup log file {client_log_name}",
+                "Snapshot {snapshot}: got backup log file {client_log_name}",
+                snapshot = &self.dir,
                 client_log_name = client_log_name.deref()
             );
         }
-- 
2.47.3






Thread overview: 11+ messages
2026-04-21 10:26 [PATCH proxmox{,-backup} v7 0/9] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox v7 1/9] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-21 22:18   ` applied: " Thomas Lamprecht
2026-04-21 10:26 ` [PATCH proxmox-backup v7 2/9] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 3/9] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 4/9] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 5/9] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-21 10:26 ` Christian Ebner [this message]
2026-04-21 10:26 ` [PATCH proxmox-backup v7 7/9] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 8/9] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-21 10:26 ` [PATCH proxmox-backup v7 9/9] ui: expose group worker setting in sync job edit window Christian Ebner
