From mboxrd@z Thu Jan  1 00:00:00 1970
Date: Mon, 20 Apr 2026 13:56:19 +0200
From: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Subject: Re: [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context
To: Christian Ebner, pbs-devel@lists.proxmox.com
References: <20260417092621.455374-1-c.ebner@proxmox.com> <20260417092621.455374-12-c.ebner@proxmox.com>
In-Reply-To: <20260417092621.455374-12-c.ebner@proxmox.com>
Message-Id: <1776684473.p5hg4a1vym.astroid@yuna.none>
User-Agent: astroid/0.17.0 (https://github.com/astroidmail/astroid)
List-Id: Proxmox
Backup Server development discussion

On April 17, 2026 11:26 am, Christian Ebner wrote:
> Pulling groups and therefore also snapshots in parallel leads to
> unordered log outputs, making it mostly impossible to relate a log
> message to a backup snapshot/group.
>
> Therefore, prefix pull job log messages by the corresponding group or
> snapshot and set the error context accordingly.
>
> Also, reword some messages, inline variables in format strings and
> start log lines with capital letters to get consistent output.
>
> By using the buffered logger implementation and buffering up to 5
> lines with a timeout of 1 second, subsequent log lines arriving in
> fast succession are kept together, reducing the mixing of lines.
>
> Example output for a sequential pull job:
> ```
> ...
> [ct/100]: 2025-11-17T10:11:42Z: start sync
> [ct/100]: 2025-11-17T10:11:42Z: pct.conf.blob: sync archive
> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: sync archive
> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: downloaded 16.785 MiB (280.791 MiB/s)
> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: sync archive
> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: downloaded 65.703 KiB (29.1 MiB/s)
> [ct/100]: 2025-11-17T10:11:42Z: sync done
> [ct/100]: percentage done: 9.09% (1/11 groups)
> [ct/101]: 2026-03-31T12:20:16Z: start sync
> [ct/101]: 2026-03-31T12:20:16Z: pct.conf.blob: sync archive
> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: sync archive
> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: downloaded 199.806 MiB (311.91 MiB/s)
> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: sync archive
> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: downloaded 180.379 KiB (22.748 MiB/s)
> [ct/101]: 2026-03-31T12:20:16Z: sync done

this is probably the wrong patch for this comment, but since you have
the sample output here ;) should we pad the labels?
the buffered logger knows all currently active labels and could adapt
the padding accordingly? otherwise, especially for host backups, the
logs are again not really scannable by humans, because there will be
weird jumps in alignment.. or (.. continued below)

> ...
> ```
>
> Example output for a parallel pull job:
> ```
> ...
> [ct/107]: 2025-07-16T09:14:01Z: start sync
> [ct/107]: 2025-07-16T09:14:01Z: pct.conf.blob: sync archive
> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: sync archive
> [vm/108]: 2025-09-19T07:37:19Z: start sync
> [vm/108]: 2025-09-19T07:37:19Z: qemu-server.conf.blob: sync archive
> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: sync archive
> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: downloaded 609.233 MiB (112.628 MiB/s)
> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: sync archive
> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: downloaded 1.172 MiB (17.838 MiB/s)
> [ct/107]: 2025-07-16T09:14:01Z: sync done
> [ct/107]: percentage done: 72.73% (8/11 groups)

the way the prefix and snapshot are formatted could also be interpreted
at first glance as a timestamp of the log line.. why not just prepend
the prefix on the logger side, and leave it up to the caller to do the
formatting? then we could use "{type}/{id}/" as prefix here? or add
'snapshot' in those lines to make it clear?

granted, this is more of an issue when viewing the log via
`proxmox-backup-manager`, as in the UI we have the log timestamps up
front..

and maybe(?) log the progress line using a different prefix? because
right now the information that the group [ct/107] is finished is not
really clear from the output, IMHO.
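if we go the padding route, a minimal sketch of what I mean
(hypothetical helper, not the actual BufferedLogger API; `pad_label`
and the `active_labels` slice are made up for illustration):

```rust
// Hypothetical helper: pad each label to the width of the widest
// currently active label, so per-line prefixes stay column-aligned
// even when e.g. host backup IDs of very different lengths mix.
fn pad_label(label: &str, active_labels: &[&str]) -> String {
    let width = active_labels
        .iter()
        .map(|l| l.len())
        .max()
        .unwrap_or(label.len());
    format!("[{label:<width$}]")
}
```

with that, `[ct/100]` next to `[host/exclusion-test]` would both render
at the same width instead of jumping around.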
the progress logging is also still broken (this is for a sync that
takes a while, this is not log messages being buffered and
re-ordered!):

$ proxmox-backup-manager task log 'UPID:yuna:00070656:001E420F:00000002:69E610EB:syncjob:local\x3atest\x3atank\x3a\x3as\x2dbc01cba6\x2d805a:root@pam:' | grep -e namespace -e 'percentage done'
Syncing datastore 'test', root namespace into datastore 'tank', root namespace
Finished syncing root namespace, current progress: 0 groups, 0 snapshots
Syncing datastore 'test', namespace 'test' into datastore 'tank', namespace 'test'
[host/exclusion-test]: percentage done: 5.26% (1/19 groups)
[host/acltest]: percentage done: 5.26% (1/19 groups)
[host/logtest]: percentage done: 5.26% (1/19 groups)
[host/onemeg]: percentage done: 5.26% (1/19 groups)
[host/fourmeg]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
[host/symlink]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
[host/symlink]: percentage done: 5.26% (1/19 groups)
[host/fourmeg]: percentage done: 5.26% (1/19 groups)
[host/format-v2-test]: percentage done: 1.75% (0/19 groups, 1/3 snapshots in group #1)
[host/format-v2-test]: percentage done: 3.51% (0/19 groups, 2/3 snapshots in group #1)
[host/format-v2-test]: percentage done: 5.26% (1/19 groups)
[host/incrementaltest2]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
[host/incrementaltest2]: percentage done: 5.26% (1/19 groups)

> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: downloaded 1.196 GiB (156.892 MiB/s)
> [vm/108]: 2025-09-19T07:37:19Z: sync done
> ...
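back to the broken progress lines above, a sketch of what I'd expect
instead (illustration only, assuming a shared counter along the lines
of the patch's SharedGroupProgress; the `SharedDoneGroups` type, the
`finish_group` helper and its output format are made up here): bump
the shared done-count first and format from the post-increment value,
so parallel workers report 1/19, 2/19, ... instead of several workers
each printing the same stale count:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Sketch: derive the progress line from the post-increment value of a
// shared atomic counter, so each group completion gets a distinct,
// monotonically increasing done-count across all parallel workers.
struct SharedDoneGroups {
    done: AtomicU64,
    total: u64,
}

impl SharedDoneGroups {
    fn finish_group(&self) -> String {
        // fetch_add returns the previous value; +1 gives this completion's rank
        let done = self.done.fetch_add(1, Ordering::SeqCst) + 1;
        let percent = done as f64 * 100.0 / self.total as f64;
        format!("percentage done: {percent:.2}% ({done}/{} groups)", self.total)
    }
}
```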
>
> Signed-off-by: Christian Ebner
> ---
> changes since version 5:
> - uses BufferedLogger implementation, refactored accordingly
> - improve log line prefixes
> - add missing error contexts
>
>  src/server/pull.rs | 314 +++++++++++++++++++++++++++++++++------------
>  src/server/sync.rs |   8 +-
>  2 files changed, 237 insertions(+), 85 deletions(-)
>
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 611441d2a..f7aae4d59 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet};
>  use std::io::Seek;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
> -use std::time::SystemTime;
> +use std::time::{Duration, SystemTime};
>
>  use anyhow::{bail, format_err, Context, Error};
>  use proxmox_human_byte::HumanByte;
> -use tracing::{info, warn};
> +use tracing::{info, Level};
>
>  use pbs_api_types::{
>      print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
> @@ -27,6 +27,7 @@ use pbs_datastore::manifest::{BackupManifest, FileInfo};
>  use pbs_datastore::read_chunk::AsyncReadChunk;
>  use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
>  use pbs_tools::bounded_join_set::BoundedJoinSet;
> +use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
>  use pbs_tools::sha::sha256;
>
>  use super::sync::{
> @@ -153,6 +154,8 @@ async fn pull_index_chunks(
>      index: I,
>      encountered_chunks: Arc>,
>      backend: &DatastoreBackend,
> +    archive_prefix: &str,
> +    log_sender: Arc,
>  ) -> Result {
>      use futures::stream::{self, StreamExt, TryStreamExt};
>
> @@ -247,11 +250,16 @@ async fn pull_index_chunks(
>      let bytes = bytes.load(Ordering::SeqCst);
>      let chunk_count = chunk_count.load(Ordering::SeqCst);
>
> -    info!(
> -        "downloaded {} ({}/s)",
> -        HumanByte::from(bytes),
> -        HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
> -    );
> +    log_sender
> +        .log(
> +            Level::INFO,
> +            format!(
> +                "{archive_prefix}: downloaded {} ({}/s)",
> +                HumanByte::from(bytes),
> +                HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
> +            ),
> +        )
> +        .await?;
>
>      Ok(SyncStats {
>          chunk_count,
> @@ -292,6 +300,7 @@ async fn pull_single_archive<'a>(
>      archive_info: &'a FileInfo,
>      encountered_chunks: Arc>,
>      backend: &DatastoreBackend,
> +    log_sender: Arc,
>  ) -> Result {
>      let archive_name = &archive_info.filename;
>      let mut path = snapshot.full_path();
> @@ -302,72 +311,104 @@ async fn pull_single_archive<'a>(
>
>      let mut sync_stats = SyncStats::default();
>
> -    info!("sync archive {archive_name}");
> +    let archive_prefix = format!("{}: {archive_name}", snapshot.backup_time_string());
> +
> +    log_sender
> +        .log(Level::INFO, format!("{archive_prefix}: sync archive"))
> +        .await?;
>
> -    reader.load_file_into(archive_name, &tmp_path).await?;
> +    reader
> +        .load_file_into(archive_name, &tmp_path)
> +        .await
> +        .with_context(|| archive_prefix.clone())?;
>
> -    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
> +    let mut tmpfile = std::fs::OpenOptions::new()
> +        .read(true)
> +        .open(&tmp_path)
> +        .with_context(|| archive_prefix.clone())?;
>
>      match ArchiveType::from_path(archive_name)? {
>          ArchiveType::DynamicIndex => {
>              let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
> -                format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
> +                format_err!("{archive_prefix}: unable to read dynamic index {tmp_path:?} - {err}")
>              })?;
>              let (csum, size) = index.compute_csum();
> -            verify_archive(archive_info, &csum, size)?;
> +            verify_archive(archive_info, &csum, size).with_context(|| archive_prefix.clone())?;
>
>              if reader.skip_chunk_sync(snapshot.datastore().name()) {
> -                info!("skipping chunk sync for same datastore");
> +                log_sender
> +                    .log(
> +                        Level::INFO,
> +                        format!("{archive_prefix}: skipping chunk sync for same datastore"),
> +                    )
> +                    .await?;
>              } else {
>                  let stats = pull_index_chunks(
>                      reader
>                          .chunk_reader(archive_info.crypt_mode)
> -                        .context("failed to get chunk reader")?,
> +                        .context("failed to get chunk reader")
> +                        .with_context(|| archive_prefix.clone())?,
>                      snapshot.datastore().clone(),
>                      index,
>                      encountered_chunks,
>                      backend,
> +                    &archive_prefix,
> +                    Arc::clone(&log_sender),
>                  )
> -                .await?;
> +                .await
> +                .with_context(|| archive_prefix.clone())?;
>                  sync_stats.add(stats);
>              }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> -                format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
> +                format_err!("{archive_name}: unable to read fixed index '{tmp_path:?}' - {err}")
>              })?;
>              let (csum, size) = index.compute_csum();
> -            verify_archive(archive_info, &csum, size)?;
> +            verify_archive(archive_info, &csum, size).with_context(|| archive_prefix.clone())?;
>
>              if reader.skip_chunk_sync(snapshot.datastore().name()) {
> -                info!("skipping chunk sync for same datastore");
> +                log_sender
> +                    .log(
> +                        Level::INFO,
> +                        format!("{archive_prefix}: skipping chunk sync for same datastore"),
> +                    )
> +                    .await?;
>              } else {
>                  let stats = pull_index_chunks(
>                      reader
>                          .chunk_reader(archive_info.crypt_mode)
> -                        .context("failed to get chunk reader")?,
> +                        .context("failed to get chunk reader")
> +                        .with_context(|| archive_prefix.clone())?,
>                      snapshot.datastore().clone(),
>                      index,
>                      encountered_chunks,
>                      backend,
> +                    &archive_prefix,
> +                    Arc::clone(&log_sender),
>                  )
> -                .await?;
> +                .await
> +                .with_context(|| archive_prefix.clone())?;
>                  sync_stats.add(stats);
>              }
>          }
>          ArchiveType::Blob => {
> -            tmpfile.rewind()?;
> -            let (csum, size) = sha256(&mut tmpfile)?;
> -            verify_archive(archive_info, &csum, size)?;
> +            proxmox_lang::try_block!({
> +                tmpfile.rewind()?;
> +                let (csum, size) = sha256(&mut tmpfile)?;
> +                verify_archive(archive_info, &csum, size)
> +            })
> +            .with_context(|| archive_prefix.clone())?;
>          }
>      }
>      if let Err(err) = std::fs::rename(&tmp_path, &path) {
> -        bail!("Atomic rename file {:?} failed - {}", path, err);
> +        bail!("{archive_prefix}: Atomic rename file {path:?} failed - {err}");
>      }
>
>      backend
>          .upload_index_to_backend(snapshot, archive_name)
> -        .await?;
> +        .await
> +        .with_context(|| archive_prefix.clone())?;
>
>      Ok(sync_stats)
>  }
> @@ -388,13 +429,24 @@ async fn pull_snapshot<'a>(
>      encountered_chunks: Arc>,
>      corrupt: bool,
>      is_new: bool,
> +    log_sender: Arc,
>  ) -> Result {
> +    let prefix = snapshot.backup_time_string().to_owned();
>      if is_new {
> -        info!("sync snapshot {}", snapshot.dir());
> +        log_sender
> +            .log(Level::INFO, format!("{prefix}: start sync"))
> +            .await?;
>      } else if corrupt {
> -        info!("re-sync snapshot {} due to corruption", snapshot.dir());
> +        log_sender
> +            .log(
> +                Level::INFO,
> +                format!("re-sync snapshot {prefix} due to corruption"),
> +            )
> +            .await?;
>      } else {
> -        info!("re-sync snapshot {}", snapshot.dir());
> +        log_sender
> +            .log(Level::INFO, format!("re-sync snapshot {prefix}"))
> +            .await?;
>      }
>
>      let mut sync_stats = SyncStats::default();
> @@ -409,7 +461,8 @@ async fn pull_snapshot<'a>(
>      let tmp_manifest_blob;
>      if let Some(data) = reader
>          .load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name)
> -        .await?
> +        .await
> +        .with_context(|| prefix.clone())?
>      {
>          tmp_manifest_blob = data;
>      } else {
> @@ -419,28 +472,34 @@
>      if manifest_name.exists() && !corrupt {
>          let manifest_blob = proxmox_lang::try_block!({
>              let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
> -                format_err!("unable to open local manifest {manifest_name:?} - {err}")
> +                format_err!("{prefix}: unable to open local manifest {manifest_name:?} - {err}")
>              })?;
>
> -            let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
> +            let manifest_blob =
> +                DataBlob::load_from_reader(&mut manifest_file).with_context(|| prefix.clone())?;
>              Ok(manifest_blob)
>          })
>          .map_err(|err: Error| {
> -            format_err!("unable to read local manifest {manifest_name:?} - {err}")
> +            format_err!("{prefix}: unable to read local manifest {manifest_name:?} - {err}")
>          })?;
>
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>              if !client_log_name.exists() {
> -                reader.try_download_client_log(&client_log_name).await?;
> +                reader
> +                    .try_download_client_log(&client_log_name)
> +                    .await
> +                    .with_context(|| prefix.clone())?;
>              };
> -            info!("no data changes");
> +            log_sender
> +                .log(Level::INFO, format!("{prefix}: no data changes"))
> +                .await?;
>              let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(sync_stats); // nothing changed
>          }
>      }
>
>      let manifest_data = tmp_manifest_blob.raw_data().to_vec();
> -    let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
> +    let manifest = BackupManifest::try_from(tmp_manifest_blob).with_context(|| prefix.clone())?;
>
>      if ignore_not_verified_or_encrypted(
>          &manifest,
> @@ -464,35 +523,54 @@
>          path.push(&item.filename);
>
>          if !corrupt && path.exists() {
> -            let filename: BackupArchiveName = item.filename.as_str().try_into()?;
> +            let filename: BackupArchiveName = item
> +                .filename
> +                .as_str()
> +                .try_into()
> +                .with_context(|| prefix.clone())?;
>              match filename.archive_type() {
>                  ArchiveType::DynamicIndex => {
> -                    let index = DynamicIndexReader::open(&path)?;
> +                    let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
>                      let (csum, size) = index.compute_csum();
>                      match manifest.verify_file(&filename, &csum, size) {
>                          Ok(_) => continue,
>                          Err(err) => {
> -                            info!("detected changed file {path:?} - {err}");
> +                            log_sender
> +                                .log(
> +                                    Level::INFO,
> +                                    format!("{prefix}: detected changed file {path:?} - {err}"),
> +                                )
> +                                .await?;
>                          }
>                      }
>                  }
>                  ArchiveType::FixedIndex => {
> -                    let index = FixedIndexReader::open(&path)?;
> +                    let index = FixedIndexReader::open(&path).with_context(|| prefix.clone())?;
>                      let (csum, size) = index.compute_csum();
>                      match manifest.verify_file(&filename, &csum, size) {
>                          Ok(_) => continue,
>                          Err(err) => {
> -                            info!("detected changed file {path:?} - {err}");
> +                            log_sender
> +                                .log(
> +                                    Level::INFO,
> +                                    format!("{prefix}: detected changed file {path:?} - {err}"),
> +                                )
> +                                .await?;
>                          }
>                      }
>                  }
>                  ArchiveType::Blob => {
> -                    let mut tmpfile = std::fs::File::open(&path)?;
> -                    let (csum, size) = sha256(&mut tmpfile)?;
> +                    let mut tmpfile = std::fs::File::open(&path).with_context(|| prefix.clone())?;
> +                    let (csum, size) = sha256(&mut tmpfile).with_context(|| prefix.clone())?;
>                      match manifest.verify_file(&filename, &csum, size) {
>                          Ok(_) => continue,
>                          Err(err) => {
> -                            info!("detected changed file {path:?} - {err}");
> +                            log_sender
> +                                .log(
> +                                    Level::INFO,
> +                                    format!("{prefix}: detected changed file {path:?} - {err}"),
> +                                )
> +                                .await?;
>                          }
>                      }
>                  }
> @@ -505,13 +583,14 @@
>              item,
>              encountered_chunks.clone(),
>              backend,
> +            Arc::clone(&log_sender),
>          )
>          .await?;
>          sync_stats.add(stats);
>      }
>
>      if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
> -        bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
> +        bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}");
>      }
>      if let DatastoreBackend::S3(s3_client) = backend {
>          let object_key = pbs_datastore::s3::object_key_from_path(
> @@ -524,33 +603,40 @@
>          let _is_duplicate = s3_client
>              .upload_replace_with_retry(object_key, data)
>              .await
> -            .context("failed to upload manifest to s3 backend")?;
> +            .context("failed to upload manifest to s3 backend")
> +            .with_context(|| prefix.clone())?;
>      }
>
>      if !client_log_name.exists() {
> -        reader.try_download_client_log(&client_log_name).await?;
> +        reader
> +            .try_download_client_log(&client_log_name)
> +            .await
> +            .with_context(|| prefix.clone())?;
>          if client_log_name.exists() {
>              if let DatastoreBackend::S3(s3_client) = backend {
>                  let object_key = pbs_datastore::s3::object_key_from_path(
>                      &snapshot.relative_path(),
>                      CLIENT_LOG_BLOB_NAME.as_ref(),
>                  )
> -                .context("invalid archive object key")?;
> +                .context("invalid archive object key")
> +                .with_context(|| prefix.clone())?;
>
>                  let data = tokio::fs::read(&client_log_name)
>                      .await
> -                    .context("failed to read log file contents")?;
> +                    .context("failed to read log file contents")
> +                    .with_context(|| prefix.clone())?;
>                  let contents = hyper::body::Bytes::from(data);
>                  let _is_duplicate = s3_client
>                      .upload_replace_with_retry(object_key, contents)
>                      .await
> -                    .context("failed to upload client log to s3 backend")?;
> +                    .context("failed to upload client log to s3 backend")
> +                    .with_context(|| prefix.clone())?;
>              }
>          }
>      };
>      snapshot
>          .cleanup_unreferenced_files(&manifest)
> -        .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> +        .map_err(|err| format_err!("{prefix}: failed to cleanup unreferenced files - {err}"))?;
>
>      Ok(sync_stats)
>  }
> @@ -565,10 +651,14 @@ async fn pull_snapshot_from<'a>(
>      snapshot: &'a pbs_datastore::BackupDir,
>      encountered_chunks: Arc>,
>      corrupt: bool,
> +    log_sender: Arc,
>  ) -> Result {
> +    let prefix = format!("{}", snapshot.backup_time_string());
> +
>      let (_path, is_new, _snap_lock) = snapshot
>          .datastore()
> -        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
> +        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())
> +        .context(prefix.clone())?;
>
>      let result = pull_snapshot(
>          params,
> @@ -577,6 +667,7 @@
>          encountered_chunks,
>          corrupt,
>          is_new,
> +        Arc::clone(&log_sender),
>      )
>      .await;
>
> @@ -589,11 +680,20 @@
>                  snapshot.as_ref(),
>                  true,
>              ) {
> -                info!("cleanup error - {cleanup_err}");
> +                log_sender
> +                    .log(
> +                        Level::INFO,
> +                        format!("{prefix}: cleanup error - {cleanup_err}"),
> +                    )
> +                    .await?;
>              }
>              return Err(err);
>          }
> -        Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
> +        Ok(_) => {
> +            log_sender
> +                .log(Level::INFO, format!("{prefix}: sync done"))
> +                .await?
> +        }
>      }
>  }
>
> @@ -622,7 +722,9 @@ async fn pull_group(
>      source_namespace: &BackupNamespace,
>      group: &BackupGroup,
>      shared_group_progress: Arc,
> +    log_sender: Arc,
>  ) -> Result {
> +    let prefix = format!("{group}");
>      let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
>      let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>
> @@ -714,11 +816,15 @@ async fn pull_group(
>          .collect();
>
>      if already_synced_skip_info.count > 0 {
> -        info!("{already_synced_skip_info}");
> +        log_sender
> +            .log(Level::INFO, format!("{prefix}: {already_synced_skip_info}"))
> +            .await?;
>          already_synced_skip_info.reset();
>      }
>      if transfer_last_skip_info.count > 0 {
> -        info!("{transfer_last_skip_info}");
> +        log_sender
> +            .log(Level::INFO, format!("{prefix}: {transfer_last_skip_info}"))
> +            .await?;
>          transfer_last_skip_info.reset();
>      }
>
> @@ -730,8 +836,8 @@ async fn pull_group(
>          .store
>          .backup_group(target_ns.clone(), group.clone());
>      if let Some(info) = backup_group.last_backup(true).unwrap_or(None) {
> -        let mut reusable_chunks = encountered_chunks.lock().unwrap();
>          if let Err(err) = proxmox_lang::try_block!({
> +            let mut reusable_chunks = encountered_chunks.lock().unwrap();
>              let _snapshot_guard = info
>                  .backup_dir
>                  .lock_shared()
> @@ -780,7 +886,12 @@
>              }
>              Ok::<(), Error>(())
>          }) {
> -            warn!("Failed to collect reusable chunk from last backup: {err:#?}");
> +            log_sender
> +                .log(
> +                    Level::WARN,
> +                    format!("Failed to collect reusable chunk from last backup: {err:#?}"),
> +                )
> +                .await?;
>          }
>      }
>
> @@ -805,13 +916,16 @@
>              &to_snapshot,
>              encountered_chunks.clone(),
>              corrupt,
> +            Arc::clone(&log_sender),
>          )
>          .await;
>
>          // Update done groups progress by other parallel running pulls
>          local_progress.done_groups = shared_group_progress.load_done();
>          local_progress.done_snapshots = pos as u64 + 1;
> -        info!("percentage done: group {group}: {local_progress}");
> +        log_sender
> +            .log(Level::INFO, format!("percentage done: {local_progress}"))
> +            .await?;
>
>          let stats = result?; // stop on error
>          sync_stats.add(stats);
> @@ -829,13 +943,23 @@
>              continue;
>          }
>          if snapshot.is_protected() {
> -            info!(
> -                "don't delete vanished snapshot {} (protected)",
> -                snapshot.dir()
> -            );
> +            log_sender
> +                .log(
> +                    Level::INFO,
> +                    format!(
> +                        "{prefix}: don't delete vanished snapshot {} (protected)",
> +                        snapshot.dir(),
> +                    ),
> +                )
> +                .await?;
>              continue;
>          }
> -        info!("delete vanished snapshot {}", snapshot.dir());
> +        log_sender
> +            .log(
> +                Level::INFO,
> +                format!("delete vanished snapshot {}", snapshot.dir()),
> +            )
> +            .await?;
>          params
>              .target
>              .store
> @@ -1035,10 +1159,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result
>          }
>          Err(err) => {
>              errors = true;
> -            info!(
> -                "Encountered errors while syncing namespace {} - {err}",
> -                &namespace,
> -            );
> +            info!("Encountered errors while syncing namespace {namespace} - {err}");
>          }
>      };
>  }
> @@ -1064,6 +1185,7 @@ async fn lock_and_pull_group(
>      namespace: &BackupNamespace,
>      target_namespace: &BackupNamespace,
>      shared_group_progress: Arc,
> +    log_sender: Arc,
>  ) -> Result {
>      let (owner, _lock_guard) =
>          match params
> @@ -1073,25 +1195,47 @@
>          {
>              Ok(res) => res,
>              Err(err) => {
> -                info!("sync group {group} failed - group lock failed: {err}");
> -                info!("create_locked_backup_group failed");
> +                log_sender
> +                    .log(
> +                        Level::INFO,
> +                        format!("sync group {group} failed - group lock failed: {err}"),
> +                    )
> +                    .await?;
> +                log_sender
> +                    .log(Level::INFO, "create_locked_backup_group failed".to_string())
> +                    .await?;
>                  return Err(err);
>              }
>          };
>
>      if params.owner != owner {
>          // only the owner is allowed to create additional snapshots
> -        info!(
> -            "sync group {group} failed - owner check failed ({} != {owner})",
> -            params.owner
> -        );
> +        log_sender
> +            .log(
> +                Level::INFO,
> +                format!(
> +                    "sync group {group} failed - owner check failed ({} != {owner})",
> +                    params.owner,
> +                ),
> +            )
> +            .await?;
>          return Err(format_err!("owner check failed"));
>      }
>
> -    match pull_group(params, namespace, group, shared_group_progress).await {
> +    match pull_group(
> +        params,
> +        namespace,
> +        group,
> +        shared_group_progress,
> +        Arc::clone(&log_sender),
> +    )
> +    .await
> +    {
>          Ok(stats) => Ok(stats),
>          Err(err) => {
> -            info!("sync group {group} failed - {err:#}");
> +            log_sender
> +                .log(Level::INFO, format!("sync group {group} failed - {err:#}"))
> +                .await?;
>              Err(err)
>          }
>      }
> @@ -1124,7 +1268,7 @@ async fn pull_ns(
>      list.sort_unstable();
>
>      info!(
> -        "found {} groups to sync (out of {unfiltered_count} total)",
> +        "Found {} groups to sync (out of {unfiltered_count} total)",
>          list.len()
>      );
>
> @@ -1143,6 +1287,10 @@ async fn pull_ns(
>      let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
>      let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
>
> +    let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
> +    // runs until sender_builder and all senders built from it are dropped
> +    buffered_logger.run_log_collection();
> +
>      let mut process_results = |results| {
>          for result in results {
>              match result {
> @@ -1160,16 +1308,20 @@
>          let target_ns = target_ns.clone();
>          let params = Arc::clone(&params);
>          let group_progress_cloned = Arc::clone(&shared_group_progress);
> +        let log_sender = Arc::new(sender_builder.sender_with_label(group.to_string()));
>          let results = group_workers
>              .spawn_task(async move {
> -                lock_and_pull_group(
> +                let result = lock_and_pull_group(
>                      Arc::clone(&params),
>                      &group,
>                      &namespace,
>                      &target_ns,
>                      group_progress_cloned,
> +                    Arc::clone(&log_sender),
>                  )
> -                .await
> +                .await;
> +                let _ = log_sender.flush().await;
> +                result
>              })
>              .await
>              .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
> @@ -1197,7 +1349,7 @@
>          if !local_group.apply_filters(&params.group_filter) {
>              continue;
>          }
> -        info!("delete vanished group '{local_group}'");
> +        info!("Delete vanished group '{local_group}'");
>          let delete_stats_result = params
>              .target
>              .store
> @@ -1206,7 +1358,7 @@
>          match delete_stats_result {
>              Ok(stats) => {
>                  if !stats.all_removed() {
> -                    info!("kept some protected snapshots of group '{local_group}'");
> +                    info!("Kept some protected snapshots of group '{local_group}'");
>                      sync_stats.add(SyncStats::from(RemovedVanishedStats {
>                          snapshots: stats.removed_snapshots(),
>                          groups: 0,
> @@ -1229,7 +1381,7 @@
>          Ok(())
>      });
>      if let Err(err) = result {
> -        info!("error during cleanup: {err}");
> +        info!("Error during cleanup: {err}");
>          errors = true;
>      };
>  }
> diff --git a/src/server/sync.rs b/src/server/sync.rs
> index e88418442..17ed4839f 100644
> --- a/src/server/sync.rs
> +++ b/src/server/sync.rs
> @@ -13,7 +13,6 @@ use futures::{future::FutureExt, select};
>  use hyper::http::StatusCode;
>  use pbs_config::BackupLockGuard;
>  use serde_json::json;
> -use tokio::task::JoinSet;
>  use tracing::{info, warn};
>
>  use proxmox_human_byte::HumanByte;
> @@ -136,13 +135,13 @@ impl SyncSourceReader for RemoteSourceReader {
>                  Some(HttpError { code, message }) => match *code {
>                      StatusCode::NOT_FOUND => {
>                          info!(
> -                            "skipping snapshot {} - vanished since start of sync",
> +                            "Snapshot {}: skipped because vanished since start of sync",
>                              &self.dir
>                          );
>                          return Ok(None);
>                      }
>                      _ => {
> -                        bail!("HTTP error {code} - {message}");
> +                        bail!("Snapshot {}: HTTP error {code} - {message}", &self.dir);
>                      }
>                  },
>                  None => {
> @@ -176,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader {
>              bail!("Atomic rename file {to_path:?} failed - {err}");
>          }
>          info!(
> -            "got backup log file {client_log_name}",
> +            "Snapshot {snapshot}: got backup log file {client_log_name}",
> +            snapshot = &self.dir,
>              client_log_name = client_log_name.deref()
>          );
>      }
> --
> 2.47.3