From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: Re: [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context
Date: Tue, 21 Apr 2026 09:42:23 +0200 [thread overview]
Message-ID: <7f8852e8-c092-4e2d-a031-192c087fb364@proxmox.com> (raw)
In-Reply-To: <295ee353-2731-474b-aad0-d1f8a2bcb443@proxmox.com>
On 4/21/26 9:21 AM, Christian Ebner wrote:
> On 4/20/26 1:54 PM, Fabian Grünbichler wrote:
>> On April 17, 2026 11:26 am, Christian Ebner wrote:
>>> Pulling groups and therefore also snapshots in parallel leads to
>>> unordered log outputs, making it mostly impossible to relate a log
>>> message to a backup snapshot/group.
>>>
>>> Therefore, prefix pull job log messages by the corresponding group or
>>> snapshot and set the error context accordingly.
>>>
>>> Also, reword some messages, inline variables in format strings and
>>> start log lines with capital letters to get consistent output.
>>>
>>> By using the buffered logger implementation and buffer up to 5 lines
>>> with a timeout of 1 second, subsequent log lines arriving in fast
>>> succession are kept together, reducing the mixing of lines.
>>>
>>> Example output for a sequential pull job:
>>> ```
>>> ...
>>> [ct/100]: 2025-11-17T10:11:42Z: start sync
>>> [ct/100]: 2025-11-17T10:11:42Z: pct.conf.blob: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.ppxar.didx: downloaded 16.785 MiB (280.791 MiB/s)
>>> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: sync archive
>>> [ct/100]: 2025-11-17T10:11:42Z: root.mpxar.didx: downloaded 65.703 KiB (29.1 MiB/s)
>>> [ct/100]: 2025-11-17T10:11:42Z: sync done
>>> [ct/100]: percentage done: 9.09% (1/11 groups)
>>> [ct/101]: 2026-03-31T12:20:16Z: start sync
>>> [ct/101]: 2026-03-31T12:20:16Z: pct.conf.blob: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: root.pxar.didx: downloaded 199.806 MiB (311.91 MiB/s)
>>> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: sync archive
>>> [ct/101]: 2026-03-31T12:20:16Z: catalog.pcat1.didx: downloaded 180.379 KiB (22.748 MiB/s)
>>> [ct/101]: 2026-03-31T12:20:16Z: sync done
>>
>> this is probably the wrong patch for this comment, but since you have
>> the sample output here ;)
>>
>> should we pad the labels? the buffered logger knows all currently active
>> labels and could adapt it? otherwise especially for host backups the
>> logs are again not really scannable by humans, because there will be
>> weird jumps in alignment.. or (.. continued below)
>
> That will not work though? While the logger knows about the labels when
> receiving them, it does not know at all about future ones that might
> still come in. So it can happen that the padding does not fit anymore,
> and one gets even uglier formatting issues.
>
> I would prefer to keep them as is for the time being. For vm/ct it is
> not that bad; one could maybe foresee the ID being a 4-digit number and
> define a minimum label length to pad to if not reached.
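>
> Just to illustrate the vm/ct case (a hypothetical sketch, not part of this
> series), such a fixed minimum width could be applied on the formatting side
> roughly like this:
>
> ```
> /// Sketch only: pad the group label to an assumed minimum width so that
> /// typical "[vm/1234]:" / "[ct/107]:" prefixes line up. Longer labels
> /// simply overflow and break the alignment again.
> fn prefix_line(label: &str, message: &str) -> String {
>     let min_width: usize = 12; // assumed value, wide enough for "[vm/12345]:"
>     let prefix = format!("[{label}]:");
>     format!("{prefix:<min_width$} {message}")
> }
> ```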
>
> Host backups or backups with an explicit label, however, can have
> arbitrarily long names, so there is no sensible minimum width to pad to
> there.
>
>
>>> ...
>>> ```
>>>
>>> Example output for a parallel pull job:
>>> ```
>>> ...
>>> [ct/107]: 2025-07-16T09:14:01Z: start sync
>>> [ct/107]: 2025-07-16T09:14:01Z: pct.conf.blob: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: sync archive
>>> [vm/108]: 2025-09-19T07:37:19Z: start sync
>>> [vm/108]: 2025-09-19T07:37:19Z: qemu-server.conf.blob: sync archive
>>> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.ppxar.didx: downloaded 609.233 MiB (112.628 MiB/s)
>>> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: sync archive
>>> [ct/107]: 2025-07-16T09:14:01Z: root.mpxar.didx: downloaded 1.172 MiB (17.838 MiB/s)
>>> [ct/107]: 2025-07-16T09:14:01Z: sync done
>>> [ct/107]: percentage done: 72.73% (8/11 groups)
>>
>> the way the prefix and snapshot are formatted could also be interpreted
>> at first glance as a timestamp of the log line.. why not just prepend
>> the prefix on the logger side, and leave it up to the caller to do the
>> formatting? then we could use "{type}/{id}/" as prefix here? or add
>> 'snapshot' in those lines to make it clear? granted, this is more of an
>> issue when viewing the log via `proxmox-backup-manager`, as in the UI we
>> have the log timestamps up front..
>
> I simply followed along the lines of your suggestion here. In prior
> versions I had exactly that, but you rejected it as too verbose? :)
>
> https://lore.proxmox.com/pbs-devel/1774263381.bngcrer2th.astroid@yuna.none/
>
>>
>> and maybe(?) log the progress line using a different prefix? because
>> right now the information that the group [ct/107] is finished is not
>> really clear from the output, IMHO.
>
> That I can add, yes.
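>
> For example (just a sketch, exact wording and placement still open), an
> explicit marker could be logged once the group is finished:
>
> ```
> // Hypothetical addition after the snapshot loop in pull_group():
> // make it obvious in the interleaved output that this group is done.
> log_sender
>     .log(
>         Level::INFO,
>         format!("sync of group {group} done, {local_progress}"),
>     )
>     .await?;
> ```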
>
>>
>> the progress logging is also still broken (this is for a sync that takes
>> a while, this is not log messages being buffered and re-ordered!):
>>
>> $ proxmox-backup-manager task log 'UPID:yuna:00070656:001E420F:00000002:69E610EB:syncjob:local\x3atest\x3atank\x3a\x3as\x2dbc01cba6\x2d805a:root@pam:' | grep -e namespace -e 'percentage done'
>> Syncing datastore 'test', root namespace into datastore 'tank', root namespace
>> Finished syncing root namespace, current progress: 0 groups, 0 snapshots
>> Syncing datastore 'test', namespace 'test' into datastore 'tank', namespace 'test'
>> [host/exclusion-test]: percentage done: 5.26% (1/19 groups)
>> [host/acltest]: percentage done: 5.26% (1/19 groups)
>> [host/logtest]: percentage done: 5.26% (1/19 groups)
>> [host/onemeg]: percentage done: 5.26% (1/19 groups)
>> [host/fourmeg]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/symlink]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/symlink]: percentage done: 5.26% (1/19 groups)
>> [host/fourmeg]: percentage done: 5.26% (1/19 groups)
>> [host/format-v2-test]: percentage done: 1.75% (0/19 groups, 1/3 snapshots in group #1)
>> [host/format-v2-test]: percentage done: 3.51% (0/19 groups, 2/3 snapshots in group #1)
>> [host/format-v2-test]: percentage done: 5.26% (1/19 groups)
>> [host/incrementaltest2]: percentage done: 2.63% (0/19 groups, 1/2 snapshots in group #1)
>> [host/incrementaltest2]: percentage done: 5.26% (1/19 groups)
>
> There will always be cases where it does not work, I guess; not sure
> what I can do to make you happy here.
Oh, sorry, I might have been too quick on this one. You are saying this
happens for a non-parallel sync?
>>
>>> [vm/108]: 2025-09-19T07:37:19Z: drive-scsi0.img.fidx: downloaded 1.196 GiB (156.892 MiB/s)
>>> [vm/108]: 2025-09-19T07:37:19Z: sync done
>>> ...
>>>
>>> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
>>> ---
>>> changes since version 5:
>>> - uses BufferedLogger implementation, refactored accordingly
>>> - improve log line prefixes
>>> - add missing error contexts
>>>
>>> src/server/pull.rs | 314 +++++++++++++++++++++++++++++++++------------
>>> src/server/sync.rs | 8 +-
>>> 2 files changed, 237 insertions(+), 85 deletions(-)
>>>
>>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>>> index 611441d2a..f7aae4d59 100644
>>> --- a/src/server/pull.rs
>>> +++ b/src/server/pull.rs
>>> @@ -5,11 +5,11 @@ use std::collections::{HashMap, HashSet};
>>> use std::io::Seek;
>>> use std::sync::atomic::{AtomicUsize, Ordering};
>>> use std::sync::{Arc, Mutex};
>>> -use std::time::SystemTime;
>>> +use std::time::{Duration, SystemTime};
>>> use anyhow::{bail, format_err, Context, Error};
>>> use proxmox_human_byte::HumanByte;
>>> -use tracing::{info, warn};
>>> +use tracing::{info, Level};
>>> use pbs_api_types::{
>>> print_store_and_ns, ArchiveType, Authid, BackupArchiveName,
>>> BackupDir, BackupGroup,
>>> @@ -27,6 +27,7 @@ use pbs_datastore::manifest::{BackupManifest,
>>> FileInfo};
>>> use pbs_datastore::read_chunk::AsyncReadChunk;
>>> use pbs_datastore::{check_backup_owner, DataStore,
>>> DatastoreBackend, StoreProgress};
>>> use pbs_tools::bounded_join_set::BoundedJoinSet;
>>> +use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
>>> use pbs_tools::sha::sha256;
>>> use super::sync::{
>>> @@ -153,6 +154,8 @@ async fn pull_index_chunks<I: IndexFile>(
>>> index: I,
>>> encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>> backend: &DatastoreBackend,
>>> + archive_prefix: &str,
>>> + log_sender: Arc<LogLineSender>,
>>> ) -> Result<SyncStats, Error> {
>>> use futures::stream::{self, StreamExt, TryStreamExt};
>>> @@ -247,11 +250,16 @@ async fn pull_index_chunks<I: IndexFile>(
>>> let bytes = bytes.load(Ordering::SeqCst);
>>> let chunk_count = chunk_count.load(Ordering::SeqCst);
>>> - info!(
>>> - "downloaded {} ({}/s)",
>>> - HumanByte::from(bytes),
>>> - HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
>>> - );
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!(
>>> + "{archive_prefix}: downloaded {} ({}/s)",
>>> + HumanByte::from(bytes),
>>> + HumanByte::new_binary(bytes as f64 /
>>> elapsed.as_secs_f64()),
>>> + ),
>>> + )
>>> + .await?;
>>> Ok(SyncStats {
>>> chunk_count,
>>> @@ -292,6 +300,7 @@ async fn pull_single_archive<'a>(
>>> archive_info: &'a FileInfo,
>>> encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>> backend: &DatastoreBackend,
>>> + log_sender: Arc<LogLineSender>,
>>> ) -> Result<SyncStats, Error> {
>>> let archive_name = &archive_info.filename;
>>> let mut path = snapshot.full_path();
>>> @@ -302,72 +311,104 @@ async fn pull_single_archive<'a>(
>>> let mut sync_stats = SyncStats::default();
>>> - info!("sync archive {archive_name}");
>>> + let archive_prefix = format!("{}: {archive_name}",
>>> snapshot.backup_time_string());
>>> +
>>> + log_sender
>>> + .log(Level::INFO, format!("{archive_prefix}: sync archive"))
>>> + .await?;
>>> - reader.load_file_into(archive_name, &tmp_path).await?;
>>> + reader
>>> + .load_file_into(archive_name, &tmp_path)
>>> + .await
>>> + .with_context(|| archive_prefix.clone())?;
>>> - let mut tmpfile =
>>> std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>>> + let mut tmpfile = std::fs::OpenOptions::new()
>>> + .read(true)
>>> + .open(&tmp_path)
>>> + .with_context(|| archive_prefix.clone())?;
>>> match ArchiveType::from_path(archive_name)? {
>>> ArchiveType::DynamicIndex => {
>>> let index = DynamicIndexReader::new(tmpfile).map_err(|
>>> err| {
>>> - format_err!("unable to read dynamic index {:?} -
>>> {}", tmp_path, err)
>>> + format_err!("{archive_prefix}: unable to read
>>> dynamic index {tmp_path:?} - {err}")
>>> })?;
>>> let (csum, size) = index.compute_csum();
>>> - verify_archive(archive_info, &csum, size)?;
>>> + verify_archive(archive_info, &csum,
>>> size).with_context(|| archive_prefix.clone())?;
>>> if reader.skip_chunk_sync(snapshot.datastore().name()) {
>>> - info!("skipping chunk sync for same datastore");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("{archive_prefix}: skipping chunk
>>> sync for same datastore"),
>>> + )
>>> + .await?;
>>> } else {
>>> let stats = pull_index_chunks(
>>> reader
>>> .chunk_reader(archive_info.crypt_mode)
>>> - .context("failed to get chunk reader")?,
>>> + .context("failed to get chunk reader")
>>> + .with_context(|| archive_prefix.clone())?,
>>> snapshot.datastore().clone(),
>>> index,
>>> encountered_chunks,
>>> backend,
>>> + &archive_prefix,
>>> + Arc::clone(&log_sender),
>>> )
>>> - .await?;
>>> + .await
>>> + .with_context(|| archive_prefix.clone())?;
>>> sync_stats.add(stats);
>>> }
>>> }
>>> ArchiveType::FixedIndex => {
>>> let index = FixedIndexReader::new(tmpfile).map_err(|err| {
>>> - format_err!("unable to read fixed index '{:?}' -
>>> {}", tmp_path, err)
>>> + format_err!("{archive_name}: unable to read fixed
>>> index '{tmp_path:?}' - {err}")
>>> })?;
>>> let (csum, size) = index.compute_csum();
>>> - verify_archive(archive_info, &csum, size)?;
>>> + verify_archive(archive_info, &csum,
>>> size).with_context(|| archive_prefix.clone())?;
>>> if reader.skip_chunk_sync(snapshot.datastore().name()) {
>>> - info!("skipping chunk sync for same datastore");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("{archive_prefix}: skipping chunk
>>> sync for same datastore"),
>>> + )
>>> + .await?;
>>> } else {
>>> let stats = pull_index_chunks(
>>> reader
>>> .chunk_reader(archive_info.crypt_mode)
>>> - .context("failed to get chunk reader")?,
>>> + .context("failed to get chunk reader")
>>> + .with_context(|| archive_prefix.clone())?,
>>> snapshot.datastore().clone(),
>>> index,
>>> encountered_chunks,
>>> backend,
>>> + &archive_prefix,
>>> + Arc::clone(&log_sender),
>>> )
>>> - .await?;
>>> + .await
>>> + .with_context(|| archive_prefix.clone())?;
>>> sync_stats.add(stats);
>>> }
>>> }
>>> ArchiveType::Blob => {
>>> - tmpfile.rewind()?;
>>> - let (csum, size) = sha256(&mut tmpfile)?;
>>> - verify_archive(archive_info, &csum, size)?;
>>> + proxmox_lang::try_block!({
>>> + tmpfile.rewind()?;
>>> + let (csum, size) = sha256(&mut tmpfile)?;
>>> + verify_archive(archive_info, &csum, size)
>>> + })
>>> + .with_context(|| archive_prefix.clone())?;
>>> }
>>> }
>>> if let Err(err) = std::fs::rename(&tmp_path, &path) {
>>> - bail!("Atomic rename file {:?} failed - {}", path, err);
>>> + bail!("{archive_prefix}: Atomic rename file {path:?} failed
>>> - {err}");
>>> }
>>> backend
>>> .upload_index_to_backend(snapshot, archive_name)
>>> - .await?;
>>> + .await
>>> + .with_context(|| archive_prefix.clone())?;
>>> Ok(sync_stats)
>>> }
>>> @@ -388,13 +429,24 @@ async fn pull_snapshot<'a>(
>>> encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>> corrupt: bool,
>>> is_new: bool,
>>> + log_sender: Arc<LogLineSender>,
>>> ) -> Result<SyncStats, Error> {
>>> + let prefix = snapshot.backup_time_string().to_owned();
>>> if is_new {
>>> - info!("sync snapshot {}", snapshot.dir());
>>> + log_sender
>>> + .log(Level::INFO, format!("{prefix}: start sync"))
>>> + .await?;
>>> } else if corrupt {
>>> - info!("re-sync snapshot {} due to corruption", snapshot.dir());
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("re-sync snapshot {prefix} due to corruption"),
>>> + )
>>> + .await?;
>>> } else {
>>> - info!("re-sync snapshot {}", snapshot.dir());
>>> + log_sender
>>> + .log(Level::INFO, format!("re-sync snapshot {prefix}"))
>>> + .await?;
>>> }
>>> let mut sync_stats = SyncStats::default();
>>> @@ -409,7 +461,8 @@ async fn pull_snapshot<'a>(
>>> let tmp_manifest_blob;
>>> if let Some(data) = reader
>>> .load_file_into(MANIFEST_BLOB_NAME.as_ref(),
>>> &tmp_manifest_name)
>>> - .await?
>>> + .await
>>> + .with_context(|| prefix.clone())?
>>> {
>>> tmp_manifest_blob = data;
>>> } else {
>>> @@ -419,28 +472,34 @@ async fn pull_snapshot<'a>(
>>> if manifest_name.exists() && !corrupt {
>>> let manifest_blob = proxmox_lang::try_block!({
>>> let mut manifest_file =
>>> std::fs::File::open(&manifest_name).map_err(|err| {
>>> - format_err!("unable to open local manifest
>>> {manifest_name:?} - {err}")
>>> + format_err!("{prefix}: unable to open local manifest
>>> {manifest_name:?} - {err}")
>>> })?;
>>> - let manifest_blob = DataBlob::load_from_reader(&mut
>>> manifest_file)?;
>>> + let manifest_blob =
>>> + DataBlob::load_from_reader(&mut
>>> manifest_file).with_context(|| prefix.clone())?;
>>> Ok(manifest_blob)
>>> })
>>> .map_err(|err: Error| {
>>> - format_err!("unable to read local manifest
>>> {manifest_name:?} - {err}")
>>> + format_err!("{prefix}: unable to read local manifest
>>> {manifest_name:?} - {err}")
>>> })?;
>>> if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>>> if !client_log_name.exists() {
>>> -
>>> reader.try_download_client_log(&client_log_name).await?;
>>> + reader
>>> + .try_download_client_log(&client_log_name)
>>> + .await
>>> + .with_context(|| prefix.clone())?;
>>> };
>>> - info!("no data changes");
>>> + log_sender
>>> + .log(Level::INFO, format!("{prefix}: no data changes"))
>>> + .await?;
>>> let _ = std::fs::remove_file(&tmp_manifest_name);
>>> return Ok(sync_stats); // nothing changed
>>> }
>>> }
>>> let manifest_data = tmp_manifest_blob.raw_data().to_vec();
>>> - let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>>> + let manifest =
>>> BackupManifest::try_from(tmp_manifest_blob).with_context(||
>>> prefix.clone())?;
>>> if ignore_not_verified_or_encrypted(
>>> &manifest,
>>> @@ -464,35 +523,54 @@ async fn pull_snapshot<'a>(
>>> path.push(&item.filename);
>>> if !corrupt && path.exists() {
>>> - let filename: BackupArchiveName =
>>> item.filename.as_str().try_into()?;
>>> + let filename: BackupArchiveName = item
>>> + .filename
>>> + .as_str()
>>> + .try_into()
>>> + .with_context(|| prefix.clone())?;
>>> match filename.archive_type() {
>>> ArchiveType::DynamicIndex => {
>>> - let index = DynamicIndexReader::open(&path)?;
>>> + let index =
>>> DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
>>> let (csum, size) = index.compute_csum();
>>> match manifest.verify_file(&filename, &csum,
>>> size) {
>>> Ok(_) => continue,
>>> Err(err) => {
>>> - info!("detected changed file {path:?} -
>>> {err}");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("{prefix}: detected
>>> changed file {path:?} - {err}"),
>>> + )
>>> + .await?;
>>> }
>>> }
>>> }
>>> ArchiveType::FixedIndex => {
>>> - let index = FixedIndexReader::open(&path)?;
>>> + let index =
>>> FixedIndexReader::open(&path).with_context(|| prefix.clone())?;
>>> let (csum, size) = index.compute_csum();
>>> match manifest.verify_file(&filename, &csum,
>>> size) {
>>> Ok(_) => continue,
>>> Err(err) => {
>>> - info!("detected changed file {path:?} -
>>> {err}");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("{prefix}: detected
>>> changed file {path:?} - {err}"),
>>> + )
>>> + .await?;
>>> }
>>> }
>>> }
>>> ArchiveType::Blob => {
>>> - let mut tmpfile = std::fs::File::open(&path)?;
>>> - let (csum, size) = sha256(&mut tmpfile)?;
>>> + let mut tmpfile =
>>> std::fs::File::open(&path).with_context(|| prefix.clone())?;
>>> + let (csum, size) = sha256(&mut
>>> tmpfile).with_context(|| prefix.clone())?;
>>> match manifest.verify_file(&filename, &csum,
>>> size) {
>>> Ok(_) => continue,
>>> Err(err) => {
>>> - info!("detected changed file {path:?} -
>>> {err}");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("{prefix}: detected
>>> changed file {path:?} - {err}"),
>>> + )
>>> + .await?;
>>> }
>>> }
>>> }
>>> @@ -505,13 +583,14 @@ async fn pull_snapshot<'a>(
>>> item,
>>> encountered_chunks.clone(),
>>> backend,
>>> + Arc::clone(&log_sender),
>>> )
>>> .await?;
>>> sync_stats.add(stats);
>>> }
>>> if let Err(err) = std::fs::rename(&tmp_manifest_name,
>>> &manifest_name) {
>>> - bail!("Atomic rename file {:?} failed - {}", manifest_name,
>>> err);
>>> + bail!("{prefix}: Atomic rename file {manifest_name:?} failed
>>> - {err}");
>>> }
>>> if let DatastoreBackend::S3(s3_client) = backend {
>>> let object_key = pbs_datastore::s3::object_key_from_path(
>>> @@ -524,33 +603,40 @@ async fn pull_snapshot<'a>(
>>> let _is_duplicate = s3_client
>>> .upload_replace_with_retry(object_key, data)
>>> .await
>>> - .context("failed to upload manifest to s3 backend")?;
>>> + .context("failed to upload manifest to s3 backend")
>>> + .with_context(|| prefix.clone())?;
>>> }
>>> if !client_log_name.exists() {
>>> - reader.try_download_client_log(&client_log_name).await?;
>>> + reader
>>> + .try_download_client_log(&client_log_name)
>>> + .await
>>> + .with_context(|| prefix.clone())?;
>>> if client_log_name.exists() {
>>> if let DatastoreBackend::S3(s3_client) = backend {
>>> let object_key =
>>> pbs_datastore::s3::object_key_from_path(
>>> &snapshot.relative_path(),
>>> CLIENT_LOG_BLOB_NAME.as_ref(),
>>> )
>>> - .context("invalid archive object key")?;
>>> + .context("invalid archive object key")
>>> + .with_context(|| prefix.clone())?;
>>> let data = tokio::fs::read(&client_log_name)
>>> .await
>>> - .context("failed to read log file contents")?;
>>> + .context("failed to read log file contents")
>>> + .with_context(|| prefix.clone())?;
>>> let contents = hyper::body::Bytes::from(data);
>>> let _is_duplicate = s3_client
>>> .upload_replace_with_retry(object_key, contents)
>>> .await
>>> - .context("failed to upload client log to s3
>>> backend")?;
>>> + .context("failed to upload client log to s3
>>> backend")
>>> + .with_context(|| prefix.clone())?;
>>> }
>>> }
>>> };
>>> snapshot
>>> .cleanup_unreferenced_files(&manifest)
>>> - .map_err(|err| format_err!("failed to cleanup unreferenced
>>> files - {err}"))?;
>>> + .map_err(|err| format_err!("{prefix}: failed to cleanup
>>> unreferenced files - {err}"))?;
>>> Ok(sync_stats)
>>> }
>>> @@ -565,10 +651,14 @@ async fn pull_snapshot_from<'a>(
>>> snapshot: &'a pbs_datastore::BackupDir,
>>> encountered_chunks: Arc<Mutex<EncounteredChunks>>,
>>> corrupt: bool,
>>> + log_sender: Arc<LogLineSender>,
>>> ) -> Result<SyncStats, Error> {
>>> + let prefix = format!("{}", snapshot.backup_time_string());
>>> +
>>> let (_path, is_new, _snap_lock) = snapshot
>>> .datastore()
>>> - .create_locked_backup_dir(snapshot.backup_ns(),
>>> snapshot.as_ref())?;
>>> + .create_locked_backup_dir(snapshot.backup_ns(),
>>> snapshot.as_ref())
>>> + .context(prefix.clone())?;
>>> let result = pull_snapshot(
>>> params,
>>> @@ -577,6 +667,7 @@ async fn pull_snapshot_from<'a>(
>>> encountered_chunks,
>>> corrupt,
>>> is_new,
>>> + Arc::clone(&log_sender),
>>> )
>>> .await;
>>> @@ -589,11 +680,20 @@ async fn pull_snapshot_from<'a>(
>>> snapshot.as_ref(),
>>> true,
>>> ) {
>>> - info!("cleanup error - {cleanup_err}");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("{prefix}: cleanup error -
>>> {cleanup_err}"),
>>> + )
>>> + .await?;
>>> }
>>> return Err(err);
>>> }
>>> - Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
>>> + Ok(_) => {
>>> + log_sender
>>> + .log(Level::INFO, format!("{prefix}: sync done"))
>>> + .await?
>>> + }
>>> }
>>> }
>>> @@ -622,7 +722,9 @@ async fn pull_group(
>>> source_namespace: &BackupNamespace,
>>> group: &BackupGroup,
>>> shared_group_progress: Arc<SharedGroupProgress>,
>>> + log_sender: Arc<LogLineSender>,
>>> ) -> Result<SyncStats, Error> {
>>> + let prefix = format!("{group}");
>>> let mut already_synced_skip_info =
>>> SkipInfo::new(SkipReason::AlreadySynced);
>>> let mut transfer_last_skip_info =
>>> SkipInfo::new(SkipReason::TransferLast);
>>> @@ -714,11 +816,15 @@ async fn pull_group(
>>> .collect();
>>> if already_synced_skip_info.count > 0 {
>>> - info!("{already_synced_skip_info}");
>>> + log_sender
>>> + .log(Level::INFO, format!("{prefix}:
>>> {already_synced_skip_info}"))
>>> + .await?;
>>> already_synced_skip_info.reset();
>>> }
>>> if transfer_last_skip_info.count > 0 {
>>> - info!("{transfer_last_skip_info}");
>>> + log_sender
>>> + .log(Level::INFO, format!("{prefix}:
>>> {transfer_last_skip_info}"))
>>> + .await?;
>>> transfer_last_skip_info.reset();
>>> }
>>> @@ -730,8 +836,8 @@ async fn pull_group(
>>> .store
>>> .backup_group(target_ns.clone(), group.clone());
>>> if let Some(info) =
>>> backup_group.last_backup(true).unwrap_or(None) {
>>> - let mut reusable_chunks = encountered_chunks.lock().unwrap();
>>> if let Err(err) = proxmox_lang::try_block!({
>>> + let mut reusable_chunks =
>>> encountered_chunks.lock().unwrap();
>>> let _snapshot_guard = info
>>> .backup_dir
>>> .lock_shared()
>>> @@ -780,7 +886,12 @@ async fn pull_group(
>>> }
>>> Ok::<(), Error>(())
>>> }) {
>>> - warn!("Failed to collect reusable chunk from last
>>> backup: {err:#?}");
>>> + log_sender
>>> + .log(
>>> + Level::WARN,
>>> + format!("Failed to collect reusable chunk from
>>> last backup: {err:#?}"),
>>> + )
>>> + .await?;
>>> }
>>> }
>>> @@ -805,13 +916,16 @@ async fn pull_group(
>>> &to_snapshot,
>>> encountered_chunks.clone(),
>>> corrupt,
>>> + Arc::clone(&log_sender),
>>> )
>>> .await;
>>> // Update done groups progress by other parallel running pulls
>>> local_progress.done_groups =
>>> shared_group_progress.load_done();
>>> local_progress.done_snapshots = pos as u64 + 1;
>>> - info!("percentage done: group {group}: {local_progress}");
>>> + log_sender
>>> + .log(Level::INFO, format!("percentage done:
>>> {local_progress}"))
>>> + .await?;
>>> let stats = result?; // stop on error
>>> sync_stats.add(stats);
>>> @@ -829,13 +943,23 @@ async fn pull_group(
>>> continue;
>>> }
>>> if snapshot.is_protected() {
>>> - info!(
>>> - "don't delete vanished snapshot {} (protected)",
>>> - snapshot.dir()
>>> - );
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!(
>>> + "{prefix}: don't delete vanished
>>> snapshot {} (protected)",
>>> + snapshot.dir(),
>>> + ),
>>> + )
>>> + .await?;
>>> continue;
>>> }
>>> - info!("delete vanished snapshot {}", snapshot.dir());
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("delete vanished snapshot {}",
>>> snapshot.dir()),
>>> + )
>>> + .await?;
>>> params
>>> .target
>>> .store
>>> @@ -1035,10 +1159,7 @@ pub(crate) async fn pull_store(mut params:
>>> PullParameters) -> Result<SyncStats,
>>> }
>>> Err(err) => {
>>> errors = true;
>>> - info!(
>>> - "Encountered errors while syncing namespace {} -
>>> {err}",
>>> - &namespace,
>>> - );
>>> + info!("Encountered errors while syncing namespace
>>> {namespace} - {err}");
>>> }
>>> };
>>> }
>>> @@ -1064,6 +1185,7 @@ async fn lock_and_pull_group(
>>> namespace: &BackupNamespace,
>>> target_namespace: &BackupNamespace,
>>> shared_group_progress: Arc<SharedGroupProgress>,
>>> + log_sender: Arc<LogLineSender>,
>>> ) -> Result<SyncStats, Error> {
>>> let (owner, _lock_guard) =
>>> match params
>>> @@ -1073,25 +1195,47 @@ async fn lock_and_pull_group(
>>> {
>>> Ok(res) => res,
>>> Err(err) => {
>>> - info!("sync group {group} failed - group lock
>>> failed: {err}");
>>> - info!("create_locked_backup_group failed");
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!("sync group {group} failed - group
>>> lock failed: {err}"),
>>> + )
>>> + .await?;
>>> + log_sender
>>> + .log(Level::INFO, "create_locked_backup_group
>>> failed".to_string())
>>> + .await?;
>>> return Err(err);
>>> }
>>> };
>>> if params.owner != owner {
>>> // only the owner is allowed to create additional snapshots
>>> - info!(
>>> - "sync group {group} failed - owner check failed ({} !=
>>> {owner})",
>>> - params.owner
>>> - );
>>> + log_sender
>>> + .log(
>>> + Level::INFO,
>>> + format!(
>>> + "sync group {group} failed - owner check failed
>>> ({} != {owner})",
>>> + params.owner,
>>> + ),
>>> + )
>>> + .await?;
>>> return Err(format_err!("owner check failed"));
>>> }
>>> - match pull_group(params, namespace, group,
>>> shared_group_progress).await {
>>> + match pull_group(
>>> + params,
>>> + namespace,
>>> + group,
>>> + shared_group_progress,
>>> + Arc::clone(&log_sender),
>>> + )
>>> + .await
>>> + {
>>> Ok(stats) => Ok(stats),
>>> Err(err) => {
>>> - info!("sync group {group} failed - {err:#}");
>>> + log_sender
>>> + .log(Level::INFO, format!("sync group {group} failed
>>> - {err:#}"))
>>> + .await?;
>>> Err(err)
>>> }
>>> }
>>> @@ -1124,7 +1268,7 @@ async fn pull_ns(
>>> list.sort_unstable();
>>> info!(
>>> - "found {} groups to sync (out of {unfiltered_count} total)",
>>> + "Found {} groups to sync (out of {unfiltered_count} total)",
>>> list.len()
>>> );
>>> @@ -1143,6 +1287,10 @@ async fn pull_ns(
>>> let shared_group_progress =
>>> Arc::new(SharedGroupProgress::with_total_groups(list.len()));
>>> let mut group_workers =
>>> BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
>>> + let (buffered_logger, sender_builder) = BufferedLogger::new(5,
>>> Duration::from_secs(1));
>>> + // runs until sender_builder and all senders build from it are
>>> being dropped
>>> + buffered_logger.run_log_collection();
>>> +
>>> let mut process_results = |results| {
>>> for result in results {
>>> match result {
>>> @@ -1160,16 +1308,20 @@ async fn pull_ns(
>>> let target_ns = target_ns.clone();
>>> let params = Arc::clone(¶ms);
>>> let group_progress_cloned =
>>> Arc::clone(&shared_group_progress);
>>> + let log_sender =
>>> Arc::new(sender_builder.sender_with_label(group.to_string()));
>>> let results = group_workers
>>> .spawn_task(async move {
>>> - lock_and_pull_group(
>>> + let result = lock_and_pull_group(
>>> Arc::clone(¶ms),
>>> &group,
>>> &namespace,
>>> &target_ns,
>>> group_progress_cloned,
>>> + Arc::clone(&log_sender),
>>> )
>>> - .await
>>> + .await;
>>> + let _ = log_sender.flush().await;
>>> + result
>>> })
>>> .await
>>> .map_err(|err| format_err!("failed to join on worker
>>> task: {err:#}"))?;
>>> @@ -1197,7 +1349,7 @@ async fn pull_ns(
>>> if !local_group.apply_filters(¶ms.group_filter) {
>>> continue;
>>> }
>>> - info!("delete vanished group '{local_group}'");
>>> + info!("Delete vanished group '{local_group}'");
>>> let delete_stats_result = params
>>> .target
>>> .store
>>> @@ -1206,7 +1358,7 @@ async fn pull_ns(
>>> match delete_stats_result {
>>> Ok(stats) => {
>>> if !stats.all_removed() {
>>> - info!("kept some protected snapshots of
>>> group '{local_group}'");
>>> + info!("Kept some protected snapshots of
>>> group '{local_group}'");
>>>
>>> sync_stats.add(SyncStats::from(RemovedVanishedStats {
>>> snapshots: stats.removed_snapshots(),
>>> groups: 0,
>>> @@ -1229,7 +1381,7 @@ async fn pull_ns(
>>> Ok(())
>>> });
>>> if let Err(err) = result {
>>> - info!("error during cleanup: {err}");
>>> + info!("Error during cleanup: {err}");
>>> errors = true;
>>> };
>>> }
>>> diff --git a/src/server/sync.rs b/src/server/sync.rs
>>> index e88418442..17ed4839f 100644
>>> --- a/src/server/sync.rs
>>> +++ b/src/server/sync.rs
>>> @@ -13,7 +13,6 @@ use futures::{future::FutureExt, select};
>>> use hyper::http::StatusCode;
>>> use pbs_config::BackupLockGuard;
>>> use serde_json::json;
>>> -use tokio::task::JoinSet;
>>> use tracing::{info, warn};
>>> use proxmox_human_byte::HumanByte;
>>> @@ -136,13 +135,13 @@ impl SyncSourceReader for RemoteSourceReader {
>>> Some(HttpError { code, message }) => match *code {
>>> StatusCode::NOT_FOUND => {
>>> info!(
>>> - "skipping snapshot {} - vanished since
>>> start of sync",
>>> + "Snapshot {}: skipped because vanished
>>> since start of sync",
>>> &self.dir
>>> );
>>> return Ok(None);
>>> }
>>> _ => {
>>> - bail!("HTTP error {code} - {message}");
>>> + bail!("Snapshot {}: HTTP error {code} -
>>> {message}", &self.dir);
>>> }
>>> },
>>> None => {
>>> @@ -176,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader {
>>> bail!("Atomic rename file {to_path:?} failed -
>>> {err}");
>>> }
>>> info!(
>>> - "got backup log file {client_log_name}",
>>> + "Snapshot {snapshot}: got backup log file
>>> {client_log_name}",
>>> + snapshot = &self.dir,
>>> client_log_name = client_log_name.deref()
>>> );
>>> }
>>> --
>>> 2.47.3
>>>
>>>
>>>
>>>
>>>
>>>
>
>
>
>
>
Thread overview: 29+ messages
2026-04-17 9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-20 10:57 ` Fabian Grünbichler
2026-04-20 17:15 ` Christian Ebner
2026-04-21 6:49 ` Fabian Grünbichler
2026-04-17 9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-20 11:15 ` Fabian Grünbichler
2026-04-17 9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-20 12:29 ` Fabian Grünbichler
2026-04-17 9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-20 11:56 ` Fabian Grünbichler
2026-04-21 7:21 ` Christian Ebner
2026-04-21 7:42 ` Christian Ebner [this message]
2026-04-21 8:00 ` Fabian Grünbichler
2026-04-21 8:04 ` Christian Ebner
2026-04-21 12:57 ` Thomas Lamprecht
2026-04-17 9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner
2026-04-20 12:33 ` [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Fabian Grünbichler
2026-04-21 10:28 ` superseded: " Christian Ebner