From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging
Date: Fri, 17 Apr 2026 11:26:20 +0200
Message-ID: <20260417092621.455374-15-c.ebner@proxmox.com>
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>
References: <20260417092621.455374-1-c.ebner@proxmox.com>

Pushing groups, and therefore also snapshots, in parallel leads to unordered
log output, making it mostly impossible to relate a log message to a backup
snapshot or group. Therefore, prefix push job log messages with the
corresponding group or snapshot and use the buffered logger implementation to
buffer up to 5 subsequent lines with a timeout of 1 second. This reduces
interwoven log messages stemming from different groups.

Also, be more verbose for push syncs, adding additional log output for the
groups, snapshots and archives being pushed.
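For illustration only, below is a minimal, self-contained sketch of the
buffering idea described above. It is not the pbs_tools::buffered_logger
implementation this series uses; only the "up to 5 buffered lines, 1 second
timeout" behaviour and the per-group labelling are taken from this patch,
while all names and the exact flush policy are made up for the example:

    // Illustrative sketch only -- not the pbs_tools::buffered_logger implementation.
    use std::time::Duration;

    use tokio::sync::mpsc;
    use tokio::time::timeout;

    #[tokio::main]
    async fn main() {
        // Every parallel group push gets a sender and tags its lines with a label.
        let (tx, mut rx) = mpsc::channel::<(String, String)>(64);

        // Collector: emit up to 5 consecutive lines of the same label as one
        // block, flushing early once 1 second passes without new input.
        let collector = tokio::spawn(async move {
            let mut buffer: Vec<String> = Vec::new();
            let mut current: Option<String> = None;
            loop {
                match timeout(Duration::from_secs(1), rx.recv()).await {
                    Ok(Some((label, line))) => {
                        if current.as_deref() != Some(label.as_str()) || buffer.len() >= 5 {
                            buffer.drain(..).for_each(|l| println!("{l}"));
                            current = Some(label.clone());
                        }
                        buffer.push(format!("{label}: {line}"));
                    }
                    // Timeout elapsed: flush what is buffered and keep collecting.
                    Err(_) => buffer.drain(..).for_each(|l| println!("{l}")),
                    // All senders dropped: flush the rest and stop.
                    Ok(None) => {
                        buffer.drain(..).for_each(|l| println!("{l}"));
                        break;
                    }
                }
            }
        });

        // Two "groups" logging concurrently, as parallel group pushes would.
        let (a, b) = (tx.clone(), tx.clone());
        drop(tx);
        let producers = [("vm/100", a), ("ct/101", b)].map(|(group, sender)| {
            tokio::spawn(async move {
                for i in 0..3 {
                    let line = format!("snapshot {i}: start sync");
                    sender.send((group.to_string(), line)).await.unwrap();
                }
            })
        });
        for producer in producers {
            producer.await.unwrap();
        }
        collector.await.unwrap();
    }

In the actual patch, push_namespace creates one labelled sender per group via
sender_builder.sender_with_label() and flushes it once the group worker task
finishes.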
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BufferedLogger implementation, refactored accordingly
- improve log line prefixes
- add missing error contexts

 src/server/push.rs | 245 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 199 insertions(+), 46 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 9b7fb4522..520bdd250 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -2,12 +2,13 @@

 use std::collections::HashSet;
 use std::sync::{Arc, Mutex};
+use std::time::Duration;

 use anyhow::{bail, format_err, Context, Error};
 use futures::stream::{self, StreamExt, TryStreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
-use tracing::{info, warn};
+use tracing::{info, warn, Level};

 use pbs_api_types::{
     print_store_and_ns, ApiVersion, ApiVersionInfo, ArchiveType, Authid, BackupArchiveName,
@@ -28,6 +29,9 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{DataStore, StoreProgress};
 use pbs_tools::bounded_join_set::BoundedJoinSet;
+use pbs_tools::buffered_logger::{BufferedLogger, LogLineSender};
+
+use proxmox_human_byte::HumanByte;

 use super::sync::{
     check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -564,6 +568,10 @@ pub(crate) async fn push_namespace(
     let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
     let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));

+    let (buffered_logger, sender_builder) = BufferedLogger::new(5, Duration::from_secs(1));
+    // runs until sender_builder and all senders build from it are being dropped
+    buffered_logger.run_log_collection();
+
     let mut process_results = |results| {
         for result in results {
             match result {
@@ -571,7 +579,7 @@ pub(crate) async fn push_namespace(
                     stats.add(sync_stats);
                     progress.done_groups = shared_group_progress.increment_done();
                 }
-                Err(()) => errors = true,
+                Err(_err) => errors = true,
             }
         }
     };
@@ -582,17 +590,21 @@ pub(crate) async fn push_namespace(
         let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
         let synced_groups = Arc::clone(&synced_groups);
         let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let log_sender = Arc::new(sender_builder.sender_with_label(group.to_string()));
         let results = group_workers
             .spawn_task(async move {
-                push_group_do(
+                let result = push_group_do(
                     params,
                     &namespace,
                     &group,
                     group_progress_cloned,
                     synced_groups,
                     not_owned_target_groups,
+                    Arc::clone(&log_sender),
                 )
-                .await
+                .await;
+                let _ = log_sender.flush().await;
+                result
             })
             .await
             .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
@@ -694,23 +706,46 @@ async fn push_group_do(
     shared_group_progress: Arc<SharedGroupProgress>,
     synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
     not_owned_target_groups: Arc<HashSet<BackupGroup>>,
-) -> Result<SyncStats, ()> {
+    log_sender: Arc<LogLineSender>,
+) -> Result<SyncStats, Error> {
     if not_owned_target_groups.contains(group) {
-        warn!(
-            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-            params.target.remote_user(),
-        );
+        log_sender
+            .log(
+                Level::WARN,
+                format!(
+                    "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+                    params.target.remote_user(),
+                ),
+            )
+            .await?;
         shared_group_progress.increment_done();
         return Ok(SyncStats::default());
     }

     synced_groups.lock().unwrap().insert(group.clone());
-    push_group(params, namespace, group, Arc::clone(&shared_group_progress))
-        .await
-        .map_err(|err| {
-            warn!("Group {group}: Encountered errors: {err:#}");
-            warn!("Failed to push group {group} to remote!");
-        })
+    match push_group(
+        params,
+        namespace,
+        group,
+        Arc::clone(&shared_group_progress),
+        Arc::clone(&log_sender),
+    )
+    .await
+    {
+        Ok(res) => Ok(res),
+        Err(err) => {
+            log_sender
+                .log(Level::WARN, format!("Encountered errors: {err:#}"))
+                .await?;
+            log_sender
+                .log(
+                    Level::WARN,
+                    format!("Failed to push group {group} to remote!"),
+                )
+                .await?;
+            Err(err)
+        }
+    }
 }

 /// Push group including all snaphshots to target
@@ -727,6 +762,7 @@ pub(crate) async fn push_group(
     namespace: &BackupNamespace,
     group: &BackupGroup,
     shared_group_progress: Arc<SharedGroupProgress>,
+    log_sender: Arc<LogLineSender>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -738,7 +774,12 @@ pub(crate) async fn push_group(
     snapshots.sort_unstable_by_key(|a| a.backup.time);

     if snapshots.is_empty() {
-        info!("Group '{group}' contains no snapshots to sync to remote");
+        log_sender
+            .log(
+                Level::INFO,
+                format!("Group '{group}' contains no snapshots to sync to remote"),
+            )
+            .await?;
     }

     let target_namespace = params.map_to_target(namespace)?;
@@ -786,11 +827,15 @@ pub(crate) async fn push_group(
         .collect();

     if already_synced_skip_info.count > 0 {
-        info!("{already_synced_skip_info}");
+        log_sender
+            .log(Level::INFO, already_synced_skip_info.to_string())
+            .await?;
         already_synced_skip_info.reset();
     }
     if transfer_last_skip_info.count > 0 {
-        info!("{transfer_last_skip_info}");
+        log_sender
+            .log(Level::INFO, transfer_last_skip_info.to_string())
+            .await?;
         transfer_last_skip_info.reset();
     }

@@ -800,11 +845,18 @@ pub(crate) async fn push_group(
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
     for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+        let prefix = proxmox_time::epoch_to_rfc3339_utc(source_snapshot.time)
+            .context("invalid timestamp")?;
+        log_sender
+            .log(Level::INFO, format!("{prefix}: start sync"))
+            .await?;
         let result = push_snapshot(
             &params,
             namespace,
             &source_snapshot,
             fetch_previous_manifest,
+            Arc::clone(&log_sender),
+            &prefix,
         )
         .await;
         fetch_previous_manifest = true;
@@ -812,10 +864,18 @@ pub(crate) async fn push_group(
         // Update done groups progress by other parallel running pushes
         local_progress.done_groups = shared_group_progress.load_done();
         local_progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: group {group}: {local_progress}");

         // stop on error
         let sync_stats = result?;
+        log_sender
+            .log(Level::INFO, format!("{prefix}: sync done"))
+            .await?;
+        log_sender
+            .log(
+                Level::INFO,
+                format!("Percentage done: group {group}: {local_progress}"),
+            )
+            .await?;
         stats.add(sync_stats);
     }

@@ -825,25 +885,42 @@ pub(crate) async fn push_group(
             continue;
         }
         if snapshot.protected {
-            info!(
-                "Kept protected snapshot {name} on remote",
-                name = snapshot.backup
-            );
+            log_sender
+                .log(
+                    Level::INFO,
+                    format!(
+                        "Kept protected snapshot {name} on remote",
+                        name = snapshot.backup
+                    ),
+                )
+                .await?;
             continue;
         }
         match forget_target_snapshot(&params, &target_namespace, &snapshot.backup).await {
             Ok(()) => {
-                info!(
-                    "Removed vanished snapshot {name} from remote",
-                    name = snapshot.backup
-                );
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!(
+                            "Removed vanished snapshot {name} from remote",
+                            name = snapshot.backup
+                        ),
+                    )
+                    .await?;
             }
             Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!(
-                    "Failed to remove vanished snapshot {name} from remote!",
-                    name = snapshot.backup
-                );
+                log_sender
+                    .log(Level::WARN, format!("Encountered errors: {err:#}"))
+                    .await?;
+                log_sender
+                    .log(
+                        Level::WARN,
+                        format!(
+                            "Failed to remove vanished snapshot {name} from remote!",
+                            name = snapshot.backup
+                        ),
+                    )
+                    .await?;
             }
         }
         stats.add(SyncStats::from(RemovedVanishedStats {
@@ -868,24 +945,40 @@ pub(crate) async fn push_snapshot(
     namespace: &BackupNamespace,
     snapshot: &BackupDir,
     fetch_previous_manifest: bool,
+    log_sender: Arc<LogLineSender>,
+    prefix: &String,
 ) -> Result<SyncStats, Error> {
     let mut stats = SyncStats::default();

-    let target_ns = params.map_to_target(namespace)?;
+    let target_ns = params
+        .map_to_target(namespace)
+        .with_context(|| prefix.clone())?;
+
     let backup_dir = params
         .source
         .store
-        .backup_dir(namespace.clone(), snapshot.clone())?;
+        .backup_dir(namespace.clone(), snapshot.clone())
+        .with_context(|| prefix.clone())?;

     // Reader locks the snapshot
-    let reader = params.source.reader(namespace, snapshot).await?;
+    let reader = params
+        .source
+        .reader(namespace, snapshot)
+        .await
+        .with_context(|| prefix.clone())?;
+
     // Does not lock the manifest, but the reader already assures a locked snapshot
     let source_manifest = match backup_dir.load_manifest() {
         Ok((manifest, _raw_size)) => manifest,
         Err(err) => {
             // No manifest in snapshot or failed to read, warn and skip
-            log::warn!("Encountered errors: {err:#}");
-            log::warn!("Failed to load manifest for '{snapshot}'!");
+            log_sender
+                .log(
+                    Level::WARN,
+                    format!("{prefix}: Encountered errors: {err:#}"),
+                )
+                .await?;
+            log_sender
+                .log(Level::WARN, format!("{prefix}: Failed to load manifest!"))
+                .await?;
             return Ok(stats);
         }
     };
@@ -912,14 +1005,22 @@ pub(crate) async fn push_snapshot(
             no_cache: false,
         },
     )
-    .await?;
+    .await
+    .with_context(|| prefix.clone())?;

     let mut previous_manifest = None;
     // Use manifest of previous snapshots in group on target for chunk upload deduplication
     if fetch_previous_manifest {
         match backup_writer.download_previous_manifest().await {
             Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
-            Err(err) => log::info!("Could not download previous manifest - {err}"),
+            Err(err) => {
+                log_sender
+                    .log(
+                        Level::INFO,
+                        format!("{prefix}: Could not download previous manifest - {err}"),
+                    )
+                    .await?
+            }
         }
     };

@@ -948,12 +1049,32 @@ pub(crate) async fn push_snapshot(
         path.push(&entry.filename);
         if path.try_exists()? {
             let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+            log_sender
+                .log(
+                    Level::INFO,
+                    format!("{prefix}: sync archive {archive_name}"),
+                )
+                .await?;
+            let archive_prefix = format!("{prefix}: {archive_name}");
             match archive_name.archive_type() {
                 ArchiveType::Blob => {
                     let file = std::fs::File::open(&path)?;
                     let backup_stats = backup_writer
                         .upload_blob(file, archive_name.as_ref())
                         .await?;
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "{archive_prefix}: uploaded {} ({}/s)",
+                                HumanByte::from(backup_stats.size),
+                                HumanByte::new_binary(
+                                    backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+                                ),
+                            ),
+                        )
+                        .await
+                        .with_context(|| archive_prefix.clone())?;
                     stats.add(SyncStats {
                         chunk_count: backup_stats.chunk_count as usize,
                         bytes: backup_stats.size as usize,
@@ -972,7 +1093,7 @@ pub(crate) async fn push_snapshot(
                         )
                         .await;
                     }
-                    let index = DynamicIndexReader::open(&path)?;
+                    let index = DynamicIndexReader::open(&path).with_context(|| prefix.clone())?;
                     let chunk_reader = reader
                         .chunk_reader(entry.chunk_crypt_mode())
                         .context("failed to get chunk reader")?;
@@ -984,7 +1105,20 @@ pub(crate) async fn push_snapshot(
                         IndexType::Dynamic,
                         known_chunks.clone(),
                     )
-                    .await?;
+                    .await
+                    .with_context(|| archive_prefix.clone())?;
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "{archive_prefix}: uploaded {} ({}/s)",
+                                HumanByte::from(sync_stats.bytes),
+                                HumanByte::new_binary(
+                                    sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+                                ),
+                            ),
+                        )
+                        .await?;
                     stats.add(sync_stats);
                 }
                 ArchiveType::FixedIndex => {
@@ -1001,7 +1135,8 @@ pub(crate) async fn push_snapshot(
                     let index = FixedIndexReader::open(&path)?;
                     let chunk_reader = reader
                         .chunk_reader(entry.chunk_crypt_mode())
-                        .context("failed to get chunk reader")?;
+                        .context("failed to get chunk reader")
+                        .with_context(|| archive_prefix.clone())?;
                     let size = index.index_bytes();
                     let sync_stats = push_index(
                         &archive_name,
@@ -1011,7 +1146,20 @@ pub(crate) async fn push_snapshot(
                         IndexType::Fixed(Some(size)),
                         known_chunks.clone(),
                     )
-                    .await?;
+                    .await
+                    .with_context(|| archive_prefix.clone())?;
+                    log_sender
+                        .log(
+                            Level::INFO,
+                            format!(
+                                "{archive_prefix}: uploaded {} ({}/s)",
+                                HumanByte::from(sync_stats.bytes),
+                                HumanByte::new_binary(
+                                    sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+                                ),
+                            ),
+                        )
+                        .await?;
                     stats.add(sync_stats);
                 }
             }
@@ -1032,7 +1180,8 @@ pub(crate) async fn push_snapshot(
                 client_log_name.as_ref(),
                 upload_options.clone(),
             )
-            .await?;
+            .await
+            .with_context(|| prefix.clone())?;
     }

     // Rewrite manifest for pushed snapshot, recreating manifest from source on target
@@ -1044,8 +1193,12 @@ pub(crate) async fn push_snapshot(
             MANIFEST_BLOB_NAME.as_ref(),
             upload_options,
         )
-        .await?;
-    backup_writer.finish().await?;
+        .await
+        .with_context(|| prefix.clone())?;
+    backup_writer
+        .finish()
+        .await
+        .with_context(|| prefix.clone())?;

     stats.add(SyncStats {
         chunk_count: backup_stats.chunk_count as usize,
--
2.47.3