From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging
Date: Fri, 17 Apr 2026 11:26:20 +0200
Message-ID: <20260417092621.455374-15-c.ebner@proxmox.com>
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>
Pushing groups, and therefore also snapshots, in parallel leads to
unordered log output, making it mostly impossible to relate a log
message to a backup snapshot or group.

Therefore, prefix push job log messages with the corresponding group or
snapshot and use the buffered logger implementation to buffer up to 5
subsequent lines with a timeout of 1 second. This reduces the
interleaving of log messages stemming from different groups.

Also, be more verbose for push syncs by adding log output for the
groups, snapshots and archives being pushed.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BufferedLogger implementation, refactored accordingly
- improve log line prefixes
- add missing error contexts
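
For reviewers unfamiliar with the BufferedLogger from patch 03/15, here
is a minimal usage sketch inferred from the call sites in this patch
(the label value and the surrounding function are made up; the actual
implementation lives in pbs-tools):

    use std::sync::Arc;
    use std::time::Duration;

    use tracing::Level;

    use pbs_tools::buffered_logger::BufferedLogger;

    async fn sketch() -> Result<(), anyhow::Error> {
        // Buffer at most 5 lines per sender, flush after 1 second at the latest.
        let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
        // Collection runs until the builder and all senders created from it are dropped.
        buffered_logger.run_log_collection();

        // One labeled sender per concurrent task keeps that task's lines together.
        let log_sender = Arc::new(sender_builder.sender_with_label("vm/100".to_string()));
        log_sender.log(Level::INFO, "start sync".to_string()).await?;
        // Flush any remaining buffered lines when the task finishes.
        let _ = log_sender.flush().await;
        Ok(())
    }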
src/server/push.rs | 245 ++++++++++++++++++++++++++++++++++++---------
1 file changed, 199 insertions(+), 46 deletions(-)
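
The per-archive throughput lines added below all follow the same
pattern; as a standalone sketch (sample values are hypothetical,
proxmox-human-byte picks a suitable binary unit for display):

    use std::time::Duration;

    use proxmox_human_byte::HumanByte;

    fn format_uploaded(size: u64, duration: Duration) -> String {
        // e.g. 2 GiB over 20 s yields roughly "uploaded 2 GiB (102.4 MiB/s)"
        format!(
            "uploaded {} ({}/s)",
            HumanByte::from(size),
            HumanByte::new_binary(size as f64 / duration.as_secs_f64()),
        )
    }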
diff --git a/src/server/push.rs b/src/server/push.rs
index 9b7fb4522..520bdd250 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -2,12 +2,13 @@
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
+use std::time::Duration;
use anyhow::{bail, format_err, Context, Error};
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use tracing::{info, warn};
+use tracing::{info, warn, Level};
use pbs_api_types::{
print_store_and_ns, ApiVersion, ApiVersionInfo, ArchiveType, Authid, BackupArchiveName,
@@ -28,6 +29,9 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, StoreProgress};
use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
+
+use proxmox_human_byte::HumanByte;
use super::sync::{
check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -564,6 +568,10 @@ pub(crate) async fn push_namespace(
let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
+ let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
+ // runs until sender_builder and all senders built from it have been dropped
+ buffered_logger.run_log_collection();
+
let mut process_results = |results| {
for result in results {
match result {
@@ -571,7 +579,7 @@ pub(crate) async fn push_namespace(
stats.add(sync_stats);
progress.done_groups = shared_group_progress.increment_done();
}
- Err(()) => errors = true,
+ Err(_err) => errors = true,
}
}
};
@@ -582,17 +590,21 @@ pub(crate) async fn push_namespace(
let not_owned_target_groups = Arc::clone(¬_owned_target_groups);
let synced_groups = Arc::clone(&synced_groups);
let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let log_sender = Arc::new(sender_builder.sender_with_label(group.to_string()));
let results = group_workers
.spawn_task(async move {
- push_group_do(
+ let result = push_group_do(
params,
&namespace,
&group,
group_progress_cloned,
synced_groups,
not_owned_target_groups,
+ Arc::clone(&log_sender),
)
- .await
+ .await;
+ let _ = log_sender.flush().await;
+ result
})
.await
.map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -694,23 +706,46 @@ async fn push_group_do(
shared_group_progress: Arc<SharedGroupProgress>,
synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
not_owned_target_groups: Arc<HashSet<BackupGroup>>,
-) -> Result<SyncStats, ()> {
+ log_sender: Arc<LogLineSender>,
+) -> Result<SyncStats, Error> {
if not_owned_target_groups.contains(group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
+ log_sender
+ .log(
+ Level::WARN,
+ format!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ ),
+ )
+ .await?;
shared_group_progress.increment_done();
return Ok(SyncStats::default());
}
synced_groups.lock().unwrap().insert(group.clone());
- push_group(params, namespace, group, Arc::clone(&shared_group_progress))
- .await
- .map_err(|err| {
- warn!("Group {group}: Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- })
+ match push_group(
+ params,
+ namespace,
+ group,
+ Arc::clone(&shared_group_progress),
+ Arc::clone(&log_sender),
+ )
+ .await
+ {
+ Ok(res) => Ok(res),
+ Err(err) => {
+ log_sender
+ .log(Level::WARN, format!("Encountered errors: {err:#}"))
+ .await?;
+ log_sender
+ .log(
+ Level::WARN,
+ format!("Failed to push group {group} to remote!"),
+ )
+ .await?;
+ Err(err)
+ }
+ }
}
/// Push group including all snapshots to target
@@ -727,6 +762,7 @@ pub(crate) async fn push_group(
namespace: &BackupNamespace,
group: &BackupGroup,
shared_group_progress: Arc<SharedGroupProgress>,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -738,7 +774,12 @@ pub(crate) async fn push_group(
snapshots.sort_unstable_by_key(|a| a.backup.time);
if snapshots.is_empty() {
- info!("Group '{group}' contains no snapshots to sync to remote");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("Group '{group}' contains no snapshots to sync to remote"),
+ )
+ .await?;
}
let target_namespace = params.map_to_target(namespace)?;
@@ -786,11 +827,15 @@ pub(crate) async fn push_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_sender
+ .log(Level::INFO, already_synced_skip_info.to_string())
+ .await?;
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_sender
+ .log(Level::INFO, transfer_last_skip_info.to_string())
+ .await?;
transfer_last_skip_info.reset();
}
@@ -800,11 +845,18 @@ pub(crate) async fn push_group(
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+ let prefix = proxmox_time::epoch_to_rfc3339_utc(source_snapshot.time)
+ .context("invalid timestamp")?;
+ log_sender
+ .log(Level::INFO, format!("{prefix}: start sync"))
+ .await?;
let result = push_snapshot(
¶ms,
namespace,
&source_snapshot,
fetch_previous_manifest,
+ Arc::clone(&log_sender),
+ &prefix,
)
.await;
fetch_previous_manifest = true;
@@ -812,10 +864,18 @@ pub(crate) async fn push_group(
// Update done groups progress by other parallel running pushes
local_progress.done_groups = shared_group_progress.load_done();
local_progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
+ log_sender
+ .log(Level::INFO, format!("{prefix}: sync done"))
+ .await?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!("Percentage done: group {group}: {local_progress}"),
+ )
+ .await?;
stats.add(sync_stats);
}
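
For context, the per-snapshot prefix above is just the RFC 3339
rendering of the snapshot's backup time, e.g. (epoch value chosen for
illustration only):

    // yields "2026-04-17T09:26:20Z", used as the prefix for all of this
    // snapshot's log lines
    let prefix = proxmox_time::epoch_to_rfc3339_utc(1776417980)?;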
@@ -825,25 +885,42 @@ pub(crate) async fn push_group(
continue;
}
if snapshot.protected {
- info!(
- "Kept protected snapshot {name} on remote",
- name = snapshot.backup
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "Kept protected snapshot {name} on remote",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
continue;
}
match forget_target_snapshot(¶ms, &target_namespace, &snapshot.backup).await {
Ok(()) => {
- info!(
- "Removed vanished snapshot {name} from remote",
- name = snapshot.backup
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "Removed vanished snapshot {name} from remote",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
}
Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!(
- "Failed to remove vanished snapshot {name} from remote!",
- name = snapshot.backup
- );
+ log_sender
+ .log(Level::WARN, format!("Encountered errors: {err:#}"))
+ .await?;
+ log_sender
+ .log(
+ Level::WARN,
+ format!(
+ "Failed to remove vanished snapshot {name} from remote!",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
}
}
stats.add(SyncStats::from(RemovedVanishedStats {
@@ -868,24 +945,40 @@ pub(crate) async fn push_snapshot(
namespace: &BackupNamespace,
snapshot: &BackupDir,
fetch_previous_manifest: bool,
+ log_sender: Arc<LogLineSender>,
+ prefix: &String,
) -> Result<SyncStats, Error> {
let mut stats = SyncStats::default();
- let target_ns = params.map_to_target(namespace)?;
+ let target_ns = params
+ .map_to_target(namespace)
+ .with_context(|| prefix.clone())?;
let backup_dir = params
.source
.store
- .backup_dir(namespace.clone(), snapshot.clone())?;
+ .backup_dir(namespace.clone(), snapshot.clone())
+ .with_context(|| prefix.clone())?;
// Reader locks the snapshot
- let reader = params.source.reader(namespace, snapshot).await?;
+ let reader = params
+ .source
+ .reader(namespace, snapshot)
+ .await
+ .with_context(|| prefix.clone())?;
// Does not lock the manifest, but the reader already ensures a locked snapshot
let source_manifest = match backup_dir.load_manifest() {
Ok((manifest, _raw_size)) => manifest,
Err(err) => {
// No manifest in snapshot or failed to read, warn and skip
- log::warn!("Encountered errors: {err:#}");
- log::warn!("Failed to load manifest for '{snapshot}'!");
+ log_sender
+ .log(
+ Level::WARN,
+ format!("{prefix}: Encountered errors: {err:#}"),
+ )
+ .await?;
+ log_sender
+ .log(Level::WARN, format!("{prefix}: Failed to load manifest!"))
+ .await?;
return Ok(stats);
}
};
@@ -912,14 +1005,22 @@ pub(crate) async fn push_snapshot(
no_cache: false,
},
)
- .await?;
+ .await
+ .with_context(|| prefix.clone())?;
let mut previous_manifest = None;
// Use manifest of previous snapshots in group on target for chunk upload deduplication
if fetch_previous_manifest {
match backup_writer.download_previous_manifest().await {
Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
- Err(err) => log::info!("Could not download previous manifest - {err}"),
+ Err(err) => {
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: Could not download previous manifest - {err}"),
+ )
+ .await?
+ }
}
};
@@ -948,12 +1049,32 @@ pub(crate) async fn push_snapshot(
path.push(&entry.filename);
if path.try_exists()? {
let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: sync archive {archive_name}"),
+ )
+ .await?;
+ let archive_prefix = format!("{prefix}: {archive_name}");
match archive_name.archive_type() {
ArchiveType::Blob => {
let file = std::fs::File::open(&path)?;
let backup_stats = backup_writer
.upload_blob(file, archive_name.as_ref())
.await?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(backup_stats.size),
+ HumanByte::new_binary(
+ backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+ ),
+ ),
+ )
+ .await
+ .with_context(|| archive_prefix.clone())?;
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
bytes: backup_stats.size as usize,
@@ -972,7 +1093,7 @@ pub(crate) async fn push_snapshot(
)
.await;
}
- let index = DynamicIndexReader::open(&path)?;
+ let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
let chunk_reader = reader
.chunk_reader(entry.chunk_crypt_mode())
.context("failed to get chunk reader")?;
@@ -984,7 +1105,20 @@ pub(crate) async fn push_snapshot(
IndexType::Dynamic,
known_chunks.clone(),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ ),
+ )
+ .await?;
stats.add(sync_stats);
}
ArchiveType::FixedIndex => {
@@ -1001,7 +1135,8 @@ pub(crate) async fn push_snapshot(
let index = FixedIndexReader::open(&path)?;
let chunk_reader = reader
.chunk_reader(entry.chunk_crypt_mode())
- .context("failed to get chunk reader")?;
+ .context("failed to get chunk reader")
+ .with_context(|| archive_prefix.clone())?;
let size = index.index_bytes();
let sync_stats = push_index(
&archive_name,
@@ -1011,7 +1146,20 @@ pub(crate) async fn push_snapshot(
IndexType::Fixed(Some(size)),
known_chunks.clone(),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ ),
+ )
+ .await?;
stats.add(sync_stats);
}
}
@@ -1032,7 +1180,8 @@ pub(crate) async fn push_snapshot(
client_log_name.as_ref(),
upload_options.clone(),
)
- .await?;
+ .await
+ .with_context(|| prefix.clone())?;
}
// Rewrite manifest for pushed snapshot, recreating manifest from source on target
@@ -1044,8 +1193,12 @@ pub(crate) async fn push_snapshot(
MANIFEST_BLOB_NAME.as_ref(),
upload_options,
)
- .await?;
- backup_writer.finish().await?;
+ .await
+ .with_context(|| prefix.clone())?;
+ backup_writer
+ .finish()
+ .await
+ .with_context(|| prefix.clone())?;
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
--
2.47.3