* [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property
2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Christian Ebner
` (2 subsequent siblings)
3 siblings, 0 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
To: pbs-devel
Allow configuring between 1 and 8 tasks to perform multiple group
syncs concurrently.
The property is exposed via the sync job config and passed to
the pull parameters, so that the sync job can set up and execute the
tasks accordingly.
Implement the schema definitions and add the new property to
`SyncJobConfig` and `PullParameters`.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-api-types/src/jobs.rs | 14 ++++++++++++++
src/api2/config/sync.rs | 10 ++++++++++
src/api2/pull.rs | 13 ++++++++++---
src/server/pull.rs | 4 ++++
4 files changed, 38 insertions(+), 3 deletions(-)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 868702bc0..5e58787f7 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -64,6 +64,14 @@ pub const REMOVE_VANISHED_BACKUPS_SCHEMA: Schema = BooleanSchema::new(
.default(false)
.schema();
+const MAX_GROUP_SYNC_TASKS: usize = 8;
+pub const GROUP_SYNC_TASKS_SCHEMA: Schema =
+ IntegerSchema::new("Number of concurrent group pull tasks for sync jobs")
+ .minimum(1)
+ .maximum(MAX_GROUP_SYNC_TASKS as isize)
+ .default(1)
+ .schema();
+
#[api(
properties: {
"next-run": {
@@ -552,6 +560,10 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "group-sync-tasks": {
+ schema: GROUP_SYNC_TASKS_SCHEMA,
+ optional: true,
+ },
}
)]
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -585,6 +597,8 @@ pub struct SyncJobConfig {
pub limit: RateLimitConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub transfer_last: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub group_sync_tasks: Option<usize>,
}
impl SyncJobConfig {
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 6fdc69a9e..b6cf81328 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -231,6 +231,8 @@ pub enum DeletableProperty {
MaxDepth,
/// Delete the transfer_last property,
TransferLast,
+ /// Delete the group_sync_tasks property,
+ GroupSyncTasks,
}
#[api(
@@ -331,6 +333,9 @@ pub fn update_sync_job(
DeletableProperty::TransferLast => {
data.transfer_last = None;
}
+ DeletableProperty::GroupSyncTasks => {
+ data.group_sync_tasks = None;
+ }
}
}
}
@@ -369,6 +374,10 @@ pub fn update_sync_job(
data.transfer_last = Some(transfer_last);
}
+ if let Some(group_sync_tasks) = update.group_sync_tasks {
+ data.group_sync_tasks = Some(group_sync_tasks);
+ }
+
if update.limit.rate_in.is_some() {
data.limit.rate_in = update.limit.rate_in;
}
@@ -533,6 +542,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
schedule: None,
limit: pbs_api_types::RateLimitConfig::default(), // no limit
transfer_last: None,
+ group_sync_tasks: None,
};
// should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..0756e0a51 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -8,9 +8,9 @@ use proxmox_schema::api;
use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
- GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
- PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
- TRANSFER_LAST_SCHEMA,
+ GROUP_FILTER_LIST_SCHEMA, GROUP_SYNC_TASKS_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA,
+ PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA,
+ REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
use proxmox_human_byte::HumanByte;
@@ -89,6 +89,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
sync_job.group_filter.clone(),
sync_job.limit.clone(),
sync_job.transfer_last,
+ sync_job.group_sync_tasks,
)
}
}
@@ -240,6 +241,10 @@ pub fn do_sync_job(
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "group-sync-tasks": {
+ schema: GROUP_SYNC_TASKS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -264,6 +269,7 @@ async fn pull(
group_filter: Option<Vec<GroupFilter>>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ group_sync_tasks: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -301,6 +307,7 @@ async fn pull(
group_filter,
limit,
transfer_last,
+ group_sync_tasks,
)?;
// fixme: set to_stdout to false?
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 823515e9a..80443132e 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -499,6 +499,8 @@ pub(crate) struct PullParameters {
group_filter: Vec<GroupFilter>,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
+ /// Number of concurrent group pull tasks for sync job
+ group_sync_tasks: Option<usize>,
}
impl PullParameters {
@@ -516,6 +518,7 @@ impl PullParameters {
group_filter: Option<Vec<GroupFilter>>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ group_sync_tasks: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -560,6 +563,7 @@ impl PullParameters {
max_depth,
group_filter,
transfer_last,
+ group_sync_tasks,
})
}
}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
2024-07-30 15:56 ` Gabriel Goller
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output Christian Ebner
3 siblings, 1 reply; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
To: pbs-devel
Make the error handling and accounting logic for each group pull task
reusable by moving it into its own helper function, which returns the
future for that task.
The store progress is placed behind a reference-counted mutex to
allow concurrent status updates.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-datastore/src/store_progress.rs | 2 +-
src/server/pull.rs | 102 +++++++++++++++++-----------
2 files changed, 65 insertions(+), 39 deletions(-)
diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
/// Tracker for progress of operations iterating over `Datastore` contents.
pub struct StoreProgress {
/// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
//! Sync datastore by pulling contents from remote server
use std::collections::{HashMap, HashSet};
+use std::future::Future;
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
+use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
params: &PullParameters,
source_namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ progress: Arc<Mutex<StoreProgress>>,
) -> Result<PullStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
// start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- progress.group_snapshots = list.len() as u64;
+ {
+ let mut progress = progress.lock().unwrap();
+ progress.group_snapshots = list.len() as u64;
+ }
let mut pull_stats = PullStats::default();
@@ -1095,8 +1100,11 @@ async fn pull_group(
.await?;
let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
- progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ {
+ let mut progress = progress.lock().unwrap();
+ progress.done_snapshots = pos as u64 + 1;
+ info!("percentage done: {progress}");
+ }
let stats = result?; // stop on error
pull_stats.add(stats);
@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
Ok(pull_stats)
}
+fn pull_group_task<'future>(
+ params: &'future PullParameters,
+ group: &'future BackupGroup,
+ namespace: &'future BackupNamespace,
+ target_namespace: &'future BackupNamespace,
+ progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+ Box::pin(async move {
+ let progress = Arc::new(Mutex::new(progress));
+ let mut pull_stats = PullStats::default();
+ let mut errors = false;
+
+ let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+ target_namespace,
+ group,
+ &params.owner,
+ ) {
+ Ok(result) => result,
+ Err(err) => {
+ info!("sync group {group} failed - group lock failed: {err}");
+ errors = true;
+ // do not stop here, instead continue
+ info!("create_locked_backup_group failed");
+ return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+ }
+ };
+
+ // permission check
+ if params.owner != owner {
+ // only the owner is allowed to create additional snapshots
+ info!(
+ "sync group {group} failed - owner check failed ({} != {owner})",
+ params.owner,
+ );
+ errors = true; // do not stop here, instead continue
+ } else {
+ match pull_group(params, namespace, group, progress.clone()).await {
+ Ok(stats) => pull_stats.add(stats),
+ Err(err) => {
+ info!("sync group {group} failed - {err}");
+ errors = true; // do not bail here, instead continue
+ }
+ }
+ }
+
+ let progress = progress.lock().unwrap().clone();
+ Ok((progress, pull_stats, errors))
+ })
+}
+
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
@@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns(
progress.done_groups = done as u64;
progress.done_snapshots = 0;
progress.group_snapshots = 0;
-
- let (owner, _lock_guard) =
- match params
- .target
- .store
- .create_locked_backup_group(&target_ns, &group, &params.owner)
- {
- Ok(result) => result,
- Err(err) => {
- info!("sync group {} failed - group lock failed: {err}", &group);
- errors = true;
- // do not stop here, instead continue
- info!("create_locked_backup_group failed");
- continue;
- }
- };
-
- // permission check
- if params.owner != owner {
- // only the owner is allowed to create additional snapshots
- info!(
- "sync group {} failed - owner check failed ({} != {owner})",
- &group, params.owner
- );
- errors = true; // do not stop here, instead continue
- } else {
- match pull_group(params, namespace, &group, &mut progress).await {
- Ok(stats) => pull_stats.add(stats),
- Err(err) => {
- info!("sync group {} failed - {err}", &group);
- errors = true; // do not stop here, instead continue
- }
- }
- }
+ pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
}
if params.remove_vanished {
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 9+ messages in thread
* [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output
2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
` (2 preceding siblings ...)
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
@ 2024-07-25 10:19 ` Christian Ebner
3 siblings, 0 replies; 9+ messages in thread
From: Christian Ebner @ 2024-07-25 10:19 UTC (permalink / raw)
To: pbs-devel
In order to keep the log messages in a meaningful order when syncing
backup groups over parallel connections, buffer them in the pull
stats and only display them once the corresponding task has
finished.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 165 ++++++++++++++++++++++++++++++++++++---------
1 file changed, 134 insertions(+), 31 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0a54217d4..109cd3d1c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -89,6 +89,7 @@ pub(crate) struct PullStats {
pub(crate) bytes: usize,
pub(crate) elapsed: Duration,
pub(crate) removed: Option<RemovedVanishedStats>,
+ pub(crate) log_buffer: Vec<String>,
}
impl From<RemovedVanishedStats> for PullStats {
@@ -101,10 +102,11 @@ impl From<RemovedVanishedStats> for PullStats {
}
impl PullStats {
- fn add(&mut self, rhs: PullStats) {
+ fn add(&mut self, mut rhs: PullStats) {
self.chunk_count += rhs.chunk_count;
self.bytes += rhs.bytes;
self.elapsed += rhs.elapsed;
+ self.log_buffer.append(&mut rhs.log_buffer);
if let Some(rhs_removed) = rhs.removed {
if let Some(ref mut removed) = self.removed {
@@ -443,7 +445,6 @@ impl PullReader for RemoteReader {
if let Err(err) = std::fs::rename(&tmp_path, to_path) {
bail!("Atomic rename file {:?} failed - {}", to_path, err);
}
- info!("got backup log file {CLIENT_LOG_BLOB_NAME:?}");
}
Ok(())
@@ -577,6 +578,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -658,17 +660,20 @@ async fn pull_index_chunks<I: IndexFile>(
let bytes = bytes.load(Ordering::SeqCst);
let chunk_count = chunk_count.load(Ordering::SeqCst);
- info!(
+ let mut log_buffer = Vec::new();
+ let msg = format!(
"downloaded {} ({}/s)",
HumanByte::from(bytes),
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
+ log_info_buffer(msg, buffer_logs, &mut log_buffer);
Ok(PullStats {
chunk_count,
bytes,
elapsed,
removed: None,
+ log_buffer,
})
}
@@ -702,6 +707,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
@@ -712,7 +718,11 @@ async fn pull_single_archive<'a>(
let mut pull_stats = PullStats::default();
- info!("sync archive {archive_name}");
+ log_info_buffer(
+ format!("sync archive {archive_name}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
reader.load_file_into(archive_name, &tmp_path).await?;
@@ -727,13 +737,18 @@ async fn pull_single_archive<'a>(
verify_archive(archive_info, &csum, size)?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_info_buffer(
+ "skipping chunk sync for same datastore".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ buffer_logs,
)
.await?;
pull_stats.add(stats);
@@ -747,13 +762,18 @@ async fn pull_single_archive<'a>(
verify_archive(archive_info, &csum, size)?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ log_info_buffer(
+ "skipping chunk sync for same datastore".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ buffer_logs,
)
.await?;
pull_stats.add(stats);
@@ -784,6 +804,7 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let mut pull_stats = PullStats::default();
let mut manifest_name = snapshot.full_path();
@@ -820,8 +841,17 @@ async fn pull_snapshot<'a>(
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ log_info_buffer(
+ format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
};
- info!("no data changes");
+ log_info_buffer(
+ "no data changes".to_string(),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(pull_stats); // nothing changed
}
@@ -841,7 +871,11 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
@@ -851,7 +885,11 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
@@ -861,15 +899,25 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&item.filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ log_info_buffer(
+ format!("detected changed file {path:?} - {err}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
}
}
}
- let stats =
- pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
+ let stats = pull_single_archive(
+ reader.clone(),
+ snapshot,
+ item,
+ downloaded_chunks.clone(),
+ buffer_logs,
+ )
+ .await?;
pull_stats.add(stats);
}
@@ -879,6 +927,11 @@ async fn pull_snapshot<'a>(
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
+ log_info_buffer(
+ format!("got backup log file {CLIENT_LOG_BLOB_NAME:?}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
};
snapshot
.cleanup_unreferenced_files(&manifest)
@@ -895,15 +948,21 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ buffer_logs: bool,
) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- let pull_stats = if is_new {
- info!("sync snapshot {}", snapshot.dir());
+ let mut pull_stats = PullStats::default();
+ if is_new {
+ log_info_buffer(
+ format!("sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
- match pull_snapshot(reader, snapshot, downloaded_chunks).await {
+ match pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await {
Err(err) => {
if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
snapshot.backup_ns(),
@@ -914,14 +973,23 @@ async fn pull_snapshot_from<'a>(
}
return Err(err);
}
- Ok(pull_stats) => {
- info!("sync snapshot {} done", snapshot.dir());
- pull_stats
+ Ok(stats) => {
+ pull_stats.add(stats);
+ log_info_buffer(
+ format!("sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
}
} else {
- info!("re-sync snapshot {}", snapshot.dir());
- pull_snapshot(reader, snapshot, downloaded_chunks).await?
+ log_info_buffer(
+ format!("re-sync snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
+ let stats = pull_snapshot(reader, snapshot, downloaded_chunks, buffer_logs).await?;
+ pull_stats.add(stats);
};
Ok(pull_stats)
@@ -1054,6 +1122,8 @@ async fn pull_group(
.last_successful_backup(&target_ns, group)?
.unwrap_or(i64::MIN);
+ let mut pull_stats = PullStats::default();
+ let buffer_logs = matches!(params.group_sync_tasks, Some(n) if n > 1);
let list: Vec<BackupDir> = raw_list
.into_iter()
.enumerate()
@@ -1063,7 +1133,11 @@ async fn pull_group(
already_synced_skip_info.update(dir.time);
return false;
} else if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ log_info_buffer(
+ format!("{already_synced_skip_info}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
already_synced_skip_info.reset();
return true;
}
@@ -1072,7 +1146,11 @@ async fn pull_group(
transfer_last_skip_info.update(dir.time);
return false;
} else if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ log_info_buffer(
+ format!("{transfer_last_skip_info}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
transfer_last_skip_info.reset();
}
true
@@ -1088,8 +1166,6 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
}
- let mut pull_stats = PullStats::default();
-
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
.target
@@ -1100,12 +1176,17 @@ async fn pull_group(
.source
.reader(source_namespace, &from_snapshot)
.await?;
- let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
+ let result =
+ pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), buffer_logs).await;
{
let mut progress = progress.lock().unwrap();
progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ log_info_buffer(
+ format!("percentage done: {progress}"),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
}
let stats = result?; // stop on error
@@ -1124,13 +1205,21 @@ async fn pull_group(
continue;
}
if snapshot.is_protected() {
- info!(
- "don't delete vanished snapshot {} (protected)",
- snapshot.dir()
+ log_info_buffer(
+ format!(
+ "don't delete vanished snapshot {} (protected)",
+ snapshot.dir()
+ ),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
);
continue;
}
- info!("delete vanished snapshot {}", snapshot.dir());
+ log_info_buffer(
+ format!("delete vanished snapshot {}", snapshot.dir()),
+ buffer_logs,
+ &mut pull_stats.log_buffer,
+ );
params
.target
.store
@@ -1478,8 +1567,14 @@ pub(crate) async fn pull_ns(
let mut pull_stats = PullStats::default();
// poll to initiate tasks, queue another remaining tasks for each finished one
while let Some(result) = pull_group_tasks.next().await {
- let (progress, stats, has_errors) = result?;
+ let (progress, mut stats, has_errors) = result?;
errors |= has_errors;
+ // Generate log output
+ for log_line in stats.log_buffer.iter() {
+ info!("{log_line}");
+ }
+ // clear log buffer before adding, don't need the logs anymore
+ stats.log_buffer.clear();
pull_stats.add(stats);
store_progress.done_groups += progress.done_groups;
store_progress.done_snapshots += progress.done_snapshots;
@@ -1552,3 +1647,11 @@ pub(crate) async fn pull_ns(
Ok((store_progress, pull_stats, errors))
}
+
+fn log_info_buffer(msg: String, buffer_logs: bool, buffer: &mut Vec<String>) {
+ if buffer_logs {
+ buffer.push(msg);
+ } else {
+ info!("{msg}");
+ }
+}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 9+ messages in thread