From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging
Date: Fri, 17 Apr 2026 11:26:20 +0200 [thread overview]
Message-ID: <20260417092621.455374-15-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>
Pushing groups and therefore also snapshots in parallel leads to
unordered log outputs, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix push job log messages by the corresponding group or
snapshot and use the buffered logger implementation to buffer up to 5
subsequent lines with a timeout of 1 second. This reduces
interwoven log messages stemming from different groups.
Also, be more verbose for push syncs, adding additional log output
for the groups, snapshots and archives being pushed.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BufferedLogger implementation, refactored accordingly
- improve log line prefixes
- add missing error contexts
src/server/push.rs | 245 ++++++++++++++++++++++++++++++++++++---------
1 file changed, 199 insertions(+), 46 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 9b7fb4522..520bdd250 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -2,12 +2,13 @@
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
+use std::time::Duration;
use anyhow::{bail, format_err, Context, Error};
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
-use tracing::{info, warn};
+use tracing::{info, warn, Level};
use pbs_api_types::{
print_store_and_ns, ApiVersion, ApiVersionInfo, ArchiveType, Authid, BackupArchiveName,
@@ -28,6 +29,9 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, StoreProgress};
use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
+
+use proxmox_human_byte::HumanByte;
use super::sync::{
check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -564,6 +568,10 @@ pub(crate) async fn push_namespace(
let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
+ let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
+ // runs until sender_builder and all senders build from it are being dropped
+ buffered_logger.run_log_collection();
+
let mut process_results = |results| {
for result in results {
match result {
@@ -571,7 +579,7 @@ pub(crate) async fn push_namespace(
stats.add(sync_stats);
progress.done_groups = shared_group_progress.increment_done();
}
- Err(()) => errors = true,
+ Err(_err) => errors = true,
}
}
};
@@ -582,17 +590,21 @@ pub(crate) async fn push_namespace(
let not_owned_target_groups = Arc::clone(¬_owned_target_groups);
let synced_groups = Arc::clone(&synced_groups);
let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let log_sender = Arc::new(sender_builder.sender_with_label(group.to_string()));
let results = group_workers
.spawn_task(async move {
- push_group_do(
+ let result = push_group_do(
params,
&namespace,
&group,
group_progress_cloned,
synced_groups,
not_owned_target_groups,
+ Arc::clone(&log_sender),
)
- .await
+ .await;
+ let _ = log_sender.flush().await;
+ result
})
.await
.map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -694,23 +706,46 @@ async fn push_group_do(
shared_group_progress: Arc<SharedGroupProgress>,
synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
not_owned_target_groups: Arc<HashSet<BackupGroup>>,
-) -> Result<SyncStats, ()> {
+ log_sender: Arc<LogLineSender>,
+) -> Result<SyncStats, Error> {
if not_owned_target_groups.contains(group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
+ log_sender
+ .log(
+ Level::WARN,
+ format!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ ),
+ )
+ .await?;
shared_group_progress.increment_done();
return Ok(SyncStats::default());
}
synced_groups.lock().unwrap().insert(group.clone());
- push_group(params, namespace, group, Arc::clone(&shared_group_progress))
- .await
- .map_err(|err| {
- warn!("Group {group}: Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- })
+ match push_group(
+ params,
+ namespace,
+ group,
+ Arc::clone(&shared_group_progress),
+ Arc::clone(&log_sender),
+ )
+ .await
+ {
+ Ok(res) => Ok(res),
+ Err(err) => {
+ log_sender
+ .log(Level::WARN, format!("Encountered errors: {err:#}"))
+ .await?;
+ log_sender
+ .log(
+ Level::WARN,
+ format!("Failed to push group {group} to remote!"),
+ )
+ .await?;
+ Err(err)
+ }
+ }
}
/// Push group including all snaphshots to target
@@ -727,6 +762,7 @@ pub(crate) async fn push_group(
namespace: &BackupNamespace,
group: &BackupGroup,
shared_group_progress: Arc<SharedGroupProgress>,
+ log_sender: Arc<LogLineSender>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -738,7 +774,12 @@ pub(crate) async fn push_group(
snapshots.sort_unstable_by_key(|a| a.backup.time);
if snapshots.is_empty() {
- info!("Group '{group}' contains no snapshots to sync to remote");
+ log_sender
+ .log(
+ Level::INFO,
+ format!("Group '{group}' contains no snapshots to sync to remote"),
+ )
+ .await?;
}
let target_namespace = params.map_to_target(namespace)?;
@@ -786,11 +827,15 @@ pub(crate) async fn push_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_sender
+ .log(Level::INFO, already_synced_skip_info.to_string())
+ .await?;
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_sender
+ .log(Level::INFO, transfer_last_skip_info.to_string())
+ .await?;
transfer_last_skip_info.reset();
}
@@ -800,11 +845,18 @@ pub(crate) async fn push_group(
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+ let prefix = proxmox_time::epoch_to_rfc3339_utc(source_snapshot.time)
+ .context("invalid timestamp")?;
+ log_sender
+ .log(Level::INFO, format!("{prefix}: start sync"))
+ .await?;
let result = push_snapshot(
¶ms,
namespace,
&source_snapshot,
fetch_previous_manifest,
+ Arc::clone(&log_sender),
+ &prefix,
)
.await;
fetch_previous_manifest = true;
@@ -812,10 +864,18 @@ pub(crate) async fn push_group(
// Update done groups progress by other parallel running pushes
local_progress.done_groups = shared_group_progress.load_done();
local_progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
+ log_sender
+ .log(Level::INFO, format!("{prefix}: sync done"))
+ .await?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!("Percentage done: group {group}: {local_progress}"),
+ )
+ .await?;
stats.add(sync_stats);
}
@@ -825,25 +885,42 @@ pub(crate) async fn push_group(
continue;
}
if snapshot.protected {
- info!(
- "Kept protected snapshot {name} on remote",
- name = snapshot.backup
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "Kept protected snapshot {name} on remote",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
continue;
}
match forget_target_snapshot(¶ms, &target_namespace, &snapshot.backup).await {
Ok(()) => {
- info!(
- "Removed vanished snapshot {name} from remote",
- name = snapshot.backup
- );
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "Removed vanished snapshot {name} from remote",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
}
Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!(
- "Failed to remove vanished snapshot {name} from remote!",
- name = snapshot.backup
- );
+ log_sender
+ .log(Level::WARN, format!("Encountered errors: {err:#}"))
+ .await?;
+ log_sender
+ .log(
+ Level::WARN,
+ format!(
+ "Failed to remove vanished snapshot {name} from remote!",
+ name = snapshot.backup
+ ),
+ )
+ .await?;
}
}
stats.add(SyncStats::from(RemovedVanishedStats {
@@ -868,24 +945,40 @@ pub(crate) async fn push_snapshot(
namespace: &BackupNamespace,
snapshot: &BackupDir,
fetch_previous_manifest: bool,
+ log_sender: Arc<LogLineSender>,
+ prefix: &String,
) -> Result<SyncStats, Error> {
let mut stats = SyncStats::default();
- let target_ns = params.map_to_target(namespace)?;
+ let target_ns = params
+ .map_to_target(namespace)
+ .with_context(|| prefix.clone())?;
let backup_dir = params
.source
.store
- .backup_dir(namespace.clone(), snapshot.clone())?;
+ .backup_dir(namespace.clone(), snapshot.clone())
+ .with_context(|| prefix.clone())?;
// Reader locks the snapshot
- let reader = params.source.reader(namespace, snapshot).await?;
+ let reader = params
+ .source
+ .reader(namespace, snapshot)
+ .await
+ .with_context(|| prefix.clone())?;
// Does not lock the manifest, but the reader already assures a locked snapshot
let source_manifest = match backup_dir.load_manifest() {
Ok((manifest, _raw_size)) => manifest,
Err(err) => {
// No manifest in snapshot or failed to read, warn and skip
- log::warn!("Encountered errors: {err:#}");
- log::warn!("Failed to load manifest for '{snapshot}'!");
+ log_sender
+ .log(
+ Level::WARN,
+ format!("{prefix}: Encountered errors: {err:#}"),
+ )
+ .await?;
+ log_sender
+ .log(Level::WARN, format!("{prefix}: Failed to load manifest!"))
+ .await?;
return Ok(stats);
}
};
@@ -912,14 +1005,22 @@ pub(crate) async fn push_snapshot(
no_cache: false,
},
)
- .await?;
+ .await
+ .with_context(|| prefix.clone())?;
let mut previous_manifest = None;
// Use manifest of previous snapshots in group on target for chunk upload deduplication
if fetch_previous_manifest {
match backup_writer.download_previous_manifest().await {
Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
- Err(err) => log::info!("Could not download previous manifest - {err}"),
+ Err(err) => {
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: Could not download previous manifest - {err}"),
+ )
+ .await?
+ }
}
};
@@ -948,12 +1049,32 @@ pub(crate) async fn push_snapshot(
path.push(&entry.filename);
if path.try_exists()? {
let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!("{prefix}: sync archive {archive_name}"),
+ )
+ .await?;
+ let archive_prefix = format!("{prefix}: {archive_name}");
match archive_name.archive_type() {
ArchiveType::Blob => {
let file = std::fs::File::open(&path)?;
let backup_stats = backup_writer
.upload_blob(file, archive_name.as_ref())
.await?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(backup_stats.size),
+ HumanByte::new_binary(
+ backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+ ),
+ ),
+ )
+ .await
+ .with_context(|| archive_prefix.clone())?;
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
bytes: backup_stats.size as usize,
@@ -972,7 +1093,7 @@ pub(crate) async fn push_snapshot(
)
.await;
}
- let index = DynamicIndexReader::open(&path)?;
+ let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
let chunk_reader = reader
.chunk_reader(entry.chunk_crypt_mode())
.context("failed to get chunk reader")?;
@@ -984,7 +1105,20 @@ pub(crate) async fn push_snapshot(
IndexType::Dynamic,
known_chunks.clone(),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ ),
+ )
+ .await?;
stats.add(sync_stats);
}
ArchiveType::FixedIndex => {
@@ -1001,7 +1135,8 @@ pub(crate) async fn push_snapshot(
let index = FixedIndexReader::open(&path)?;
let chunk_reader = reader
.chunk_reader(entry.chunk_crypt_mode())
- .context("failed to get chunk reader")?;
+ .context("failed to get chunk reader")
+ .with_context(|| archive_prefix.clone())?;
let size = index.index_bytes();
let sync_stats = push_index(
&archive_name,
@@ -1011,7 +1146,20 @@ pub(crate) async fn push_snapshot(
IndexType::Fixed(Some(size)),
known_chunks.clone(),
)
- .await?;
+ .await
+ .with_context(|| archive_prefix.clone())?;
+ log_sender
+ .log(
+ Level::INFO,
+ format!(
+ "{archive_prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ ),
+ )
+ .await?;
stats.add(sync_stats);
}
}
@@ -1032,7 +1180,8 @@ pub(crate) async fn push_snapshot(
client_log_name.as_ref(),
upload_options.clone(),
)
- .await?;
+ .await
+ .with_context(|| prefix.clone())?;
}
// Rewrite manifest for pushed snapshot, recreating manifest from source on target
@@ -1044,8 +1193,12 @@ pub(crate) async fn push_snapshot(
MANIFEST_BLOB_NAME.as_ref(),
upload_options,
)
- .await?;
- backup_writer.finish().await?;
+ .await
+ .with_context(|| prefix.clone())?;
+ backup_writer
+ .finish()
+ .await
+ .with_context(|| prefix.clone())?;
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
--
2.47.3
next prev parent reply other threads:[~2026-04-17 9:27 UTC|newest]
Thread overview: 16+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-04-17 9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17 9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17 9:26 ` Christian Ebner [this message]
2026-04-17 9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260417092621.455374-15-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.