Subject: Re: [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue, 21 Apr 2026 09:42:23 +0200
Message-ID: <7f8852e8-c092-4e2d-a031-192c087fb364@proxmox.com>
In-Reply-To: <295ee353-2731-474b-aad0-d1f8a2bcb443@proxmox.com>
References: <20260417092621.455374-1-c.ebner@proxmox.com>
 <20260417092621.455374-12-c.ebner@proxmox.com>
 <1776684473.p5hg4a1vym.astroid@yuna.none>
 <295ee353-2731-474b-aad0-d1f8a2bcb443@proxmox.com>

On 4/21/26 9:21 AM, Christian Ebner wrote:
> On 4/20/26 1:54 PM, Fabian Grünbichler wrote:
>> On April 17, 2026 11:26 am, Christian Ebner wrote:
>>> Pulling groups and therefore also snapshots in parallel leads to
>>> unordered log output, making it mostly impossible to relate a log
>>> message to a backup snapshot/group.
>>>
>>> Therefore, prefix pull job log messages with the corresponding group or
>>> snapshot and set the error context accordingly.
>>>
>>> Also, reword some messages, inline variables in format strings and
>>> start log lines with capital letters to get consistent output.
>>>
>>> By using the buffered logger implementation, buffering up to 5 lines
>>> with a timeout of 1 second, subsequent log lines arriving in fast
>>> succession are kept together, reducing the mixing of lines.
>>>
>>> Example output for a sequential pull job:
>>> ```
>>> ...
>>> [ct/100]: 2025-11-17T10:11:42Z: start sync
>>> [ct/100]: 2025-11-17T10:11:42Z: pct.conf.blob: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: downloaded 16.785 MiB (280.791 MiB/s)
>>> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: downloaded 65.703 KiB (29.1 MiB/s)
>>> [ct/100]: 2025-11-17T10:11:42Z: sync done
>>> [ct/100]: percentage done: 9.09% (1/11 groups)
>>> [ct/101]: 2026-03-31T12:20:16Z: start sync
>>> [ct/101]: 2026-03-31T12:20:16Z: pct.conf.blob: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: downloaded 199.806 MiB (311.91 MiB/s)
>>> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: downloaded 180.379 KiB (22.748 MiB/s)
>>> [ct/101]: 2026-03-31T12:20:16Z: sync done
>>
>> this is probably the wrong patch for this comment, but since you have
>> the sample output here ;)
>>
>> should we pad the labels? the buffered logger knows all currently active
>> labels and could adapt it? otherwise especially for host backups the
>> logs are again not really scannable by humans, because there will be
>> weird jumps in alignment.. or (.. continued below)
>
> That will not work though? While the logger knows about the labels when
> receiving them, it does not know at all about future ones that might
> still come in. So it can happen that the padding does not fit anymore,
> and one gets even uglier formatting issues.
>
> I would prefer to keep them as is for the time being. For vm/ct it is
> not that bad, one could maybe foresee the ID to be a 4-digit number and
> define a minimum label length to be padded if not reached.
>
> Host backups or backups with explicit label
>
>
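To make the minimum label length idea above a bit more concrete, something
along these lines is what I have in mind. This is only a hypothetical,
self-contained sketch (the helper name and the chosen width are made up,
nothing of this is part of the series):

```rust
// Hypothetical helper: pad the task label to an assumed minimum width so
// that short labels like "ct/101" stay aligned, while longer host backup
// labels simply exceed the minimum and are printed unpadded.
fn padded_prefix(label: &str) -> String {
    let min_width = 12; // assumed minimum label width, e.g. enough for "vm/10000"
    format!("[{label:<min_width$}]")
}

fn main() {
    for label in ["ct/101", "vm/108", "host/exclusion-test"] {
        println!("{}: 2025-11-17T10:11:42Z: start sync", padded_prefix(label));
    }
}
```

That would keep the common vm/ct case aligned without the logger having to
know all future labels up front, but arbitrary host backup labels would
still break the alignment, which is why I would rather postpone this.
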
>>> ...
>>> ```
>>>
>>> Example output for a parallel pull job:
>>> ```
>>> ...
>>> [ct/107]: 2025-07-16T09:14:01Z: start sync
>>> [ct/107]: 2025-07-16T09:14:01Z: pct.conf.blob: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: sync archive
>>> [vm/108]: 2025-09-19T07:37:19Z: start sync
>>> [vm/108]: 2025-09-19T07:37:19Z: qemu-server.conf.blob: sync archive
>>> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: downloaded 609.233 MiB (112.628 MiB/s)
>>> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: downloaded 1.172 MiB (17.838 MiB/s)
>>> [ct/107]: 2025-07-16T09:14:01Z: sync done
>>> [ct/107]: percentage done: 72.73% (8/11 groups)
>>
>> the way the prefix and snapshot are formatted could also be interpreted
>> at first glance as a timestamp of the log line.. why not just prepend
>> the prefix on the logger side, and leave it up to the caller to do the
>> formatting? then we could use "{type}/{id}/" as prefix here? or add
>> 'snapshot' in those lines to make it clear? granted, this is more of an
>> issue when viewing the log via `proxmox-backup-manager`, as in the UI we
>> have the log timestamps up front..
>
> I simply followed along the line of your suggestion here. In prior
> versions I had exactly that, but you rejected it as too verbose? :)
>
> https://lore.proxmox.com/pbs-devel/1774263381.bngcrer2th.astroid@yuna.none/
>
>>
>> and maybe(?) log the progress line using a different prefix? because
>> right now the information that the group [ct/107] is finished is not
>> really clear from the output, IMHO.
> That I can add, yes.
>
>>
>> the progress logging is also still broken (this is for a sync that takes
>> a while, this is not log messages being buffered and re-ordered!):
>>
>> $ proxmox-backup-manager task log 'UPID:yuna:00070656:001E420F:00000002:69E610EB:syncjob:local\x3atest\x3atank\x3a\x3as\x2dbc01cba6\x2d805a:root@pam:' | grep -e namespace -e 'percentage done'
>> Syncing datastore 'test', root namespace into datastore 'tank', root namespace
>> Finished syncing root namespace, current progress: 0 groups, 0 snapshots
>> Syncing datastore 'test', namespace 'test' into datastore 'tank', namespace 'test'
>> [host/exclusion-test]: percentage done: 5.26% (1/19 groups)
>> [host/acltest]: percentage done: 5.26% (1/19 groups)
>> [host/logtest]: percentage done: 5.26% (1/19 groups)
>> [host/onemeg]: percentage done: 5.26% (1/19 groups)
>> [host/fourmeg]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/symlink]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/symlink]: percentage done: 5.26% (1/19 groups)
>> [host/fourmeg]: percentage done: 5.26% (1/19 groups)
>> [host/format-v2-test]: percentage done: 1.75% (0/19 groups, 1/3 snapshots in group #1)
>> [host/format-v2-test]: percentage done: 3.51% (0/19 groups, 2/3 snapshots in group #1)
>> [host/format-v2-test]: percentage done: 5.26% (1/19 groups)
>> [host/incrementaltest2]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/incrementaltest2]: percentage done: 5.26% (1/19 groups)
> There will always be cases where it does not work I guess, not sure what
> I can do to make you happy here.

Oh, sorry, might have been too quick on this one. You are saying this
happens for a non-parallel sync?

>>
>>> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: downloaded 1.196 GiB (156.892 MiB/s)
>>> [vm/108]: 2025-09-19T07:37:19Z: sync done
>>> ...
>>> >>> Signed-off-by: Christian Ebner >>> --- >>> changes since version 5: >>> - uses BufferedLogger implementation, refactored accordingly >>> - improve log line prefixes >>> - add missing error contexts >>> >>>   src/server/pull.rs | 314 +++++++++++++++++++++++++++++++++------------ >>>   src/server/sync.rs |   8 +- >>>   2 files changed, 237 insertions(+), 85 deletions(-) >>> >>> diff --git a/src/server/pull.rs b/src/server/pull.rs >>> index 611441d2a..f7aae4d59 100644 >>> --- a/src/server/pull.rs >>> +++ b/src/server/pull.rs >>> @@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet}; >>>   use std::io::Seek; >>>   use std::sync::atomic::{AtomicUsize, Ordering}; >>>   use std::sync::{Arc, Mutex}; >>> -use std::time::SystemTime; >>> +use std::time::{Duration, SystemTime}; >>>   use anyhow::{bail, format_err, Context, Error}; >>>   use proxmox_human_byte::HumanByte; >>> -use tracing::{info, warn}; >>> +use tracing::{info, Level}; >>>   use pbs_api_types::{ >>>       print_store_and_ns, ArchiveType, Authid, BackupArchiveName, >>> BackupDir, BackupGroup, >>> @@ -27,6 +27,7 @@ use pbs_datastore::manifest::{BackupManifest, >>> FileInfo}; >>>   use pbs_datastore::read_chunk::AsyncReadChunk; >>>   use pbs_datastore::{check_backup_owner, DataStore, >>> DatastoreBackend, StoreProgress}; >>>   use pbs_tools::bounded_join_set::BoundedJoinSet; >>> +use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender}; >>>   use pbs_tools::sha::sha256; >>>   use super::sync::{ >>> @@ -153,6 +154,8 @@ async fn pull_index_chunks( >>>       index: I, >>>       encountered_chunks: Arc>, >>>       backend: &DatastoreBackend, >>> +    archive_prefix: &str, >>> +    log_sender: Arc, >>>   ) -> Result { >>>       use futures::stream::{self, StreamExt, TryStreamExt}; >>> @@ -247,11 +250,16 @@ async fn pull_index_chunks( >>>       let bytes = bytes.load(Ordering::SeqCst); >>>       let chunk_count = chunk_count.load(Ordering::SeqCst); >>> -    info!( >>> -        "downloaded {} ({}/s)", >>> -        HumanByte::from(bytes), >>> -        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()), >>> -    ); >>> +    log_sender >>> +        .log( >>> +            Level::INFO, >>> +            format!( >>> +                "{archive_prefix}: downloaded {} ({}/s)", >>> +                HumanByte::from(bytes), >>> +                HumanByte::new_binary(bytes as f64 / >>> elapsed.as_secs_f64()), >>> +            ), >>> +        ) >>> +        .await?; >>>       Ok(SyncStats { >>>           chunk_count, >>> @@ -292,6 +300,7 @@ async fn pull_single_archive<'a>( >>>       archive_info: &'a FileInfo, >>>       encountered_chunks: Arc>, >>>       backend: &DatastoreBackend, >>> +    log_sender: Arc, >>>   ) -> Result { >>>       let archive_name = &archive_info.filename; >>>       let mut path = snapshot.full_path(); >>> @@ -302,72 +311,104 @@ async fn pull_single_archive<'a>( >>>       let mut sync_stats = SyncStats::default(); >>> -    info!("sync archive {archive_name}"); >>> +    let archive_prefix = format!("{}: {archive_name}", >>> snapshot.backup_time_string()); >>> + >>> +    log_sender >>> +        .log(Level::INFO, format!("{archive_prefix}: sync archive")) >>> +        .await?; >>> -    reader.load_file_into(archive_name, &tmp_path).await?; >>> +    reader >>> +        .load_file_into(archive_name, &tmp_path) >>> +        .await >>> +        .with_context(|| archive_prefix.clone())?; >>> -    let mut tmpfile = >>> std::fs::OpenOptions::new().read(true).open(&tmp_path)?; >>> +    let mut tmpfile = 
std::fs::OpenOptions::new() >>> +        .read(true) >>> +        .open(&tmp_path) >>> +        .with_context(|| archive_prefix.clone())?; >>>       match ArchiveType::from_path(archive_name)? { >>>           ArchiveType::DynamicIndex => { >>>               let index = DynamicIndexReader::new(tmpfile).map_err(| >>> err| { >>> -                format_err!("unable to read dynamic index {:?} - >>> {}", tmp_path, err) >>> +                format_err!("{archive_prefix}: unable to read >>> dynamic index {tmp_path:?} - {err}") >>>               })?; >>>               let (csum, size) = index.compute_csum(); >>> -            verify_archive(archive_info, &csum, size)?; >>> +            verify_archive(archive_info, &csum, >>> size).with_context(|| archive_prefix.clone())?; >>>               if reader.skip_chunk_sync(snapshot.datastore().name()) { >>> -                info!("skipping chunk sync for same datastore"); >>> +                log_sender >>> +                    .log( >>> +                        Level::INFO, >>> +                        format!("{archive_prefix}: skipping chunk >>> sync for same datastore"), >>> +                    ) >>> +                    .await?; >>>               } else { >>>                   let stats = pull_index_chunks( >>>                       reader >>>                           .chunk_reader(archive_info.crypt_mode) >>> -                        .context("failed to get chunk reader")?, >>> +                        .context("failed to get chunk reader") >>> +                        .with_context(|| archive_prefix.clone())?, >>>                       snapshot.datastore().clone(), >>>                       index, >>>                       encountered_chunks, >>>                       backend, >>> +                    &archive_prefix, >>> +                    Arc::clone(&log_sender), >>>                   ) >>> -                .await?; >>> +                .await >>> +                .with_context(|| archive_prefix.clone())?; >>>                   sync_stats.add(stats); >>>               } >>>           } >>>           ArchiveType::FixedIndex => { >>>               let index = FixedIndexReader::new(tmpfile).map_err(|err| { >>> -                format_err!("unable to read fixed index '{:?}' - >>> {}", tmp_path, err) >>> +                format_err!("{archive_name}: unable to read fixed >>> index '{tmp_path:?}' - {err}") >>>               })?; >>>               let (csum, size) = index.compute_csum(); >>> -            verify_archive(archive_info, &csum, size)?; >>> +            verify_archive(archive_info, &csum, >>> size).with_context(|| archive_prefix.clone())?; >>>               if reader.skip_chunk_sync(snapshot.datastore().name()) { >>> -                info!("skipping chunk sync for same datastore"); >>> +                log_sender >>> +                    .log( >>> +                        Level::INFO, >>> +                        format!("{archive_prefix}: skipping chunk >>> sync for same datastore"), >>> +                    ) >>> +                    .await?; >>>               } else { >>>                   let stats = pull_index_chunks( >>>                       reader >>>                           .chunk_reader(archive_info.crypt_mode) >>> -                        .context("failed to get chunk reader")?, >>> +                        .context("failed to get chunk reader") >>> +                        .with_context(|| archive_prefix.clone())?, >>>                       snapshot.datastore().clone(), >>>                       index, >>>                      
 encountered_chunks, >>>                       backend, >>> +                    &archive_prefix, >>> +                    Arc::clone(&log_sender), >>>                   ) >>> -                .await?; >>> +                .await >>> +                .with_context(|| archive_prefix.clone())?; >>>                   sync_stats.add(stats); >>>               } >>>           } >>>           ArchiveType::Blob => { >>> -            tmpfile.rewind()?; >>> -            let (csum, size) = sha256(&mut tmpfile)?; >>> -            verify_archive(archive_info, &csum, size)?; >>> +            proxmox_lang::try_block!({ >>> +                tmpfile.rewind()?; >>> +                let (csum, size) = sha256(&mut tmpfile)?; >>> +                verify_archive(archive_info, &csum, size) >>> +            }) >>> +            .with_context(|| archive_prefix.clone())?; >>>           } >>>       } >>>       if let Err(err) = std::fs::rename(&tmp_path, &path) { >>> -        bail!("Atomic rename file {:?} failed - {}", path, err); >>> +        bail!("{archive_prefix}: Atomic rename file {path:?} failed >>> - {err}"); >>>       } >>>       backend >>>           .upload_index_to_backend(snapshot, archive_name) >>> -        .await?; >>> +        .await >>> +        .with_context(|| archive_prefix.clone())?; >>>       Ok(sync_stats) >>>   } >>> @@ -388,13 +429,24 @@ async fn pull_snapshot<'a>( >>>       encountered_chunks: Arc>, >>>       corrupt: bool, >>>       is_new: bool, >>> +    log_sender: Arc, >>>   ) -> Result { >>> +    let prefix = snapshot.backup_time_string().to_owned(); >>>       if is_new { >>> -        info!("sync snapshot {}", snapshot.dir()); >>> +        log_sender >>> +            .log(Level::INFO, format!("{prefix}: start sync")) >>> +            .await?; >>>       } else if corrupt { >>> -        info!("re-sync snapshot {} due to corruption", snapshot.dir()); >>> +        log_sender >>> +            .log( >>> +                Level::INFO, >>> +                format!("re-sync snapshot {prefix} due to corruption"), >>> +            ) >>> +            .await?; >>>       } else { >>> -        info!("re-sync snapshot {}", snapshot.dir()); >>> +        log_sender >>> +            .log(Level::INFO, format!("re-sync snapshot {prefix}")) >>> +            .await?; >>>       } >>>       let mut sync_stats = SyncStats::default(); >>> @@ -409,7 +461,8 @@ async fn pull_snapshot<'a>( >>>       let tmp_manifest_blob; >>>       if let Some(data) = reader >>>           .load_file_into(MANIFEST_BLOB_NAME.as_ref(), >>> &tmp_manifest_name) >>> -        .await? >>> +        .await >>> +        .with_context(|| prefix.clone())? 
>>>       { >>>           tmp_manifest_blob = data; >>>       } else { >>> @@ -419,28 +472,34 @@ async fn pull_snapshot<'a>( >>>       if manifest_name.exists() && !corrupt { >>>           let manifest_blob = proxmox_lang::try_block!({ >>>               let mut manifest_file = >>> std::fs::File::open(&manifest_name).map_err(|err| { >>> -                format_err!("unable to open local manifest >>> {manifest_name:?} - {err}") >>> +                format_err!("{prefix}: unable to open local manifest >>> {manifest_name:?} - {err}") >>>               })?; >>> -            let manifest_blob = DataBlob::load_from_reader(&mut >>> manifest_file)?; >>> +            let manifest_blob = >>> +                DataBlob::load_from_reader(&mut >>> manifest_file).with_context(|| prefix.clone())?; >>>               Ok(manifest_blob) >>>           }) >>>           .map_err(|err: Error| { >>> -            format_err!("unable to read local manifest >>> {manifest_name:?} - {err}") >>> +            format_err!("{prefix}: unable to read local manifest >>> {manifest_name:?} - {err}") >>>           })?; >>>           if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { >>>               if !client_log_name.exists() { >>> - >>> reader.try_download_client_log(&client_log_name).await?; >>> +                reader >>> +                    .try_download_client_log(&client_log_name) >>> +                    .await >>> +                    .with_context(|| prefix.clone())?; >>>               }; >>> -            info!("no data changes"); >>> +            log_sender >>> +                .log(Level::INFO, format!("{prefix}: no data changes")) >>> +                .await?; >>>               let _ = std::fs::remove_file(&tmp_manifest_name); >>>               return Ok(sync_stats); // nothing changed >>>           } >>>       } >>>       let manifest_data = tmp_manifest_blob.raw_data().to_vec(); >>> -    let manifest = BackupManifest::try_from(tmp_manifest_blob)?; >>> +    let manifest = >>> BackupManifest::try_from(tmp_manifest_blob).with_context(|| >>> prefix.clone())?; >>>       if ignore_not_verified_or_encrypted( >>>           &manifest, >>> @@ -464,35 +523,54 @@ async fn pull_snapshot<'a>( >>>           path.push(&item.filename); >>>           if !corrupt && path.exists() { >>> -            let filename: BackupArchiveName = >>> item.filename.as_str().try_into()?; >>> +            let filename: BackupArchiveName = item >>> +                .filename >>> +                .as_str() >>> +                .try_into() >>> +                .with_context(|| prefix.clone())?; >>>               match filename.archive_type() { >>>                   ArchiveType::DynamicIndex => { >>> -                    let index = DynamicIndexReader::open(&path)?; >>> +                    let index = >>> DynamicIndexReader::open(&path).with_context(|| prefix.clone())?; >>>                       let (csum, size) = index.compute_csum(); >>>                       match manifest.verify_file(&filename, &csum, >>> size) { >>>                           Ok(_) => continue, >>>                           Err(err) => { >>> -                            info!("detected changed file {path:?} - >>> {err}"); >>> +                            log_sender >>> +                                .log( >>> +                                    Level::INFO, >>> +                                    format!("{prefix}: detected >>> changed file {path:?} - {err}"), >>> +                                ) >>> +                                .await?; >>>                    
       } >>>                       } >>>                   } >>>                   ArchiveType::FixedIndex => { >>> -                    let index = FixedIndexReader::open(&path)?; >>> +                    let index = >>> FixedIndexReader::open(&path).with_context(|| prefix.clone())?; >>>                       let (csum, size) = index.compute_csum(); >>>                       match manifest.verify_file(&filename, &csum, >>> size) { >>>                           Ok(_) => continue, >>>                           Err(err) => { >>> -                            info!("detected changed file {path:?} - >>> {err}"); >>> +                            log_sender >>> +                                .log( >>> +                                    Level::INFO, >>> +                                    format!("{prefix}: detected >>> changed file {path:?} - {err}"), >>> +                                ) >>> +                                .await?; >>>                           } >>>                       } >>>                   } >>>                   ArchiveType::Blob => { >>> -                    let mut tmpfile = std::fs::File::open(&path)?; >>> -                    let (csum, size) = sha256(&mut tmpfile)?; >>> +                    let mut tmpfile = >>> std::fs::File::open(&path).with_context(|| prefix.clone())?; >>> +                    let (csum, size) = sha256(&mut >>> tmpfile).with_context(|| prefix.clone())?; >>>                       match manifest.verify_file(&filename, &csum, >>> size) { >>>                           Ok(_) => continue, >>>                           Err(err) => { >>> -                            info!("detected changed file {path:?} - >>> {err}"); >>> +                            log_sender >>> +                                .log( >>> +                                    Level::INFO, >>> +                                    format!("{prefix}: detected >>> changed file {path:?} - {err}"), >>> +                                ) >>> +                                .await?; >>>                           } >>>                       } >>>                   } >>> @@ -505,13 +583,14 @@ async fn pull_snapshot<'a>( >>>               item, >>>               encountered_chunks.clone(), >>>               backend, >>> +            Arc::clone(&log_sender), >>>           ) >>>           .await?; >>>           sync_stats.add(stats); >>>       } >>>       if let Err(err) = std::fs::rename(&tmp_manifest_name, >>> &manifest_name) { >>> -        bail!("Atomic rename file {:?} failed - {}", manifest_name, >>> err); >>> +        bail!("{prefix}: Atomic rename file {manifest_name:?} failed >>> - {err}"); >>>       } >>>       if let DatastoreBackend::S3(s3_client) = backend { >>>           let object_key = pbs_datastore::s3::object_key_from_path( >>> @@ -524,33 +603,40 @@ async fn pull_snapshot<'a>( >>>           let _is_duplicate = s3_client >>>               .upload_replace_with_retry(object_key, data) >>>               .await >>> -            .context("failed to upload manifest to s3 backend")?; >>> +            .context("failed to upload manifest to s3 backend") >>> +            .with_context(|| prefix.clone())?; >>>       } >>>       if !client_log_name.exists() { >>> -        reader.try_download_client_log(&client_log_name).await?; >>> +        reader >>> +            .try_download_client_log(&client_log_name) >>> +            .await >>> +            .with_context(|| prefix.clone())?; >>>           if client_log_name.exists() { >>>               if let DatastoreBackend::S3(s3_client) = 
backend { >>>                   let object_key = >>> pbs_datastore::s3::object_key_from_path( >>>                       &snapshot.relative_path(), >>>                       CLIENT_LOG_BLOB_NAME.as_ref(), >>>                   ) >>> -                .context("invalid archive object key")?; >>> +                .context("invalid archive object key") >>> +                .with_context(|| prefix.clone())?; >>>                   let data = tokio::fs::read(&client_log_name) >>>                       .await >>> -                    .context("failed to read log file contents")?; >>> +                    .context("failed to read log file contents") >>> +                    .with_context(|| prefix.clone())?; >>>                   let contents = hyper::body::Bytes::from(data); >>>                   let _is_duplicate = s3_client >>>                       .upload_replace_with_retry(object_key, contents) >>>                       .await >>> -                    .context("failed to upload client log to s3 >>> backend")?; >>> +                    .context("failed to upload client log to s3 >>> backend") >>> +                    .with_context(|| prefix.clone())?; >>>               } >>>           } >>>       }; >>>       snapshot >>>           .cleanup_unreferenced_files(&manifest) >>> -        .map_err(|err| format_err!("failed to cleanup unreferenced >>> files - {err}"))?; >>> +        .map_err(|err| format_err!("{prefix}: failed to cleanup >>> unreferenced files - {err}"))?; >>>       Ok(sync_stats) >>>   } >>> @@ -565,10 +651,14 @@ async fn pull_snapshot_from<'a>( >>>       snapshot: &'a pbs_datastore::BackupDir, >>>       encountered_chunks: Arc>, >>>       corrupt: bool, >>> +    log_sender: Arc, >>>   ) -> Result { >>> +    let prefix = format!("{}", snapshot.backup_time_string()); >>> + >>>       let (_path, is_new, _snap_lock) = snapshot >>>           .datastore() >>> -        .create_locked_backup_dir(snapshot.backup_ns(), >>> snapshot.as_ref())?; >>> +        .create_locked_backup_dir(snapshot.backup_ns(), >>> snapshot.as_ref()) >>> +        .context(prefix.clone())?; >>>       let result = pull_snapshot( >>>           params, >>> @@ -577,6 +667,7 @@ async fn pull_snapshot_from<'a>( >>>           encountered_chunks, >>>           corrupt, >>>           is_new, >>> +        Arc::clone(&log_sender), >>>       ) >>>       .await; >>> @@ -589,11 +680,20 @@ async fn pull_snapshot_from<'a>( >>>                       snapshot.as_ref(), >>>                       true, >>>                   ) { >>> -                    info!("cleanup error - {cleanup_err}"); >>> +                    log_sender >>> +                        .log( >>> +                            Level::INFO, >>> +                            format!("{prefix}: cleanup error - >>> {cleanup_err}"), >>> +                        ) >>> +                        .await?; >>>                   } >>>                   return Err(err); >>>               } >>> -            Ok(_) => info!("sync snapshot {} done", snapshot.dir()), >>> +            Ok(_) => { >>> +                log_sender >>> +                    .log(Level::INFO, format!("{prefix}: sync done")) >>> +                    .await? 
>>> +            } >>>           } >>>       } >>> @@ -622,7 +722,9 @@ async fn pull_group( >>>       source_namespace: &BackupNamespace, >>>       group: &BackupGroup, >>>       shared_group_progress: Arc, >>> +    log_sender: Arc, >>>   ) -> Result { >>> +    let prefix = format!("{group}"); >>>       let mut already_synced_skip_info = >>> SkipInfo::new(SkipReason::AlreadySynced); >>>       let mut transfer_last_skip_info = >>> SkipInfo::new(SkipReason::TransferLast); >>> @@ -714,11 +816,15 @@ async fn pull_group( >>>           .collect(); >>>       if already_synced_skip_info.count > 0 { >>> -        info!("{already_synced_skip_info}"); >>> +        log_sender >>> +            .log(Level::INFO, format!("{prefix}: >>> {already_synced_skip_info}")) >>> +            .await?; >>>           already_synced_skip_info.reset(); >>>       } >>>       if transfer_last_skip_info.count > 0 { >>> -        info!("{transfer_last_skip_info}"); >>> +        log_sender >>> +            .log(Level::INFO, format!("{prefix}: >>> {transfer_last_skip_info}")) >>> +            .await?; >>>           transfer_last_skip_info.reset(); >>>       } >>> @@ -730,8 +836,8 @@ async fn pull_group( >>>           .store >>>           .backup_group(target_ns.clone(), group.clone()); >>>       if let Some(info) = >>> backup_group.last_backup(true).unwrap_or(None) { >>> -        let mut reusable_chunks = encountered_chunks.lock().unwrap(); >>>           if let Err(err) = proxmox_lang::try_block!({ >>> +            let mut reusable_chunks = >>> encountered_chunks.lock().unwrap(); >>>               let _snapshot_guard = info >>>                   .backup_dir >>>                   .lock_shared() >>> @@ -780,7 +886,12 @@ async fn pull_group( >>>               } >>>               Ok::<(), Error>(()) >>>           }) { >>> -            warn!("Failed to collect reusable chunk from last >>> backup: {err:#?}"); >>> +            log_sender >>> +                .log( >>> +                    Level::WARN, >>> +                    format!("Failed to collect reusable chunk from >>> last backup: {err:#?}"), >>> +                ) >>> +                .await?; >>>           } >>>       } >>> @@ -805,13 +916,16 @@ async fn pull_group( >>>               &to_snapshot, >>>               encountered_chunks.clone(), >>>               corrupt, >>> +            Arc::clone(&log_sender), >>>           ) >>>           .await; >>>           // Update done groups progress by other parallel running pulls >>>           local_progress.done_groups = >>> shared_group_progress.load_done(); >>>           local_progress.done_snapshots = pos as u64 + 1; >>> -        info!("percentage done: group {group}: {local_progress}"); >>> +        log_sender >>> +            .log(Level::INFO, format!("percentage done: >>> {local_progress}")) >>> +            .await?; >>>           let stats = result?; // stop on error >>>           sync_stats.add(stats); >>> @@ -829,13 +943,23 @@ async fn pull_group( >>>                   continue; >>>               } >>>               if snapshot.is_protected() { >>> -                info!( >>> -                    "don't delete vanished snapshot {} (protected)", >>> -                    snapshot.dir() >>> -                ); >>> +                log_sender >>> +                    .log( >>> +                        Level::INFO, >>> +                        format!( >>> +                            "{prefix}: don't delete vanished >>> snapshot {} (protected)", >>> +                            snapshot.dir(), >>> +                        ), 
>>> +                    ) >>> +                    .await?; >>>                   continue; >>>               } >>> -            info!("delete vanished snapshot {}", snapshot.dir()); >>> +            log_sender >>> +                .log( >>> +                    Level::INFO, >>> +                    format!("delete vanished snapshot {}", >>> snapshot.dir()), >>> +                ) >>> +                .await?; >>>               params >>>                   .target >>>                   .store >>> @@ -1035,10 +1159,7 @@ pub(crate) async fn pull_store(mut params: >>> PullParameters) -> Result>>               } >>>               Err(err) => { >>>                   errors = true; >>> -                info!( >>> -                    "Encountered errors while syncing namespace {} - >>> {err}", >>> -                    &namespace, >>> -                ); >>> +                info!("Encountered errors while syncing namespace >>> {namespace} - {err}"); >>>               } >>>           }; >>>       } >>> @@ -1064,6 +1185,7 @@ async fn lock_and_pull_group( >>>       namespace: &BackupNamespace, >>>       target_namespace: &BackupNamespace, >>>       shared_group_progress: Arc, >>> +    log_sender: Arc, >>>   ) -> Result { >>>       let (owner, _lock_guard) = >>>           match params >>> @@ -1073,25 +1195,47 @@ async fn lock_and_pull_group( >>>           { >>>               Ok(res) => res, >>>               Err(err) => { >>> -                info!("sync group {group} failed - group lock >>> failed: {err}"); >>> -                info!("create_locked_backup_group failed"); >>> +                log_sender >>> +                    .log( >>> +                        Level::INFO, >>> +                        format!("sync group {group} failed - group >>> lock failed: {err}"), >>> +                    ) >>> +                    .await?; >>> +                log_sender >>> +                    .log(Level::INFO, "create_locked_backup_group >>> failed".to_string()) >>> +                    .await?; >>>                   return Err(err); >>>               } >>>           }; >>>       if params.owner != owner { >>>           // only the owner is allowed to create additional snapshots >>> -        info!( >>> -            "sync group {group} failed - owner check failed ({} != >>> {owner})", >>> -            params.owner >>> -        ); >>> +        log_sender >>> +            .log( >>> +                Level::INFO, >>> +                format!( >>> +                    "sync group {group} failed - owner check failed >>> ({} != {owner})", >>> +                    params.owner, >>> +                ), >>> +            ) >>> +            .await?; >>>           return Err(format_err!("owner check failed")); >>>       } >>> -    match pull_group(params, namespace, group, >>> shared_group_progress).await { >>> +    match pull_group( >>> +        params, >>> +        namespace, >>> +        group, >>> +        shared_group_progress, >>> +        Arc::clone(&log_sender), >>> +    ) >>> +    .await >>> +    { >>>           Ok(stats) => Ok(stats), >>>           Err(err) => { >>> -            info!("sync group {group} failed - {err:#}"); >>> +            log_sender >>> +                .log(Level::INFO, format!("sync group {group} failed >>> - {err:#}")) >>> +                .await?; >>>               Err(err) >>>           } >>>       } >>> @@ -1124,7 +1268,7 @@ async fn pull_ns( >>>       list.sort_unstable(); >>>       info!( >>> -        "found {} groups to sync (out of {unfiltered_count} total)", >>> +        "Found 
{} groups to sync (out of {unfiltered_count} total)", >>>           list.len() >>>       ); >>> @@ -1143,6 +1287,10 @@ async fn pull_ns( >>>       let shared_group_progress = >>> Arc::new(SharedGroupProgress::with_total_groups(list.len())); >>>       let mut group_workers = >>> BoundedJoinSet::new(params.worker_threads.unwrap_or(1)); >>> +    let (buffered_logger, sender_builder) = BufferedLogger::new(5, >>> Duration::from_secs(1)); >>> +    // runs until sender_builder and all senders build from it are >>> being dropped >>> +    buffered_logger.run_log_collection(); >>> + >>>       let mut process_results = |results| { >>>           for result in results { >>>               match result { >>> @@ -1160,16 +1308,20 @@ async fn pull_ns( >>>           let target_ns = target_ns.clone(); >>>           let params = Arc::clone(¶ms); >>>           let group_progress_cloned = >>> Arc::clone(&shared_group_progress); >>> +        let log_sender = >>> Arc::new(sender_builder.sender_with_label(group.to_string())); >>>           let results = group_workers >>>               .spawn_task(async move { >>> -                lock_and_pull_group( >>> +                let result = lock_and_pull_group( >>>                       Arc::clone(¶ms), >>>                       &group, >>>                       &namespace, >>>                       &target_ns, >>>                       group_progress_cloned, >>> +                    Arc::clone(&log_sender), >>>                   ) >>> -                .await >>> +                .await; >>> +                let _ = log_sender.flush().await; >>> +                result >>>               }) >>>               .await >>>               .map_err(|err| format_err!("failed to join on worker >>> task: {err:#}"))?; >>> @@ -1197,7 +1349,7 @@ async fn pull_ns( >>>                   if !local_group.apply_filters(¶ms.group_filter) { >>>                       continue; >>>                   } >>> -                info!("delete vanished group '{local_group}'"); >>> +                info!("Delete vanished group '{local_group}'"); >>>                   let delete_stats_result = params >>>                       .target >>>                       .store >>> @@ -1206,7 +1358,7 @@ async fn pull_ns( >>>                   match delete_stats_result { >>>                       Ok(stats) => { >>>                           if !stats.all_removed() { >>> -                            info!("kept some protected snapshots of >>> group '{local_group}'"); >>> +                            info!("Kept some protected snapshots of >>> group '{local_group}'"); >>> >>> sync_stats.add(SyncStats::from(RemovedVanishedStats { >>>                                   snapshots: stats.removed_snapshots(), >>>                                   groups: 0, >>> @@ -1229,7 +1381,7 @@ async fn pull_ns( >>>               Ok(()) >>>           }); >>>           if let Err(err) = result { >>> -            info!("error during cleanup: {err}"); >>> +            info!("Error during cleanup: {err}"); >>>               errors = true; >>>           }; >>>       } >>> diff --git a/src/server/sync.rs b/src/server/sync.rs >>> index e88418442..17ed4839f 100644 >>> --- a/src/server/sync.rs >>> +++ b/src/server/sync.rs >>> @@ -13,7 +13,6 @@ use futures::{future::FutureExt, select}; >>>   use hyper::http::StatusCode; >>>   use pbs_config::BackupLockGuard; >>>   use serde_json::json; >>> -use tokio::task::JoinSet; >>>   use tracing::{info, warn}; >>>   use proxmox_human_byte::HumanByte; >>> @@ -136,13 +135,13 @@ impl SyncSourceReader for 
RemoteSourceReader { >>>                   Some(HttpError { code, message }) => match *code { >>>                       StatusCode::NOT_FOUND => { >>>                           info!( >>> -                            "skipping snapshot {} - vanished since >>> start of sync", >>> +                            "Snapshot {}: skipped because vanished >>> since start of sync", >>>                               &self.dir >>>                           ); >>>                           return Ok(None); >>>                       } >>>                       _ => { >>> -                        bail!("HTTP error {code} - {message}"); >>> +                        bail!("Snapshot {}: HTTP error {code} - >>> {message}", &self.dir); >>>                       } >>>                   }, >>>                   None => { >>> @@ -176,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader { >>>                   bail!("Atomic rename file {to_path:?} failed - >>> {err}"); >>>               } >>>               info!( >>> -                "got backup log file {client_log_name}", >>> +                "Snapshot {snapshot}: got backup log file >>> {client_log_name}", >>> +                snapshot = &self.dir, >>>                   client_log_name = client_log_name.deref() >>>               ); >>>           } >>> -- >>> 2.47.3 >>> >>> >>> >>> >>> >>> > > > > >
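
One more note for anyone reading along without the earlier patches of the
series: the buffering mentioned in the commit message (emit once 5 lines
are queued, or after 1 second without new input) boils down to roughly the
following. This is only a simplified, self-contained sketch built on a
plain tokio channel, not the actual pbs_tools::buffered_logger code:

```rust
// Simplified sketch of the buffering idea only: collect log lines from a
// channel and print them in batches of up to 5, or after 1 second without
// new input. The real BufferedLogger additionally tracks a per-task label
// and a log level for every line.
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<String>();

    let collector = tokio::spawn(async move {
        let mut buffer: Vec<String> = Vec::new();
        loop {
            match timeout(Duration::from_secs(1), rx.recv()).await {
                // a new line arrived: buffer it, emit once 5 lines are queued
                Ok(Some(line)) => {
                    buffer.push(line);
                    if buffer.len() >= 5 {
                        println!("{}", buffer.join("\n"));
                        buffer.clear();
                    }
                }
                // 1 second without new input: emit whatever is pending
                Err(_) => {
                    if !buffer.is_empty() {
                        println!("{}", buffer.join("\n"));
                        buffer.clear();
                    }
                }
                // all senders dropped: final flush, then stop collecting
                Ok(None) => {
                    if !buffer.is_empty() {
                        println!("{}", buffer.join("\n"));
                    }
                    break;
                }
            }
        }
    });

    for i in 1..=7 {
        tx.send(format!("[ct/101]: log line {i}")).unwrap();
    }
    drop(tx);
    collector.await.unwrap();
}
```

In the patch itself every group task gets its own sender via
`sender_with_label()` and is explicitly flushed once `lock_and_pull_group()`
returns, which reduces how much the output of different groups gets
interleaved when several groups are pulled in parallel.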