public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: Re: [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context
Date: Tue, 21 Apr 2026 09:42:23 +0200	[thread overview]
Message-ID: <7f8852e8-c092-4e2d-a031-192c087fb364@proxmox.com> (raw)
In-Reply-To: <295ee353-2731-474b-aad0-d1f8a2bcb443@proxmox.com>

On 4/21/26 9:21 AM, Christian Ebner wrote:
> On 4/20/26 1:54 PM, Fabian Grünbichler wrote:
>> On April 17, 2026 11:26 am, Christian Ebner wrote:
>>> Pulling groups and therefore also snapshots in parallel leads to
>>> unordered log outputs, making it mostly impossible to relate a log
>>> message to a backup snapshot/group.
>>>
>>> Therefore, prefix pull job log messages by the corresponding group or
>>> snapshot and set the error context accordingly.
>>>
>>> Also, reword some messages, inline variables in format strings and
>>> start log lines with capital letters to get consistent output.
>>>
>>> By using the buffered logger implementation and buffering up to 5 lines
>>> with a timeout of 1 second, subsequent log lines arriving in fast
>>> succession are kept together, reducing the mixing of lines.
>>>
>>> Example output for a sequential pull job:
>>> ```
>>> ...
>>> [ct/100]: 2025-11-17T10:11:42Z: start sync
>>> [ct/100]: 2025-11-17T10:11:42Z: pct.conf.blob: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: downloaded 16.785 MiB (280.791 MiB/s)
>>> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: downloaded 65.703 KiB (29.1 MiB/s)
>>> [ct/100]: 2025-11-17T10:11:42Z: sync done
>>> [ct/100]: percentage done: 9.09% (1/11 groups)
>>> [ct/101]: 2026-03-31T12:20:16Z: start sync
>>> [ct/101]: 2026-03-31T12:20:16Z: pct.conf.blob: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: downloaded 199.806 MiB (311.91 MiB/s)
>>> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: downloaded 180.379 KiB (22.748 MiB/s)
>>> [ct/101]: 2026-03-31T12:20:16Z: sync done
>>
>> this is probably the wrong patch for this comment, but since you have
>> the sample output here ;)
>>
>> should we pad the labels? the buffered logger knows all currently active
>> labels and could adapt it? otherwise especially for host backups the
>> logs are again not really scannable by humans, because there will be
>> weird jumps in alignment.. or (.. continued below)
> 
> That will not work though? While the logger knows about the labels when 
> receiving them, it does not know at all about future ones that might 
> still come in. So it can happen that the padding does not fit anymore, 
> and one gets even uglier formatting issues.
> 
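For reference, the per-group senders (and with them their labels) are only created when each group task is spawned, so the logger cannot know the complete label set up front. Condensed from the pull_ns() hunks quoted further below (surrounding loop and most arguments elided):

```
let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
// runs until the sender_builder and all senders built from it are dropped
buffered_logger.run_log_collection();

// per group, inside the loop over the group list:
let log_sender = Arc::new(sender_builder.sender_with_label(group.to_string()));
let results = group_workers
    .spawn_task(async move {
        let result = lock_and_pull_group(
            // ... other arguments as before ...
            Arc::clone(&log_sender),
        )
        .await;
        let _ = log_sender.flush().await;
        result
    })
    .await
    .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
```

So any padding computed from the "currently active" labels could only account for the groups that have already been started.
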
> I would prefer to keep them as is for the time being. For vm/ct it is 
> not that bad; one could maybe foresee the ID to be a 4-digit number and 
> define a minimum label length to pad to if it is not reached (rough 
> sketch below).
> 
> Host backups or backups with an explicit label are harder though, as 
> their label length is not really predictable.
> 
> 
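A rough sketch of that minimum-width idea for the vm/ct case (hypothetical helper, not part of this series); it only helps while the maximum label width is known or bounded up front:

```
// Hypothetical: pad short labels (e.g. ct/vm IDs up to 4 digits) to a fixed
// minimum width so the columns line up. Host backups or long custom backup IDs
// exceed the minimum and shift the alignment again.
const MIN_LABEL_WIDTH: usize = 8; // wide enough for e.g. "vm/9999"

fn padded_label(label: &str) -> String {
    format!("[{label:<width$}]:", width = MIN_LABEL_WIDTH)
}
```

With that, `[ct/100]:` and `[vm/9999]:` would align, but `[host/incrementaltest2]:` would still break the columns.
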
>>> ...
>>> ```
>>>
>>> Example output for a parallel pull job:
>>> ```
>>> ...
>>> [ct/107]: 2025-07-16T09:14:01Z: start sync
>>> [ct/107]: 2025-07-16T09:14:01Z: pct.conf.blob: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: sync archive
>>> [vm/108]: 2025-09-19T07:37:19Z: start sync
>>> [vm/108]: 2025-09-19T07:37:19Z: qemu-server.conf.blob: sync archive
>>> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: downloaded 609.233 MiB (112.628 MiB/s)
>>> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: downloaded 1.172 MiB (17.838 MiB/s)
>>> [ct/107]: 2025-07-16T09:14:01Z: sync done
>>> [ct/107]: percentage done: 72.73% (8/11 groups)
>>
>> the way the prefix and snapshot are formatted could also be interpreted
>> at first glance as a timestamp of the log line.. why not just prepend
>> the prefix on the logger side, and leave it up to the caller to do the
>> formatting? then we could use "{type}/{id}/" as prefix here? or add
>> 'snapshot' in those lines to make it clear? granted, this is more of an
>> issue when viewing the log via `proxmox-backup-manager`, as in the UI we
>> have the log timestamps up front..
> 
> I simply followed along the lines of your suggestion here. In prior 
> versions I had exactly that, but you rejected it as too verbose? :)
> 
> https://lore.proxmox.com/pbs-devel/1774263381.bngcrer2th.astroid@yuna.none/
> 
>>
>> and maybe(?) log the progress line using a different prefix? because
>> right now the information that the group [ct/107] is finished is not
>> really clear from the output, IMHO.
> 
> That I can add, yes.
> 
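Something along these lines, purely as an illustration of a distinct prefix for the per-group progress line (the wording is hypothetical, the log_sender API is the one used in the patch):

```
// Hypothetical: give the progress line its own wording instead of the
// snapshot-style one, so a finished group stands out in the log.
log_sender
    .log(
        Level::INFO,
        format!("progress for group {group}: {local_progress}"),
    )
    .await?;
```
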
>>
>> the progress logging is also still broken (this is for a sync that takes
>> a while, this is not log messages being buffered and re-ordered!):
>>
>> $ proxmox-backup-manager task log 'UPID:yuna:00070656:001E420F:00000002:69E610EB:syncjob:local\x3atest\x3atank\x3a\x3as\x2dbc01cba6\x2d805a:root@pam:' | grep -e namespace -e 'percentage done'
>> Syncing datastore 'test', root namespace into datastore 'tank', root namespace
>> Finished syncing root namespace, current progress: 0 groups, 0 snapshots
>> Syncing datastore 'test', namespace 'test' into datastore 'tank', namespace 'test'
>> [host/exclusion-test]: percentage done: 5.26% (1/19 groups)
>> [host/acltest]: percentage done: 5.26% (1/19 groups)
>> [host/logtest]: percentage done: 5.26% (1/19 groups)
>> [host/onemeg]: percentage done: 5.26% (1/19 groups)
>> [host/fourmeg]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/symlink]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/symlink]: percentage done: 5.26% (1/19 groups)
>> [host/fourmeg]: percentage done: 5.26% (1/19 groups)
>> [host/format-v2-test]: percentage done: 1.75% (0/19 groups, 1/3 snapshots in group #1)
>> [host/format-v2-test]: percentage done: 3.51% (0/19 groups, 2/3 snapshots in group #1)
>> [host/format-v2-test]: percentage done: 5.26% (1/19 groups)
>> [host/incrementaltest2]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/incrementaltest2]: percentage done: 5.26% (1/19 groups)
> 
> There will always be cases where it does not work, I guess; not sure 
> what I can do to make you happy here.

Oh, sorry, might have been too quick on this one. You are saying this 
happens for a non-parallel sync?

>>
>>> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: downloaded 1.196 GiB (156.892 MiB/s)
>>> [vm/108]: 2025-09-19T07:37:19Z: sync done
>>> ...
>>>
>>> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
>>> ---
>>> changes since version 5:
>>> - uses BufferedLogger implementation, refactored accordingly
>>> - improve log line prefixes
>>> - add missing error contexts
>>>
>>>   src/server/pull.rs | 314 +++++++++++++++++++++++++++++++++------------
>>>   src/server/sync.rs |   8 +-
>>>   2 files changed, 237 insertions(+), 85 deletions(-)
>>>
>>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>>> index 611441d2a..f7aae4d59 100644
>>> --- a/src/server/pull.rs
>>> +++ b/src/server/pull.rs
>>> @@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet};
>>>   use std::io::Seek;
>>>   use std::sync::atomic::{AtomicUsize, Ordering};
>>>   use std::sync::{Arc, Mutex};
>>> -use std::time::SystemTime;
>>> +use std::time::{Duration, SystemTime};
>>>   use anyhow::{bail, format_err, Context, Error};
>>>   use proxmox_human_byte::HumanByte;
>>> -use tracing::{info, warn};
>>> +use tracing::{info, Level};
>>>   use pbs_api_types::{
>>>       print_store_and_ns, ArchiveType, Authid, BackupArchiveName, 
>>> BackupDir, BackupGroup,
>>> @@ -27,6 +27,7 @@ use pbs_datastore::manifest::{BackupManifest, 
>>> FileInfo};
>>>   use pbs_datastore::read_chunk::AsyncReadChunk;
>>>   use pbs_datastore::{check_backup_owner, DataStore, 
>>> DatastoreBackend, StoreProgress};
>>>   use pbs_tools::bounded_join_set::BoundedJoinSet;
>>> +use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
>>>   use pbs_tools::sha::sha256;
>>>   use super::sync::{
>>> @@ -153,6 +154,8 @@ async fn pull_index_chunks<I: IndexFile>(
>>>       index: I,
>>>       encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>>       backend: &DatastoreBackend,
>>> +    archive_prefix: &str,
>>> +    log_sender: Arc<LogLineSender>,
>>>   ) -> Result<SyncStats, Error> {
>>>       use futures::stream::{self, StreamExt, TryStreamExt};
>>> @@ -247,11 +250,16 @@ async fn pull_index_chunks<I: IndexFile>(
>>>       let bytes = bytes.load(Ordering::SeqCst);
>>>       let chunk_count = chunk_count.load(Ordering::SeqCst);
>>> -    info!(
>>> -        "downloaded {} ({}/s)",
>>> -        HumanByte::from(bytes),
>>> -        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
>>> -    );
>>> +    log_sender
>>> +        .log(
>>> +            Level::INFO,
>>> +            format!(
>>> +                "{archive_prefix}: downloaded {} ({}/s)",
>>> +                HumanByte::from(bytes),
>>> +                HumanByte::new_binary(bytes as f64 / 
>>> elapsed.as_secs_f64()),
>>> +            ),
>>> +        )
>>> +        .await?;
>>>       Ok(SyncStats {
>>>           chunk_count,
>>> @@ -292,6 +300,7 @@ async fn pull_single_archive<'a>(
>>>       archive_info: &'a FileInfo,
>>>       encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>>       backend: &DatastoreBackend,
>>> +    log_sender: Arc<LogLineSender>,
>>>   ) -> Result<SyncStats, Error> {
>>>       let archive_name = &archive_info.filename;
>>>       let mut path = snapshot.full_path();
>>> @@ -302,72 +311,104 @@ async fn pull_single_archive<'a>(
>>>       let mut sync_stats = SyncStats::default();
>>> -    info!("sync archive {archive_name}");
>>> +    let archive_prefix = format!("{}: {archive_name}", 
>>> snapshot.backup_time_string());
>>> +
>>> +    log_sender
>>> +        .log(Level::INFO, format!("{archive_prefix}: sync archive"))
>>> +        .await?;
>>> -    reader.load_file_into(archive_name, &tmp_path).await?;
>>> +    reader
>>> +        .load_file_into(archive_name, &tmp_path)
>>> +        .await
>>> +        .with_context(|| archive_prefix.clone())?;
>>> -    let mut tmpfile = 
>>> std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>>> +    let mut tmpfile = std::fs::OpenOptions::new()
>>> +        .read(true)
>>> +        .open(&tmp_path)
>>> +        .with_context(|| archive_prefix.clone())?;
>>>       match ArchiveType::from_path(archive_name)? {
>>>           ArchiveType::DynamicIndex => {
>>>               let index = DynamicIndexReader::new(tmpfile).map_err(| 
>>> err| {
>>> -                format_err!("unable to read dynamic index {:?} - 
>>> {}", tmp_path, err)
>>> +                format_err!("{archive_prefix}: unable to read 
>>> dynamic index {tmp_path:?} - {err}")
>>>               })?;
>>>               let (csum, size) = index.compute_csum();
>>> -            verify_archive(archive_info, &csum, size)?;
>>> +            verify_archive(archive_info, &csum, 
>>> size).with_context(|| archive_prefix.clone())?;
>>>               if reader.skip_chunk_sync(snapshot.datastore().name()) {
>>> -                info!("skipping chunk sync for same datastore");
>>> +                log_sender
>>> +                    .log(
>>> +                        Level::INFO,
>>> +                        format!("{archive_prefix}: skipping chunk 
>>> sync for same datastore"),
>>> +                    )
>>> +                    .await?;
>>>               } else {
>>>                   let stats = pull_index_chunks(
>>>                       reader
>>>                           .chunk_reader(archive_info.crypt_mode)
>>> -                        .context("failed to get chunk reader")?,
>>> +                        .context("failed to get chunk reader")
>>> +                        .with_context(|| archive_prefix.clone())?,
>>>                       snapshot.datastore().clone(),
>>>                       index,
>>>                       encountered_chunks,
>>>                       backend,
>>> +                    &archive_prefix,
>>> +                    Arc::clone(&log_sender),
>>>                   )
>>> -                .await?;
>>> +                .await
>>> +                .with_context(|| archive_prefix.clone())?;
>>>                   sync_stats.add(stats);
>>>               }
>>>           }
>>>           ArchiveType::FixedIndex => {
>>>               let index = FixedIndexReader::new(tmpfile).map_err(|err| {
>>> -                format_err!("unable to read fixed index '{:?}' - 
>>> {}", tmp_path, err)
>>> +                format_err!("{archive_name}: unable to read fixed 
>>> index '{tmp_path:?}' - {err}")
>>>               })?;
>>>               let (csum, size) = index.compute_csum();
>>> -            verify_archive(archive_info, &csum, size)?;
>>> +            verify_archive(archive_info, &csum, 
>>> size).with_context(|| archive_prefix.clone())?;
>>>               if reader.skip_chunk_sync(snapshot.datastore().name()) {
>>> -                info!("skipping chunk sync for same datastore");
>>> +                log_sender
>>> +                    .log(
>>> +                        Level::INFO,
>>> +                        format!("{archive_prefix}: skipping chunk 
>>> sync for same datastore"),
>>> +                    )
>>> +                    .await?;
>>>               } else {
>>>                   let stats = pull_index_chunks(
>>>                       reader
>>>                           .chunk_reader(archive_info.crypt_mode)
>>> -                        .context("failed to get chunk reader")?,
>>> +                        .context("failed to get chunk reader")
>>> +                        .with_context(|| archive_prefix.clone())?,
>>>                       snapshot.datastore().clone(),
>>>                       index,
>>>                       encountered_chunks,
>>>                       backend,
>>> +                    &archive_prefix,
>>> +                    Arc::clone(&log_sender),
>>>                   )
>>> -                .await?;
>>> +                .await
>>> +                .with_context(|| archive_prefix.clone())?;
>>>                   sync_stats.add(stats);
>>>               }
>>>           }
>>>           ArchiveType::Blob => {
>>> -            tmpfile.rewind()?;
>>> -            let (csum, size) = sha256(&mut tmpfile)?;
>>> -            verify_archive(archive_info, &csum, size)?;
>>> +            proxmox_lang::try_block!({
>>> +                tmpfile.rewind()?;
>>> +                let (csum, size) = sha256(&mut tmpfile)?;
>>> +                verify_archive(archive_info, &csum, size)
>>> +            })
>>> +            .with_context(|| archive_prefix.clone())?;
>>>           }
>>>       }
>>>       if let Err(err) = std::fs::rename(&tmp_path, &path) {
>>> -        bail!("Atomic rename file {:?} failed - {}", path, err);
>>> +        bail!("{archive_prefix}: Atomic rename file {path:?} failed 
>>> - {err}");
>>>       }
>>>       backend
>>>           .upload_index_to_backend(snapshot, archive_name)
>>> -        .await?;
>>> +        .await
>>> +        .with_context(|| archive_prefix.clone())?;
>>>       Ok(sync_stats)
>>>   }
>>> @@ -388,13 +429,24 @@ async fn pull_snapshot<'a>(
>>>       encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>>       corrupt: bool,
>>>       is_new: bool,
>>> +    log_sender: Arc<LogLineSender>,
>>>   ) -> Result<SyncStats, Error> {
>>> +    let prefix = snapshot.backup_time_string().to_owned();
>>>       if is_new {
>>> -        info!("sync snapshot {}", snapshot.dir());
>>> +        log_sender
>>> +            .log(Level::INFO, format!("{prefix}: start sync"))
>>> +            .await?;
>>>       } else if corrupt {
>>> -        info!("re-sync snapshot {} due to corruption", snapshot.dir());
>>> +        log_sender
>>> +            .log(
>>> +                Level::INFO,
>>> +                format!("re-sync snapshot {prefix} due to corruption"),
>>> +            )
>>> +            .await?;
>>>       } else {
>>> -        info!("re-sync snapshot {}", snapshot.dir());
>>> +        log_sender
>>> +            .log(Level::INFO, format!("re-sync snapshot {prefix}"))
>>> +            .await?;
>>>       }
>>>       let mut sync_stats = SyncStats::default();
>>> @@ -409,7 +461,8 @@ async fn pull_snapshot<'a>(
>>>       let tmp_manifest_blob;
>>>       if let Some(data) = reader
>>>           .load_file_into(MANIFEST_BLOB_NAME.as_ref(), 
>>> &tmp_manifest_name)
>>> -        .await?
>>> +        .await
>>> +        .with_context(|| prefix.clone())?
>>>       {
>>>           tmp_manifest_blob = data;
>>>       } else {
>>> @@ -419,28 +472,34 @@ async fn pull_snapshot<'a>(
>>>       if manifest_name.exists() && !corrupt {
>>>           let manifest_blob = proxmox_lang::try_block!({
>>>               let mut manifest_file = 
>>> std::fs::File::open(&manifest_name).map_err(|err| {
>>> -                format_err!("unable to open local manifest 
>>> {manifest_name:?} - {err}")
>>> +                format_err!("{prefix}: unable to open local manifest 
>>> {manifest_name:?} - {err}")
>>>               })?;
>>> -            let manifest_blob = DataBlob::load_from_reader(&mut 
>>> manifest_file)?;
>>> +            let manifest_blob =
>>> +                DataBlob::load_from_reader(&mut 
>>> manifest_file).with_context(|| prefix.clone())?;
>>>               Ok(manifest_blob)
>>>           })
>>>           .map_err(|err: Error| {
>>> -            format_err!("unable to read local manifest 
>>> {manifest_name:?} - {err}")
>>> +            format_err!("{prefix}: unable to read local manifest 
>>> {manifest_name:?} - {err}")
>>>           })?;
>>>           if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>>>               if !client_log_name.exists() {
>>> -                
>>> reader.try_download_client_log(&client_log_name).await?;
>>> +                reader
>>> +                    .try_download_client_log(&client_log_name)
>>> +                    .await
>>> +                    .with_context(|| prefix.clone())?;
>>>               };
>>> -            info!("no data changes");
>>> +            log_sender
>>> +                .log(Level::INFO, format!("{prefix}: no data changes"))
>>> +                .await?;
>>>               let _ = std::fs::remove_file(&tmp_manifest_name);
>>>               return Ok(sync_stats); // nothing changed
>>>           }
>>>       }
>>>       let manifest_data = tmp_manifest_blob.raw_data().to_vec();
>>> -    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>>> +    let manifest = 
>>> BackupManifest::try_from(tmp_manifest_blob).with_context(|| 
>>> prefix.clone())?;
>>>       if ignore_not_verified_or_encrypted(
>>>           &manifest,
>>> @@ -464,35 +523,54 @@ async fn pull_snapshot<'a>(
>>>           path.push(&item.filename);
>>>           if !corrupt && path.exists() {
>>> -            let filename: BackupArchiveName = 
>>> item.filename.as_str().try_into()?;
>>> +            let filename: BackupArchiveName = item
>>> +                .filename
>>> +                .as_str()
>>> +                .try_into()
>>> +                .with_context(|| prefix.clone())?;
>>>               match filename.archive_type() {
>>>                   ArchiveType::DynamicIndex => {
>>> -                    let index = DynamicIndexReader::open(&path)?;
>>> +                    let index = 
>>> DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
>>>                       let (csum, size) = index.compute_csum();
>>>                       match manifest.verify_file(&filename, &csum, 
>>> size) {
>>>                           Ok(_) => continue,
>>>                           Err(err) => {
>>> -                            info!("detected changed file {path:?} - 
>>> {err}");
>>> +                            log_sender
>>> +                                .log(
>>> +                                    Level::INFO,
>>> +                                    format!("{prefix}: detected 
>>> changed file {path:?} - {err}"),
>>> +                                )
>>> +                                .await?;
>>>                           }
>>>                       }
>>>                   }
>>>                   ArchiveType::FixedIndex => {
>>> -                    let index = FixedIndexReader::open(&path)?;
>>> +                    let index = 
>>> FixedIndexReader::open(&path).with_context(|| prefix.clone())?;
>>>                       let (csum, size) = index.compute_csum();
>>>                       match manifest.verify_file(&filename, &csum, 
>>> size) {
>>>                           Ok(_) => continue,
>>>                           Err(err) => {
>>> -                            info!("detected changed file {path:?} - 
>>> {err}");
>>> +                            log_sender
>>> +                                .log(
>>> +                                    Level::INFO,
>>> +                                    format!("{prefix}: detected 
>>> changed file {path:?} - {err}"),
>>> +                                )
>>> +                                .await?;
>>>                           }
>>>                       }
>>>                   }
>>>                   ArchiveType::Blob => {
>>> -                    let mut tmpfile = std::fs::File::open(&path)?;
>>> -                    let (csum, size) = sha256(&mut tmpfile)?;
>>> +                    let mut tmpfile = 
>>> std::fs::File::open(&path).with_context(|| prefix.clone())?;
>>> +                    let (csum, size) = sha256(&mut 
>>> tmpfile).with_context(|| prefix.clone())?;
>>>                       match manifest.verify_file(&filename, &csum, 
>>> size) {
>>>                           Ok(_) => continue,
>>>                           Err(err) => {
>>> -                            info!("detected changed file {path:?} - 
>>> {err}");
>>> +                            log_sender
>>> +                                .log(
>>> +                                    Level::INFO,
>>> +                                    format!("{prefix}: detected 
>>> changed file {path:?} - {err}"),
>>> +                                )
>>> +                                .await?;
>>>                           }
>>>                       }
>>>                   }
>>> @@ -505,13 +583,14 @@ async fn pull_snapshot<'a>(
>>>               item,
>>>               encountered_chunks.clone(),
>>>               backend,
>>> +            Arc::clone(&log_sender),
>>>           )
>>>           .await?;
>>>           sync_stats.add(stats);
>>>       }
>>>       if let Err(err) = std::fs::rename(&tmp_manifest_name, 
>>> &manifest_name) {
>>> -        bail!("Atomic rename file {:?} failed - {}", manifest_name, 
>>> err);
>>> +        bail!("{prefix}: Atomic rename file {manifest_name:?} failed 
>>> - {err}");
>>>       }
>>>       if let DatastoreBackend::S3(s3_client) = backend {
>>>           let object_key = pbs_datastore::s3::object_key_from_path(
>>> @@ -524,33 +603,40 @@ async fn pull_snapshot<'a>(
>>>           let _is_duplicate = s3_client
>>>               .upload_replace_with_retry(object_key, data)
>>>               .await
>>> -            .context("failed to upload manifest to s3 backend")?;
>>> +            .context("failed to upload manifest to s3 backend")
>>> +            .with_context(|| prefix.clone())?;
>>>       }
>>>       if !client_log_name.exists() {
>>> -        reader.try_download_client_log(&client_log_name).await?;
>>> +        reader
>>> +            .try_download_client_log(&client_log_name)
>>> +            .await
>>> +            .with_context(|| prefix.clone())?;
>>>           if client_log_name.exists() {
>>>               if let DatastoreBackend::S3(s3_client) = backend {
>>>                   let object_key = 
>>> pbs_datastore::s3::object_key_from_path(
>>>                       &snapshot.relative_path(),
>>>                       CLIENT_LOG_BLOB_NAME.as_ref(),
>>>                   )
>>> -                .context("invalid archive object key")?;
>>> +                .context("invalid archive object key")
>>> +                .with_context(|| prefix.clone())?;
>>>                   let data = tokio::fs::read(&client_log_name)
>>>                       .await
>>> -                    .context("failed to read log file contents")?;
>>> +                    .context("failed to read log file contents")
>>> +                    .with_context(|| prefix.clone())?;
>>>                   let contents = hyper::body::Bytes::from(data);
>>>                   let _is_duplicate = s3_client
>>>                       .upload_replace_with_retry(object_key, contents)
>>>                       .await
>>> -                    .context("failed to upload client log to s3 
>>> backend")?;
>>> +                    .context("failed to upload client log to s3 
>>> backend")
>>> +                    .with_context(|| prefix.clone())?;
>>>               }
>>>           }
>>>       };
>>>       snapshot
>>>           .cleanup_unreferenced_files(&manifest)
>>> -        .map_err(|err| format_err!("failed to cleanup unreferenced 
>>> files - {err}"))?;
>>> +        .map_err(|err| format_err!("{prefix}: failed to cleanup 
>>> unreferenced files - {err}"))?;
>>>       Ok(sync_stats)
>>>   }
>>> @@ -565,10 +651,14 @@ async fn pull_snapshot_from<'a>(
>>>       snapshot: &'a pbs_datastore::BackupDir,
>>>       encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>>       corrupt: bool,
>>> +    log_sender: Arc<LogLineSender>,
>>>   ) -> Result<SyncStats, Error> {
>>> +    let prefix = format!("{}", snapshot.backup_time_string());
>>> +
>>>       let (_path, is_new, _snap_lock) = snapshot
>>>           .datastore()
>>> -        .create_locked_backup_dir(snapshot.backup_ns(), 
>>> snapshot.as_ref())?;
>>> +        .create_locked_backup_dir(snapshot.backup_ns(), 
>>> snapshot.as_ref())
>>> +        .context(prefix.clone())?;
>>>       let result = pull_snapshot(
>>>           params,
>>> @@ -577,6 +667,7 @@ async fn pull_snapshot_from<'a>(
>>>           encountered_chunks,
>>>           corrupt,
>>>           is_new,
>>> +        Arc::clone(&log_sender),
>>>       )
>>>       .await;
>>> @@ -589,11 +680,20 @@ async fn pull_snapshot_from<'a>(
>>>                       snapshot.as_ref(),
>>>                       true,
>>>                   ) {
>>> -                    info!("cleanup error - {cleanup_err}");
>>> +                    log_sender
>>> +                        .log(
>>> +                            Level::INFO,
>>> +                            format!("{prefix}: cleanup error - 
>>> {cleanup_err}"),
>>> +                        )
>>> +                        .await?;
>>>                   }
>>>                   return Err(err);
>>>               }
>>> -            Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
>>> +            Ok(_) => {
>>> +                log_sender
>>> +                    .log(Level::INFO, format!("{prefix}: sync done"))
>>> +                    .await?
>>> +            }
>>>           }
>>>       }
>>> @@ -622,7 +722,9 @@ async fn pull_group(
>>>       source_namespace: &BackupNamespace,
>>>       group: &BackupGroup,
>>>       shared_group_progress: Arc<SharedGroupProgress>,
>>> +    log_sender: Arc<LogLineSender>,
>>>   ) -> Result<SyncStats, Error> {
>>> +    let prefix = format!("{group}");
>>>       let mut already_synced_skip_info = 
>>> SkipInfo::new(SkipReason::AlreadySynced);
>>>       let mut transfer_last_skip_info = 
>>> SkipInfo::new(SkipReason::TransferLast);
>>> @@ -714,11 +816,15 @@ async fn pull_group(
>>>           .collect();
>>>       if already_synced_skip_info.count > 0 {
>>> -        info!("{already_synced_skip_info}");
>>> +        log_sender
>>> +            .log(Level::INFO, format!("{prefix}: 
>>> {already_synced_skip_info}"))
>>> +            .await?;
>>>           already_synced_skip_info.reset();
>>>       }
>>>       if transfer_last_skip_info.count > 0 {
>>> -        info!("{transfer_last_skip_info}");
>>> +        log_sender
>>> +            .log(Level::INFO, format!("{prefix}: 
>>> {transfer_last_skip_info}"))
>>> +            .await?;
>>>           transfer_last_skip_info.reset();
>>>       }
>>> @@ -730,8 +836,8 @@ async fn pull_group(
>>>           .store
>>>           .backup_group(target_ns.clone(), group.clone());
>>>       if let Some(info) = 
>>> backup_group.last_backup(true).unwrap_or(None) {
>>> -        let mut reusable_chunks = encountered_chunks.lock().unwrap();
>>>           if let Err(err) = proxmox_lang::try_block!({
>>> +            let mut reusable_chunks = 
>>> encountered_chunks.lock().unwrap();
>>>               let _snapshot_guard = info
>>>                   .backup_dir
>>>                   .lock_shared()
>>> @@ -780,7 +886,12 @@ async fn pull_group(
>>>               }
>>>               Ok::<(), Error>(())
>>>           }) {
>>> -            warn!("Failed to collect reusable chunk from last 
>>> backup: {err:#?}");
>>> +            log_sender
>>> +                .log(
>>> +                    Level::WARN,
>>> +                    format!("Failed to collect reusable chunk from 
>>> last backup: {err:#?}"),
>>> +                )
>>> +                .await?;
>>>           }
>>>       }
>>> @@ -805,13 +916,16 @@ async fn pull_group(
>>>               &to_snapshot,
>>>               encountered_chunks.clone(),
>>>               corrupt,
>>> +            Arc::clone(&log_sender),
>>>           )
>>>           .await;
>>>           // Update done groups progress by other parallel running pulls
>>>           local_progress.done_groups = 
>>> shared_group_progress.load_done();
>>>           local_progress.done_snapshots = pos as u64 + 1;
>>> -        info!("percentage done: group {group}: {local_progress}");
>>> +        log_sender
>>> +            .log(Level::INFO, format!("percentage done: 
>>> {local_progress}"))
>>> +            .await?;
>>>           let stats = result?; // stop on error
>>>           sync_stats.add(stats);
>>> @@ -829,13 +943,23 @@ async fn pull_group(
>>>                   continue;
>>>               }
>>>               if snapshot.is_protected() {
>>> -                info!(
>>> -                    "don't delete vanished snapshot {} (protected)",
>>> -                    snapshot.dir()
>>> -                );
>>> +                log_sender
>>> +                    .log(
>>> +                        Level::INFO,
>>> +                        format!(
>>> +                            "{prefix}: don't delete vanished 
>>> snapshot {} (protected)",
>>> +                            snapshot.dir(),
>>> +                        ),
>>> +                    )
>>> +                    .await?;
>>>                   continue;
>>>               }
>>> -            info!("delete vanished snapshot {}", snapshot.dir());
>>> +            log_sender
>>> +                .log(
>>> +                    Level::INFO,
>>> +                    format!("delete vanished snapshot {}", 
>>> snapshot.dir()),
>>> +                )
>>> +                .await?;
>>>               params
>>>                   .target
>>>                   .store
>>> @@ -1035,10 +1159,7 @@ pub(crate) async fn pull_store(mut params: 
>>> PullParameters) -> Result<SyncStats,
>>>               }
>>>               Err(err) => {
>>>                   errors = true;
>>> -                info!(
>>> -                    "Encountered errors while syncing namespace {} - 
>>> {err}",
>>> -                    &namespace,
>>> -                );
>>> +                info!("Encountered errors while syncing namespace 
>>> {namespace} - {err}");
>>>               }
>>>           };
>>>       }
>>> @@ -1064,6 +1185,7 @@ async fn lock_and_pull_group(
>>>       namespace: &BackupNamespace,
>>>       target_namespace: &BackupNamespace,
>>>       shared_group_progress: Arc<SharedGroupProgress>,
>>> +    log_sender: Arc<LogLineSender>,
>>>   ) -> Result<SyncStats, Error> {
>>>       let (owner, _lock_guard) =
>>>           match params
>>> @@ -1073,25 +1195,47 @@ async fn lock_and_pull_group(
>>>           {
>>>               Ok(res) => res,
>>>               Err(err) => {
>>> -                info!("sync group {group} failed - group lock 
>>> failed: {err}");
>>> -                info!("create_locked_backup_group failed");
>>> +                log_sender
>>> +                    .log(
>>> +                        Level::INFO,
>>> +                        format!("sync group {group} failed - group 
>>> lock failed: {err}"),
>>> +                    )
>>> +                    .await?;
>>> +                log_sender
>>> +                    .log(Level::INFO, "create_locked_backup_group 
>>> failed".to_string())
>>> +                    .await?;
>>>                   return Err(err);
>>>               }
>>>           };
>>>       if params.owner != owner {
>>>           // only the owner is allowed to create additional snapshots
>>> -        info!(
>>> -            "sync group {group} failed - owner check failed ({} != 
>>> {owner})",
>>> -            params.owner
>>> -        );
>>> +        log_sender
>>> +            .log(
>>> +                Level::INFO,
>>> +                format!(
>>> +                    "sync group {group} failed - owner check failed 
>>> ({} != {owner})",
>>> +                    params.owner,
>>> +                ),
>>> +            )
>>> +            .await?;
>>>           return Err(format_err!("owner check failed"));
>>>       }
>>> -    match pull_group(params, namespace, group, 
>>> shared_group_progress).await {
>>> +    match pull_group(
>>> +        params,
>>> +        namespace,
>>> +        group,
>>> +        shared_group_progress,
>>> +        Arc::clone(&log_sender),
>>> +    )
>>> +    .await
>>> +    {
>>>           Ok(stats) => Ok(stats),
>>>           Err(err) => {
>>> -            info!("sync group {group} failed - {err:#}");
>>> +            log_sender
>>> +                .log(Level::INFO, format!("sync group {group} failed 
>>> - {err:#}"))
>>> +                .await?;
>>>               Err(err)
>>>           }
>>>       }
>>> @@ -1124,7 +1268,7 @@ async fn pull_ns(
>>>       list.sort_unstable();
>>>       info!(
>>> -        "found {} groups to sync (out of {unfiltered_count} total)",
>>> +        "Found {} groups to sync (out of {unfiltered_count} total)",
>>>           list.len()
>>>       );
>>> @@ -1143,6 +1287,10 @@ async fn pull_ns(
>>>       let shared_group_progress = 
>>> Arc::new(SharedGroupProgress::with_total_groups(list.len()));
>>>       let mut group_workers = 
>>> BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
>>> +    let (buffered_logger, sender_builder) = BufferedLogger::new(5, 
>>> Duration::from_secs(1));
>>> +    // runs until sender_builder and all senders built from it have been dropped
>>> +    buffered_logger.run_log_collection();
>>> +
>>>       let mut process_results = |results| {
>>>           for result in results {
>>>               match result {
>>> @@ -1160,16 +1308,20 @@ async fn pull_ns(
>>>           let target_ns = target_ns.clone();
>>>           let params = Arc::clone(&params);
>>>           let group_progress_cloned = 
>>> Arc::clone(&shared_group_progress);
>>> +        let log_sender = 
>>> Arc::new(sender_builder.sender_with_label(group.to_string()));
>>>           let results = group_workers
>>>               .spawn_task(async move {
>>> -                lock_and_pull_group(
>>> +                let result = lock_and_pull_group(
>>>                       Arc::clone(&params),
>>>                       &group,
>>>                       &namespace,
>>>                       &target_ns,
>>>                       group_progress_cloned,
>>> +                    Arc::clone(&log_sender),
>>>                   )
>>> -                .await
>>> +                .await;
>>> +                let _ = log_sender.flush().await;
>>> +                result
>>>               })
>>>               .await
>>>               .map_err(|err| format_err!("failed to join on worker 
>>> task: {err:#}"))?;
>>> @@ -1197,7 +1349,7 @@ async fn pull_ns(
>>>                   if !local_group.apply_filters(&params.group_filter) {
>>>                       continue;
>>>                   }
>>> -                info!("delete vanished group '{local_group}'");
>>> +                info!("Delete vanished group '{local_group}'");
>>>                   let delete_stats_result = params
>>>                       .target
>>>                       .store
>>> @@ -1206,7 +1358,7 @@ async fn pull_ns(
>>>                   match delete_stats_result {
>>>                       Ok(stats) => {
>>>                           if !stats.all_removed() {
>>> -                            info!("kept some protected snapshots of 
>>> group '{local_group}'");
>>> +                            info!("Kept some protected snapshots of 
>>> group '{local_group}'");
>>>                               
>>> sync_stats.add(SyncStats::from(RemovedVanishedStats {
>>>                                   snapshots: stats.removed_snapshots(),
>>>                                   groups: 0,
>>> @@ -1229,7 +1381,7 @@ async fn pull_ns(
>>>               Ok(())
>>>           });
>>>           if let Err(err) = result {
>>> -            info!("error during cleanup: {err}");
>>> +            info!("Error during cleanup: {err}");
>>>               errors = true;
>>>           };
>>>       }
>>> diff --git a/src/server/sync.rs b/src/server/sync.rs
>>> index e88418442..17ed4839f 100644
>>> --- a/src/server/sync.rs
>>> +++ b/src/server/sync.rs
>>> @@ -13,7 +13,6 @@ use futures::{future::FutureExt, select};
>>>   use hyper::http::StatusCode;
>>>   use pbs_config::BackupLockGuard;
>>>   use serde_json::json;
>>> -use tokio::task::JoinSet;
>>>   use tracing::{info, warn};
>>>   use proxmox_human_byte::HumanByte;
>>> @@ -136,13 +135,13 @@ impl SyncSourceReader for RemoteSourceReader {
>>>                   Some(HttpError { code, message }) => match *code {
>>>                       StatusCode::NOT_FOUND => {
>>>                           info!(
>>> -                            "skipping snapshot {} - vanished since 
>>> start of sync",
>>> +                            "Snapshot {}: skipped because vanished 
>>> since start of sync",
>>>                               &self.dir
>>>                           );
>>>                           return Ok(None);
>>>                       }
>>>                       _ => {
>>> -                        bail!("HTTP error {code} - {message}");
>>> +                        bail!("Snapshot {}: HTTP error {code} - 
>>> {message}", &self.dir);
>>>                       }
>>>                   },
>>>                   None => {
>>> @@ -176,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader {
>>>                   bail!("Atomic rename file {to_path:?} failed - 
>>> {err}");
>>>               }
>>>               info!(
>>> -                "got backup log file {client_log_name}",
>>> +                "Snapshot {snapshot}: got backup log file 
>>> {client_log_name}",
>>> +                snapshot = &self.dir,
>>>                   client_log_name = client_log_name.deref()
>>>               );
>>>           }
>>> -- 
>>> 2.47.3

Thread overview: 29+ messages
2026-04-17  9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-20 10:57   ` Fabian Grünbichler
2026-04-20 17:15     ` Christian Ebner
2026-04-21  6:49       ` Fabian Grünbichler
2026-04-17  9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-20 11:15   ` Fabian Grünbichler
2026-04-17  9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-20 12:29   ` Fabian Grünbichler
2026-04-17  9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-20 11:56   ` Fabian Grünbichler
2026-04-21  7:21     ` Christian Ebner
2026-04-21  7:42       ` Christian Ebner [this message]
2026-04-21  8:00         ` Fabian Grünbichler
2026-04-21  8:04           ` Christian Ebner
2026-04-21 12:57     ` Thomas Lamprecht
2026-04-17  9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner
2026-04-20 12:33 ` [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Fabian Grünbichler
2026-04-21 10:28 ` superseded: " Christian Ebner
