From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id DC47D1FF15D for ; Thu, 25 Jul 2024 12:19:53 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C177C3462; Thu, 25 Jul 2024 12:19:51 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 25 Jul 2024 12:19:22 +0200 Message-Id: <20240725101922.231053-5-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com> References: <20240725101922.231053-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.021 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" In order to keep the log messages in a meaningful order when running using parallel connections to sync backup groups, buffer them in the sync stats and only display them when the corresponding task is finished. 
Signed-off-by: Christian Ebner --- src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 134 insertions(+), 31 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 0a54217d4..109cd3d1c 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -89,6 +89,7 @@ pub(crate) struct PullStats { pub(crate) bytes: usize, pub(crate) elapsed: Duration, pub(crate) removed: Option, + pub(crate) log_buffer: Vec, } impl From for PullStats { @@ -101,10 +102,11 @@ impl From for PullStats { } impl PullStats { - fn add(&mut self, rhs: PullStats) { + fn add(&mut self, mut rhs: PullStats) { self.chunk_count += rhs.chunk_count; self.bytes += rhs.bytes; self.elapsed += rhs.elapsed; + self.log_buffer.append(&mut rhs.log_buffer); if let Some(rhs_removed) = rhs.removed { if let Some(ref mut removed) = self.removed { @@ -443,7 +445,6 @@ impl PullReader for RemoteReader { if let Err(err) = std::fs::rename(&tmp_path, to_path) { bail!("Atomic rename file {:?} failed - {}", to_path, err); } - info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"); } Ok(()) @@ -577,6 +578,7 @@ async fn pull_index_chunks( target: Arc, index: I, downloaded_chunks: Arc>>, + buffer_logs: bool, ) -> Result { use futures::stream::{self, StreamExt, TryStreamExt}; @@ -658,17 +660,20 @@ async fn pull_index_chunks( let bytes = bytes.load(Ordering::SeqCst); let chunk_count = chunk_count.load(Ordering::SeqCst); - info!( + let mut log_buffer = Vec::new(); + let msg = format!( "downloaded {} ({}/s)", HumanByte::from(bytes), HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()), ); + log_info_buffer(msg, buffer_logs, &mut log_buffer); Ok(PullStats { chunk_count, bytes, elapsed, removed: None, + log_buffer, }) } @@ -702,6 +707,7 @@ async fn pull_single_archive<'a>( snapshot: &'a pbs_datastore::BackupDir, archive_info: &'a FileInfo, downloaded_chunks: Arc>>, + buffer_logs: bool, ) -> Result { let archive_name = &archive_info.filename; let mut path = snapshot.full_path(); 
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>( let mut pull_stats = PullStats::default(); - info!("sync archive {archive_name}"); + log_info_buffer( + format!("sync archive {archive_name}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); reader.load_file_into(archive_name, &tmp_path).await?; @@ -727,13 +737,18 @@ async fn pull_single_archive<'a>( verify_archive(archive_info, &csum, size)?; if reader.skip_chunk_sync(snapshot.datastore().name()) { - info!("skipping chunk sync for same datastore"); + log_info_buffer( + "skipping chunk sync for same datastore".to_string(), + buffer_logs, + &mut pull_stats.log_buffer, + ); } else { let stats = pull_index_chunks( reader.chunk_reader(archive_info.crypt_mode), snapshot.datastore().clone(), index, downloaded_chunks, + buffer_logs, ) .await?; pull_stats.add(stats); @@ -747,13 +762,18 @@ async fn pull_single_archive<'a>( verify_archive(archive_info, &csum, size)?; if reader.skip_chunk_sync(snapshot.datastore().name()) { - info!("skipping chunk sync for same datastore"); + log_info_buffer( + "skipping chunk sync for same datastore".to_string(), + buffer_logs, + &mut pull_stats.log_buffer, + ); } else { let stats = pull_index_chunks( reader.chunk_reader(archive_info.crypt_mode), snapshot.datastore().clone(), index, downloaded_chunks, + buffer_logs, ) .await?; pull_stats.add(stats); @@ -784,6 +804,7 @@ async fn pull_snapshot<'a>( reader: Arc, snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, + buffer_logs: bool, ) -> Result { let mut pull_stats = PullStats::default(); let mut manifest_name = snapshot.full_path(); @@ -820,8 +841,17 @@ async fn pull_snapshot<'a>( if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { if !client_log_name.exists() { reader.try_download_client_log(&client_log_name).await?; + log_info_buffer( + format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); }; - info!("no data changes"); + log_info_buffer( + "no data 
changes".to_string(), + buffer_logs, + &mut pull_stats.log_buffer, + ); let _ = std::fs::remove_file(&tmp_manifest_name); return Ok(pull_stats); // nothing changed } @@ -841,7 +871,11 @@ async fn pull_snapshot<'a>( match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { - info!("detected changed file {path:?} - {err}"); + log_info_buffer( + format!("detected changed file {path:?} - {err}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); } } } @@ -851,7 +885,11 @@ async fn pull_snapshot<'a>( match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { - info!("detected changed file {path:?} - {err}"); + log_info_buffer( + format!("detected changed file {path:?} - {err}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); } } } @@ -861,15 +899,25 @@ async fn pull_snapshot<'a>( match manifest.verify_file(&item.filename, &csum, size) { Ok(_) => continue, Err(err) => { - info!("detected changed file {path:?} - {err}"); + log_info_buffer( + format!("detected changed file {path:?} - {err}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); } } } } } - let stats = - pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?; + let stats = pull_single_archive( + reader.clone(), + snapshot, + item, + downloaded_chunks.clone(), + buffer_logs, + ) + .await?; pull_stats.add(stats); } @@ -879,6 +927,11 @@ async fn pull_snapshot<'a>( if !client_log_name.exists() { reader.try_download_client_log(&client_log_name).await?; + log_info_buffer( + format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); }; snapshot .cleanup_unreferenced_files(&manifest) @@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>( reader: Arc, snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, + buffer_logs: bool, ) -> Result { let (_path, is_new, _snap_lock) = snapshot .datastore() .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?; - 
let pull_stats = if is_new { - info!("sync snapshot {}", snapshot.dir()); + let mut pull_stats = PullStats::default(); + if is_new { + log_info_buffer( + format!("sync snapshot {}", snapshot.dir()), + buffer_logs, + &mut pull_stats.log_buffer, + ); - match pull_snapshot(reader, snapshot, downloaded_chunks).await { + match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await { Err(err) => { if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir( snapshot.backup_ns(), @@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>( } return Err(err); } - Ok(pull_stats) => { - info!("sync snapshot {} done", snapshot.dir()); - pull_stats + Ok(stats) => { + pull_stats.add(stats); + log_info_buffer( + format!("sync snapshot {} done", snapshot.dir()), + buffer_logs, + &mut pull_stats.log_buffer, + ); } } } else { - info!("re-sync snapshot {}", snapshot.dir()); - pull_snapshot(reader, snapshot, downloaded_chunks).await? + log_info_buffer( + format!("re-sync snapshot {}", snapshot.dir()), + buffer_logs, + &mut pull_stats.log_buffer, + ); + let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?; + pull_stats.add(stats); }; Ok(pull_stats) @@ -1054,6 +1122,8 @@ async fn pull_group( .last_successful_backup(&target_ns, group)? 
.unwrap_or(i64::MIN); + let mut pull_stats = PullStats::default(); + let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1); let list: Vec = raw_list .into_iter() .enumerate() @@ -1063,7 +1133,11 @@ async fn pull_group( already_synced_skip_info.update(dir.time); return false; } else if already_synced_skip_info.count > 0 { - info!("{already_synced_skip_info}"); + log_info_buffer( + format!("{already_synced_skip_info}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); already_synced_skip_info.reset(); return true; } @@ -1072,7 +1146,11 @@ async fn pull_group( transfer_last_skip_info.update(dir.time); return false; } else if transfer_last_skip_info.count > 0 { - info!("{transfer_last_skip_info}"); + log_info_buffer( + format!("{transfer_last_skip_info}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); transfer_last_skip_info.reset(); } true @@ -1088,8 +1166,6 @@ async fn pull_group( progress.group_snapshots = list.len() as u64; } - let mut pull_stats = PullStats::default(); - for (pos, from_snapshot) in list.into_iter().enumerate() { let to_snapshot = params .target @@ -1100,12 +1176,17 @@ async fn pull_group( .source .reader(source_namespace, &from_snapshot) .await?; - let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await; + let result = + pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await; { let mut progress = progress.lock().unwrap(); progress.done_snapshots = pos as u64 + 1; - info!("percentage done: {progress}"); + log_info_buffer( + format!("percentage done: {progress}"), + buffer_logs, + &mut pull_stats.log_buffer, + ); } let stats = result?; // stop on error @@ -1124,13 +1205,21 @@ async fn pull_group( continue; } if snapshot.is_protected() { - info!( - "don't delete vanished snapshot {} (protected)", - snapshot.dir() + log_info_buffer( + format!( + "don't delete vanished snapshot {} (protected)", + snapshot.dir() + ), + buffer_logs, + &mut pull_stats.log_buffer, ); 
continue; } - info!("delete vanished snapshot {}", snapshot.dir()); + log_info_buffer( + format!("delete vanished snapshot {}", snapshot.dir()), + buffer_logs, + &mut pull_stats.log_buffer, + ); params .target .store @@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns( let mut pull_stats = PullStats::default(); // poll to initiate tasks, queue another remaining tasks for each finished one while let Some(result) = pull_group_tasks.next().await { - let (progress, stats, has_errors) = result?; + let (progress, mut stats, has_errors) = result?; errors |= has_errors; + // Generate log output + for log_line in stats.log_buffer.iter() { + info!("{log_line}"); + } + // clear log buffer before adding, don't need the logs anymore + stats.log_buffer.clear(); pull_stats.add(stats); store_progress.done_groups += progress.done_groups; store_progress.done_snapshots += progress.done_snapshots; @@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns( Ok((store_progress, pull_stats, errors)) } + +fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec) { + if buffer_logs { + buffer.push(msg); + } else { + info!("{msg}"); + } +} -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel