From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output
Date: Thu, 25 Jul 2024 12:19:22 +0200
Message-ID: <20240725101922.231053-5-c.ebner@proxmox.com>
In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com>
In order to keep the log messages in a meaningful order when syncing
backup groups over parallel connections, buffer them in the sync stats
and only display them once the corresponding task has finished.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
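Note: below is a minimal, self-contained sketch of the buffering
pattern this patch applies, intended as a reference for reviewers.
The names mirror the ones introduced by the patch; println! stands in
for the tracing info! macro so the sketch compiles without the
proxmox-backup crates, and the sample messages are made up:

    /// Emit the message right away, or queue it when tasks run in parallel.
    fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
        if buffer_logs {
            buffer.push(msg);
        } else {
            println!("{msg}"); // the patch calls tracing's info! here
        }
    }

    /// Stand-in for PullStats, reduced to the log buffer added by this patch.
    #[derive(Default)]
    struct PullStats {
        log_buffer: Vec<String>,
    }

    impl PullStats {
        /// Merging appends the other task's buffered lines, so output stays
        /// grouped per task instead of being interleaved.
        fn add(&mut self, mut rhs: PullStats) {
            self.log_buffer.append(&mut rhs.log_buffer);
        }
    }

    fn main() {
        // true when more than one group sync task is configured, mirroring
        // matches!(params.group_sync_tasks, Some(n) if n > 1)
        let buffer_logs = true;

        let mut stats = PullStats::default();
        log_info_buffer("sync snapshot ct/100".into(), buffer_logs, &mut stats.log_buffer);

        // stats returned by a nested helper get merged into the task's stats
        let mut nested = PullStats::default();
        log_info_buffer("downloaded 1 GiB (100 MiB/s)".into(), buffer_logs, &mut nested.log_buffer);
        stats.add(nested);

        // once the task has finished, flush its lines in order and drop them
        for line in stats.log_buffer.iter() {
            println!("{line}");
        }
        stats.log_buffer.clear();
    }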
src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++---------
1 file changed, 134 insertions(+), 31 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0a54217d4..109cd3d1c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -89,6 +89,7 @@ pub(crate) struct PullStats {
pub(crate) bytes: usize,
pub(crate) elapsed: Duration,
pub(crate) removed: Option<RemovedVanishedStats>,
+ pub(crate) log_buffer: Vec<String>,
}
impl From<RemovedVanishedStats> for PullStats {
@@ -101,10 +102,11 @@ impl From<RemovedVanishedStats> for PullStats {
}
impl PullStats {
- fn add(&mut self, rhs: PullStats) {
+ fn add(&mut self, mut rhs: PullStats) {
self.chunk_count += rhs.chunk_count;
self.bytes += rhs.bytes;
self.elapsed += rhs.elapsed;
+ self.log_buffer.append(&mut rhs.log_buffer);
if let Some(rhs_removed) = rhs.removed {
if let Some(ref mut removed) = self.removed {
@@ -443,7 +445,6 @@ impl PullReader for RemoteReader {
if let Err(err) = std::fs::rename(&tmp_path, to_path) {
bail!("Atomic rename file {:?} failed - {}", to_path, err);
}
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
}
Ok(())
@@ -577,6 +578,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -658,17 +660,20 @@ async fn pull_index_chunks<I: IndexFile>(
let bytes = bytes.load(Ordering::SeqCst);
let chunk_count = chunk_count.load(Ordering::SeqCst);
- info!(
+ let mut log_buffer = Vec::new();
+ let msg = format!(
"downloaded {} ({}/s)",
HumanByte::from(bytes),
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
+ log_info_buffer(msg, buffer_logs, &mut log_buffer);
Ok(PullStats {
chunk_count,
bytes,
elapsed,
removed: None,
+ log_buffer,
})
}
@@ -702,6 +707,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>(
let mut pull_stats = PullStats::default();
- info!("sync archive {archive_name}");
+ log_info_buffer(
+ format!("sync archive {archive_name}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
reader.load_file_into(archive_name, &tmp_path).await?;
@@ -727,13 +737,18 @@ async fn pull_single_archive<'a>(
verify_archive(archive_info, &csum, size)?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_info_buffer(
+ "skipping chunk sync for same datastore".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ buffer_logs,
)
.await?;
pull_stats.add(stats);
@@ -747,13 +762,18 @@ async fn pull_single_archive<'a>(
verify_archive(archive_info, &csum, size)?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_info_buffer(
+ "skipping chunk sync for same datastore".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ buffer_logs,
)
.await?;
pull_stats.add(stats);
@@ -784,6 +804,7 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let mut pull_stats = PullStats::default();
let mut manifest_name = snapshot.full_path();
@@ -820,8 +841,17 @@ async fn pull_snapshot<'a>(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ log_info_buffer(
+ format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
};
- info!("no data changes");
+ log_info_buffer(
+ "no data changes".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(pull_stats); // nothing changed
}
@@ -841,7 +871,11 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
@@ -851,7 +885,11 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
@@ -861,15 +899,25 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
}
}
- let stats =
- pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+ let stats = pull_single_archive(
+ reader.clone(),
+ snapshot,
+ item,
+ downloaded_chunks.clone(),
+ buffer_logs,
+ )
+ .await?;
pull_stats.add(stats);
}
@@ -879,6 +927,11 @@ async fn pull_snapshot<'a>(
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ log_info_buffer(
+ format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
};
snapshot
.cleanup_unreferenced_files(&manifest)
@@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- let pull_stats = if is_new {
- info!("sync snapshot {}", snapshot.dir());
+ let mut pull_stats = PullStats::default();
+ if is_new {
+ log_info_buffer(
+ format!("sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
- match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+ match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await {
Err(err) => {
if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
snapshot.backup_ns(),
@@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>(
}
return Err(err);
}
- Ok(pull_stats) => {
- info!("sync snapshot {} done", snapshot.dir());
- pull_stats
+ Ok(stats) => {
+ pull_stats.add(stats);
+ log_info_buffer(
+ format!("sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
} else {
- info!("re-sync snapshot {}", snapshot.dir());
- pull_snapshot(reader, snapshot, downloaded_chunks).await?
+ log_info_buffer(
+ format!("re-sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
+ let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?;
+ pull_stats.add(stats);
};
Ok(pull_stats)
@@ -1054,6 +1122,8 @@ async fn pull_group(
.last_successful_backup(&target_ns, group)?
.unwrap_or(i64::MIN);
+ let mut pull_stats = PullStats::default();
+ let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1);
let list: Vec<BackupDir> = raw_list
.into_iter()
.enumerate()
@@ -1063,7 +1133,11 @@ async fn pull_group(
already_synced_skip_info.update(dir.time);
return false;
} else if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_info_buffer(
+ format!("{already_synced_skip_info}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
already_synced_skip_info.reset();
return true;
}
@@ -1072,7 +1146,11 @@ async fn pull_group(
transfer_last_skip_info.update(dir.time);
return false;
} else if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_info_buffer(
+ format!("{transfer_last_skip_info}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
transfer_last_skip_info.reset();
}
true
@@ -1088,8 +1166,6 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
}
- let mut pull_stats = PullStats::default();
-
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
.target
@@ -1100,12 +1176,17 @@ async fn pull_group(
.source
.reader(source_namespace, &from_snapshot)
.await?;
- let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+ let result =
+ pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await;
{
let mut progress = progress.lock().unwrap();
progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ log_info_buffer(
+ format!("percentage done: {progress}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
let stats = result?; // stop on error
@@ -1124,13 +1205,21 @@ async fn pull_group(
continue;
}
if snapshot.is_protected() {
- info!(
- "don't delete vanished snapshot {} (protected)",
- snapshot.dir()
+ log_info_buffer(
+ format!(
+ "don't delete vanished snapshot {} (protected)",
+ snapshot.dir()
+ ),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
);
continue;
}
- info!("delete vanished snapshot {}", snapshot.dir());
+ log_info_buffer(
+ format!("delete vanished snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
params
.target
.store
@@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns(
let mut pull_stats = PullStats::default();
// poll to initiate tasks, queue another remaining tasks for each finished one
while let Some(result) = pull_group_tasks.next().await {
- let (progress, stats, has_errors) = result?;
+ let (progress, mut stats, has_errors) = result?;
errors |= has_errors;
+ // Generate log output
+ for log_line in stats.log_buffer.iter() {
+ info!("{log_line}");
+ }
+ // clear the log buffer before adding, the logs are not needed anymore
+ stats.log_buffer.clear();
pull_stats.add(stats);
store_progress.done_groups += progress.done_groups;
store_progress.done_snapshots += progress.done_snapshots;
@@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns(
Ok((store_progress, pull_stats, errors))
}
+
+fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
+ if buffer_logs {
+ buffer.push(msg);
+ } else {
+ info!("{msg}");
+ }
+}
--
2.39.2