* [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs
@ 2025-04-04 13:49 Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config Christian Ebner
` (7 more replies)
0 siblings, 8 replies; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Syncing contents from/to a remote source via a sync job suffers from
low throughput on high-latency networks because of limitations of the
HTTP/2 connection, as described in [0]. To improve this, syncing
multiple groups in parallel by establishing multiple reader instances
has been suggested.
This patch series implements the functionality by adding the sync job
configuration property `parallel-groups`, which defines the number of
concurrent group pull/push futures to be instantiated and executed for
each job.
The property is currently not exposed in the UI, as it is intended to
be set directly in the config for now.
Example configuration:
```
sync: s-8764c440-3a6c
ns
owner root@pam
remote local
remote-ns
remote-store push-target-store
remove-vanished false
store datastore
sync-direction push
parallel-groups 4
```
Since log messages are now also written concurrently, prefix log
messages related to groups, snapshots and archives with their
respective context and add context to error messages.
Further, improve logging especially for sync jobs in push direction,
which only displayed limited information so far.
[0] https://bugzilla.proxmox.com/show_bug.cgi?id=4182
proxmox:
Christian Ebner (1):
pbs api types: add 'parallel-groups' to sync job config
pbs-api-types/src/jobs.rs | 14 ++++++++++++++
1 file changed, 14 insertions(+)
proxmox-backup:
Christian Ebner (6):
client: backup writer: fix upload stats size and rate for push sync
api: config/sync: add optional `parallel-groups` property
fix #4182: server: sync: allow pulling groups concurrently
server: pull: prefix log messages and add error context
server: sync: allow pushing groups concurrently
server: push: prefix log messages and add additional logging
pbs-client/src/backup_stats.rs | 20 +--
pbs-client/src/backup_writer.rs | 4 +-
pbs-datastore/src/store_progress.rs | 2 +-
src/api2/config/sync.rs | 10 ++
src/api2/pull.rs | 9 +-
src/api2/push.rs | 9 +-
src/server/pull.rs | 209 +++++++++++++++++-----------
src/server/push.rs | 147 ++++++++++++++-----
src/server/sync.rs | 10 +-
9 files changed, 287 insertions(+), 133 deletions(-)
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
` (6 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Allow specifying the number of backup groups to be synchronized
concurrently by the sync job. Values can range from 1 to 8.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
pbs-api-types/src/jobs.rs | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index d0b94a24..b0d8f025 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -64,6 +64,14 @@ pub const REMOVE_VANISHED_BACKUPS_SCHEMA: Schema = BooleanSchema::new(
.default(false)
.schema();
+const SYNC_PARALLEL_GROUPS_MAX: usize = 8;
+pub const SYNC_PARALLEL_GROUPS_SCHEMA: Schema =
+ IntegerSchema::new("Maximum number of groups to synchronize in parallel for a sync job")
+ .minimum(1)
+ .maximum(SYNC_PARALLEL_GROUPS_MAX as isize)
+ .default(1)
+ .schema();
+
#[api(
properties: {
"next-run": {
@@ -595,6 +603,10 @@ pub const RESYNC_CORRUPT_SCHEMA: Schema =
type: SyncDirection,
optional: true,
},
+ "parallel-groups": {
+ schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+ optional: true,
+ },
}
)]
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -632,6 +644,8 @@ pub struct SyncJobConfig {
pub resync_corrupt: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_direction: Option<SyncDirection>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub parallel_groups: Option<usize>,
}
impl SyncJobConfig {
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 18:01 ` Max Carrara
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 3/7] api: config/sync: add optional `parallel-groups` property Christian Ebner
` (5 subsequent siblings)
7 siblings, 1 reply; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Currently, the logical size of the uploaded chunks is used for size
and upload rate calculation in case of sync jobs in push direction,
leading to incorrect values for the actual transferred size and rate.
Use the compressed chunk size instead by returning the more verbose
`UploadStats` from `upload_index_chunk_info` calls and using its
compressed size for the transferred `bytes` of `SyncStats`, which is
finally used to display the upload size and calculate the rate for
the push sync job.
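Not part of the patch, just to illustrate the intended calculation: a
minimal sketch of deriving the sync rate from the compressed size, with
simplified stand-ins for the `UploadStats`/`SyncStats` fields (the real
structs live in pbs-client and the server code and carry more fields;
the numbers are hypothetical):
```
use std::time::Duration;

// Simplified stand-ins for illustration only, not the actual PBS types.
struct UploadStats {
    size: usize,            // logical (uncompressed) bytes
    size_compressed: usize, // bytes actually sent over the wire
    duration: Duration,
}

struct SyncStats {
    bytes: usize,
    elapsed: Duration,
}

fn to_sync_stats(upload: &UploadStats) -> SyncStats {
    SyncStats {
        // Use the compressed size so displayed size and rate reflect the actual transfer.
        bytes: upload.size_compressed,
        elapsed: upload.duration,
    }
}

fn main() {
    let upload = UploadStats {
        size: 171 << 20,           // ~171 MiB logical
        size_compressed: 96 << 20, // ~96 MiB on the wire (hypothetical)
        duration: Duration::from_secs(4),
    };
    let sync = to_sync_stats(&upload);
    let rate = sync.bytes as f64 / (1024.0 * 1024.0) / sync.elapsed.as_secs_f64();
    println!(
        "logical {} bytes, uploaded {} bytes ({rate:.3} MiB/s)",
        upload.size, sync.bytes
    );
}
```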
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
pbs-client/src/backup_stats.rs | 20 ++++++++++----------
pbs-client/src/backup_writer.rs | 4 ++--
src/server/push.rs | 2 +-
3 files changed, 13 insertions(+), 13 deletions(-)
diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index f0563a001..edf7ef3c4 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -15,16 +15,16 @@ pub struct BackupStats {
}
/// Extended backup run statistics and archive checksum
-pub(crate) struct UploadStats {
- pub(crate) chunk_count: usize,
- pub(crate) chunk_reused: usize,
- pub(crate) chunk_injected: usize,
- pub(crate) size: usize,
- pub(crate) size_reused: usize,
- pub(crate) size_injected: usize,
- pub(crate) size_compressed: usize,
- pub(crate) duration: Duration,
- pub(crate) csum: [u8; 32],
+pub struct UploadStats {
+ pub chunk_count: usize,
+ pub chunk_reused: usize,
+ pub chunk_injected: usize,
+ pub size: usize,
+ pub size_reused: usize,
+ pub size_injected: usize,
+ pub size_compressed: usize,
+ pub duration: Duration,
+ pub csum: [u8; 32],
}
impl UploadStats {
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 325425069..d93b3e7e8 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -275,7 +275,7 @@ impl BackupWriter {
archive_name: &BackupArchiveName,
stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
options: UploadOptions,
- ) -> Result<BackupStats, Error> {
+ ) -> Result<UploadStats, Error> {
let mut param = json!({ "archive-name": archive_name });
let prefix = if let Some(size) = options.fixed_size {
param["size"] = size.into();
@@ -359,7 +359,7 @@ impl BackupWriter {
.post(&format!("{prefix}_close"), Some(param))
.await?;
- Ok(upload_stats.to_backup_stats())
+ Ok(upload_stats)
}
pub async fn upload_stream(
diff --git a/src/server/push.rs b/src/server/push.rs
index 0db3dff30..f00a6b1e0 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -1016,7 +1016,7 @@ async fn push_index(
Ok(SyncStats {
chunk_count: upload_stats.chunk_count as usize,
- bytes: upload_stats.size as usize,
+ bytes: upload_stats.size_compressed as usize,
elapsed: upload_stats.duration,
removed: None,
})
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox-backup 3/7] api: config/sync: add optional `parallel-groups` property
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
` (4 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Allow configuring from 1 up to 8 concurrent futures to perform
multiple group syncs in parallel.
The property is exposed via the sync job config and passed to
the pull/push parameters for the sync job to set up and execute the
futures accordingly.
Implements the schema definitions and adds the new property to
`SyncJobConfig`, `PullParameters` and `PushParameters`.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
src/api2/config/sync.rs | 10 ++++++++++
src/api2/pull.rs | 9 ++++++++-
src/api2/push.rs | 9 ++++++++-
src/server/pull.rs | 4 ++++
src/server/push.rs | 4 ++++
src/server/sync.rs | 1 +
6 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index a8ea93465..517bcd868 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -337,6 +337,8 @@ pub enum DeletableProperty {
TransferLast,
/// Delete the sync_direction property,
SyncDirection,
+ /// Delete the parallel_groups property,
+ ParallelGroups,
}
#[api(
@@ -451,6 +453,9 @@ pub fn update_sync_job(
DeletableProperty::SyncDirection => {
data.sync_direction = None;
}
+ DeletableProperty::ParallelGroups => {
+ data.parallel_groups = None;
+ }
}
}
}
@@ -495,6 +500,10 @@ pub fn update_sync_job(
data.sync_direction = Some(sync_direction);
}
+ if let Some(parallel_groups) = update.parallel_groups {
+ data.parallel_groups = Some(parallel_groups);
+ }
+
if update.limit.rate_in.is_some() {
data.limit.rate_in = update.limit.rate_in;
}
@@ -666,6 +675,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
limit: pbs_api_types::RateLimitConfig::default(), // no limit
transfer_last: None,
sync_direction: None, // use default
+ parallel_groups: None,
};
// should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index d8ed1a734..36f66d9b0 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -10,7 +10,7 @@ use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
- RESYNC_CORRUPT_SCHEMA, TRANSFER_LAST_SCHEMA,
+ RESYNC_CORRUPT_SCHEMA, SYNC_PARALLEL_GROUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
use proxmox_rest_server::WorkerTask;
@@ -88,6 +88,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
sync_job.limit.clone(),
sync_job.transfer_last,
sync_job.resync_corrupt,
+ sync_job.parallel_groups,
)
}
}
@@ -137,6 +138,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
schema: RESYNC_CORRUPT_SCHEMA,
optional: true,
},
+ "parallel-groups": {
+ schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -162,6 +167,7 @@ async fn pull(
limit: RateLimitConfig,
transfer_last: Option<usize>,
resync_corrupt: Option<bool>,
+ parallel_groups: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -200,6 +206,7 @@ async fn pull(
limit,
transfer_last,
resync_corrupt,
+ parallel_groups,
)?;
// fixme: set to_stdout to false?
diff --git a/src/api2/push.rs b/src/api2/push.rs
index bf846bb37..4bb91ed61 100644
--- a/src/api2/push.rs
+++ b/src/api2/push.rs
@@ -5,7 +5,8 @@ use pbs_api_types::{
Authid, BackupNamespace, GroupFilter, RateLimitConfig, DATASTORE_SCHEMA,
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_READ, PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_PRUNE,
- REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA,
+ REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, SYNC_PARALLEL_GROUPS_SCHEMA,
+ TRANSFER_LAST_SCHEMA,
};
use proxmox_rest_server::WorkerTask;
use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -99,6 +100,10 @@ fn check_push_privs(
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "parallel-groups": {
+ schema: SYNC_PARALLEL_GROUPS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -122,6 +127,7 @@ async fn push(
group_filter: Option<Vec<GroupFilter>>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ parallel_groups: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -151,6 +157,7 @@ async fn push(
group_filter,
limit,
transfer_last,
+ parallel_groups,
)
.await?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 2c0ad9e1e..46c3d8dc5 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -57,6 +57,8 @@ pub(crate) struct PullParameters {
transfer_last: Option<usize>,
/// Whether to re-sync corrupted snapshots
resync_corrupt: bool,
+ /// Maximum number of parallel groups to pull during sync job
+ parallel_groups: Option<usize>,
}
impl PullParameters {
@@ -75,6 +77,7 @@ impl PullParameters {
limit: RateLimitConfig,
transfer_last: Option<usize>,
resync_corrupt: Option<bool>,
+ parallel_groups: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -121,6 +124,7 @@ impl PullParameters {
group_filter,
transfer_last,
resync_corrupt,
+ parallel_groups,
})
}
}
diff --git a/src/server/push.rs b/src/server/push.rs
index f00a6b1e0..878b0f6d6 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -75,6 +75,8 @@ pub(crate) struct PushParameters {
group_filter: Vec<GroupFilter>,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
+ /// Maximum number of parallel groups to push during sync job
+ parallel_groups: Option<usize>,
}
impl PushParameters {
@@ -92,6 +94,7 @@ impl PushParameters {
group_filter: Option<Vec<GroupFilter>>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ parallel_groups: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -150,6 +153,7 @@ impl PushParameters {
max_depth,
group_filter,
transfer_last,
+ parallel_groups,
})
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 10804b147..e5afbea23 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -673,6 +673,7 @@ pub fn do_sync_job(
sync_job.group_filter.clone(),
sync_job.limit.clone(),
sync_job.transfer_last,
+ sync_job.parallel_groups,
)
.await?;
push_store(push_params).await?
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (2 preceding siblings ...)
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 3/7] api: config/sync: add optional `parallel-groups` property Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 18:02 ` Max Carrara
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 5/7] server: pull: prefix log messages and add error context Christian Ebner
` (3 subsequent siblings)
7 siblings, 1 reply; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Currently, a sync job sequentially pulls the backup groups and the
snapshots contained within them, and is therefore limited in download
speed by the HTTP/2 connection of the source reader instance in case
of remote syncs. High-latency networks suffer from limited download
speed.
Improve the throughput by pulling up to a configured number of backup
groups concurrently, creating tasks that connect to the remote source
and pull from it in parallel.
Make the error handling and accounting logic for each group pull
reusable by moving it into its own helper function, returning the
future.
The store progress is placed behind an atomically reference-counted
mutex to allow concurrent status updates.
Link to issue in bugtracker:
https://bugzilla.proxmox.com/show_bug.cgi?id=4182
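Not part of the patch: a minimal, self-contained sketch of the bounded
`FuturesUnordered` pattern used here, with placeholder group names and
a stand-in for the group pull (assumes the `futures` and `tokio`
crates, not the actual PBS types):
```
use futures::stream::{FuturesUnordered, StreamExt};

// Stand-in for pulling a single backup group; the real helper connects a
// reader to the remote source and pulls all snapshots of the group.
async fn pull_group_do(group: &str) -> Result<usize, ()> {
    println!("pulling group {group}");
    Ok(1) // pretend one snapshot was synced
}

#[tokio::main]
async fn main() {
    let groups = ["vm/100", "vm/200", "ct/300", "ct/400"];
    let parallel_groups = 2; // corresponds to the `parallel-groups` setting, default 1

    let mut puller = FuturesUnordered::new();
    let mut group_futures_iter = groups.iter().map(|group| pull_group_do(group));

    // Seed the set with up to `parallel_groups` futures ...
    for _ in 0..parallel_groups {
        if let Some(future) = group_futures_iter.next() {
            puller.push(future);
        }
    }

    let (mut errors, mut synced) = (false, 0);
    // ... and push the next one whenever a pull completes, so at most
    // `parallel_groups` group pulls are in flight at any time.
    while let Some(result) = puller.next().await {
        match result {
            Ok(snapshots) => synced += snapshots,
            Err(()) => errors = true,
        }
        if let Some(future) = group_futures_iter.next() {
            puller.push(future);
        }
    }
    println!("synced {synced} snapshots, errors: {errors}");
}
```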
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
pbs-datastore/src/store_progress.rs | 2 +-
src/server/pull.rs | 111 ++++++++++++++++++----------
2 files changed, 72 insertions(+), 41 deletions(-)
diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
/// Tracker for progress of operations iterating over `Datastore` contents.
pub struct StoreProgress {
/// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 46c3d8dc5..a484957ce 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Error};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
use proxmox_human_byte::HumanByte;
use tracing::info;
@@ -512,7 +514,7 @@ async fn pull_group(
params: &PullParameters,
source_namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ store_progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -601,7 +603,8 @@ async fn pull_group(
// start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- progress.group_snapshots = list.len() as u64;
+ let mut local_progress = store_progress.lock().unwrap().clone();
+ local_progress.group_snapshots = list.len() as u64;
let mut sync_stats = SyncStats::default();
@@ -618,8 +621,11 @@ async fn pull_group(
let result =
pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
- progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ store_progress.lock().unwrap().done_snapshots += 1;
+ // Update done groups progress by other parallel running pulls
+ local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+ local_progress.done_snapshots = pos as u64 + 1;
+ info!("Percentage done: {local_progress}");
let stats = result?; // stop on error
sync_stats.add(stats);
@@ -863,6 +869,48 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
Ok(sync_stats)
}
+async fn pull_group_do(
+ params: &PullParameters,
+ group: &BackupGroup,
+ namespace: &BackupNamespace,
+ target_namespace: &BackupNamespace,
+ progress: Arc<Mutex<StoreProgress>>,
+) -> Result<SyncStats, ()> {
+ let (owner, _lock_guard) =
+ match params
+ .target
+ .store
+ .create_locked_backup_group(target_namespace, group, ¶ms.owner)
+ {
+ Ok(result) => result,
+ Err(err) => {
+ info!("sync group {group} failed - group lock failed: {err}");
+ info!("create_locked_backup_group failed");
+ return Err(());
+ }
+ };
+
+ if params.owner != owner {
+ // only the owner is allowed to create additional snapshots
+ info!(
+ "sync group {group} failed - owner check failed ({} != {owner})",
+ params.owner,
+ );
+ return Err(());
+ }
+
+ match pull_group(params, namespace, group, progress.clone()).await {
+ Ok(sync_stats) => {
+ progress.lock().unwrap().done_groups += 1;
+ Ok(sync_stats)
+ }
+ Err(err) => {
+ info!("sync group {group} failed - {err}");
+ Err(())
+ }
+ }
+}
+
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
@@ -901,48 +949,29 @@ pub(crate) async fn pull_ns(
new_groups.insert(group.clone());
}
- let mut progress = StoreProgress::new(list.len() as u64);
+ let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
let mut sync_stats = SyncStats::default();
let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
+ let mut puller = FuturesUnordered::new();
+ let mut group_futures_iter = list
+ .iter()
+ .map(|group| pull_group_do(params, group, namespace, &target_ns, progress.clone()));
- let (owner, _lock_guard) =
- match params
- .target
- .store
- .create_locked_backup_group(&target_ns, &group, ¶ms.owner)
- {
- Ok(result) => result,
- Err(err) => {
- info!("sync group {} failed - group lock failed: {err}", &group);
- errors = true;
- // do not stop here, instead continue
- info!("create_locked_backup_group failed");
- continue;
- }
- };
+ for _ in 0..params.parallel_groups.unwrap_or(1) {
+ if let Some(future) = group_futures_iter.next() {
+ puller.push(future);
+ }
+ }
- // permission check
- if params.owner != owner {
- // only the owner is allowed to create additional snapshots
- info!(
- "sync group {} failed - owner check failed ({} != {owner})",
- &group, params.owner
- );
- errors = true; // do not stop here, instead continue
- } else {
- match pull_group(params, namespace, &group, &mut progress).await {
- Ok(stats) => sync_stats.add(stats),
- Err(err) => {
- info!("sync group {} failed - {err}", &group);
- errors = true; // do not stop here, instead continue
- }
- }
+ while let Some(result) = puller.next().await {
+ match result {
+ Ok(stats) => sync_stats.add(stats),
+ Err(()) => errors |= true,
+ };
+ if let Some(future) = group_futures_iter.next() {
+ puller.push(future);
}
}
@@ -998,5 +1027,7 @@ pub(crate) async fn pull_ns(
};
}
+ let progress = progress.lock().unwrap().clone();
+
Ok((progress, sync_stats, errors))
}
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox-backup 5/7] server: pull: prefix log messages and add error context
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (3 preceding siblings ...)
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 6/7] server: sync: allow pushing groups concurrently Christian Ebner
` (2 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Pulling groups and therefore also snapshots in parallel leads to
unordered log outputs, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix pull job log messages with the corresponding group
or snapshot and set the error context accordingly.
Also, reword some messages, inline variables in format strings and
start log lines with capital letters to get consistent output.
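For the error-context part, not part of the patch: a minimal sketch of
the `anyhow::Context` pattern, with a placeholder snapshot prefix and
file path:
```
use anyhow::{Context, Result};

fn open_manifest(prefix: &str, path: &str) -> Result<std::fs::File> {
    // Attach the snapshot prefix so that failures from concurrently running
    // group pulls can still be attributed to their snapshot.
    std::fs::File::open(path)
        .context(format!("{prefix}: unable to open local manifest {path:?}"))
}

fn main() {
    if let Err(err) = open_manifest("Snapshot ct/100/2025-01-15T12:29:44Z", "/no/such/manifest") {
        // The alternate form `{err:#}` prints the whole context chain in one line.
        eprintln!("{err:#}");
    }
}
```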
Example output for a sequential pull job:
```
...
Snapshot ct/100/2025-01-15T12:29:44Z: start sync
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive pct.conf.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive root.pxar.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive root.pxar.didx: downloaded 171.851 MiB (111.223 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive catalog.pcat1.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive catalog.pcat1.didx: downloaded 180.195 KiB (19.884 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: got backup log file client.log.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync done
...
```
Example output for a parallel pull job:
```
...
Snapshot ct/100/2025-01-15T12:29:44Z: start sync
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive pct.conf.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive root.pxar.didx
Snapshot vm/200/2025-01-15T12:30:06Z: start sync
Snapshot vm/200/2025-01-15T12:30:06Z: sync archive qemu-server.conf.blob
Snapshot vm/200/2025-01-15T12:30:06Z: sync archive drive-scsi0.img.fidx
Snapshot ct/100/2025-01-15T12:29:44Z: archive root.pxar.didx: downloaded 171.851 MiB (206.124 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive catalog.pcat1.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive catalog.pcat1.didx: downloaded 180.195 KiB (1.972 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: got backup log file client.log.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync done
...
```
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
src/server/pull.rs | 102 ++++++++++++++++++++++++++-------------------
src/server/sync.rs | 9 ++--
2 files changed, 65 insertions(+), 46 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index a484957ce..57c3d3566 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use proxmox_human_byte::HumanByte;
@@ -136,6 +136,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
+ prefix: &str,
) -> Result<SyncStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -218,7 +219,7 @@ async fn pull_index_chunks<I: IndexFile>(
let chunk_count = chunk_count.load(Ordering::SeqCst);
info!(
- "downloaded {} ({}/s)",
+ "{prefix}: downloaded {} ({}/s)",
HumanByte::from(bytes),
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
@@ -262,6 +263,8 @@ async fn pull_single_archive<'a>(
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Snapshot {}", snapshot.dir());
+
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@@ -271,28 +274,36 @@ async fn pull_single_archive<'a>(
let mut sync_stats = SyncStats::default();
- info!("sync archive {archive_name}");
+ info!("{prefix}: sync archive {archive_name}");
+
+ let prefix = format!("Snapshot {}: archive {archive_name}", snapshot.dir());
reader.load_file_into(archive_name, &tmp_path).await?;
- let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
+ let mut tmpfile = std::fs::OpenOptions::new()
+ .read(true)
+ .open(&tmp_path)
+ .context(format!("archive {archive_name}"))?;
match ArchiveType::from_path(archive_name)? {
ArchiveType::DynamicIndex => {
let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+ format_err!(
+ "archive {archive_name}: unable to read dynamic index {tmp_path:?} - {err}"
+ )
})?;
let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).context(format!("archive {archive_name}"))?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ info!("{prefix}: skipping chunk sync for same datastore");
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ &prefix,
)
.await?;
sync_stats.add(stats);
@@ -300,19 +311,22 @@ async fn pull_single_archive<'a>(
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+ format_err!(
+ "archive {archive_name}: unable to read fixed index '{tmp_path:?}' - {err}"
+ )
})?;
let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).context(format!("archive {archive_name}"))?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ info!("{prefix}: skipping chunk sync for same datastore");
} else {
let stats = pull_index_chunks(
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
index,
downloaded_chunks,
+ &prefix,
)
.await?;
sync_stats.add(stats);
@@ -321,11 +335,11 @@ async fn pull_single_archive<'a>(
ArchiveType::Blob => {
tmpfile.rewind()?;
let (csum, size) = sha256(&mut tmpfile)?;
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).context(prefix.clone())?;
}
}
if let Err(err) = std::fs::rename(&tmp_path, &path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
+ bail!("archive {archive_name}: Atomic rename file {path:?} failed - {err}");
}
Ok(sync_stats)
}
@@ -347,13 +361,14 @@ async fn pull_snapshot<'a>(
is_new: bool,
) -> Result<SyncStats, Error> {
if is_new {
- info!("sync snapshot {}", snapshot.dir());
+ info!("{}: start sync", snapshot.dir());
} else if corrupt {
info!("re-sync snapshot {} due to corruption", snapshot.dir());
} else {
info!("re-sync snapshot {}", snapshot.dir());
}
+ let prefix = format!("Snapshot {}", snapshot.dir());
let mut sync_stats = SyncStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME.as_ref());
@@ -366,7 +381,8 @@ async fn pull_snapshot<'a>(
let tmp_manifest_blob;
if let Some(data) = reader
.load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name)
- .await?
+ .await
+ .context(prefix.clone())?
{
tmp_manifest_blob = data;
} else {
@@ -376,21 +392,21 @@ async fn pull_snapshot<'a>(
if manifest_name.exists() && !corrupt {
let manifest_blob = proxmox_lang::try_block!({
let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
- format_err!("unable to open local manifest {manifest_name:?} - {err}")
+ format_err!("{prefix}: unable to open local manifest {manifest_name:?} - {err}")
})?;
let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
Ok(manifest_blob)
})
.map_err(|err: Error| {
- format_err!("unable to read local manifest {manifest_name:?} - {err}")
+ format_err!("{prefix}: unable to read local manifest {manifest_name:?} - {err}")
})?;
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
};
- info!("no data changes");
+ info!("{prefix}: no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(sync_stats); // nothing changed
}
@@ -411,7 +427,7 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ info!("{prefix}: detected changed file {path:?} - {err}");
}
}
}
@@ -421,7 +437,7 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ info!("{prefix}: detected changed file {path:?} - {err}");
}
}
}
@@ -431,7 +447,7 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ info!("{prefix}: detected changed file {path:?} - {err}");
}
}
}
@@ -444,7 +460,7 @@ async fn pull_snapshot<'a>(
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
- bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
+ bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}");
}
if !client_log_name.exists() {
@@ -452,7 +468,7 @@ async fn pull_snapshot<'a>(
};
snapshot
.cleanup_unreferenced_files(&manifest)
- .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
+ .map_err(|err| format_err!("{prefix}: failed to cleanup unreferenced files - {err}"))?;
Ok(sync_stats)
}
@@ -467,9 +483,12 @@ async fn pull_snapshot_from<'a>(
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt: bool,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Snapshot {}", snapshot.dir());
+
let (_path, is_new, _snap_lock) = snapshot
.datastore()
- .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+ .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())
+ .context(prefix.clone())?;
let result = pull_snapshot(reader, snapshot, downloaded_chunks, corrupt, is_new).await;
@@ -482,11 +501,11 @@ async fn pull_snapshot_from<'a>(
snapshot.as_ref(),
true,
) {
- info!("cleanup error - {cleanup_err}");
+ info!("{prefix}: cleanup error - {cleanup_err}");
}
return Err(err);
}
- Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
+ Ok(_) => info!("{prefix}: sync done"),
}
}
@@ -516,6 +535,7 @@ async fn pull_group(
group: &BackupGroup,
store_progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Group {group}");
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -523,6 +543,7 @@ async fn pull_group(
.source
.list_backup_dirs(source_namespace, group)
.await?;
+
raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
let total_amount = raw_list.len();
@@ -592,11 +613,11 @@ async fn pull_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ info!("{prefix}: {already_synced_skip_info}");
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ info!("{prefix}: {transfer_last_skip_info}");
transfer_last_skip_info.reset();
}
@@ -644,12 +665,12 @@ async fn pull_group(
}
if snapshot.is_protected() {
info!(
- "don't delete vanished snapshot {} (protected)",
- snapshot.dir()
+ "{prefix}: don't delete vanished snapshot {} (protected)",
+ snapshot.dir(),
);
continue;
}
- info!("delete vanished snapshot {}", snapshot.dir());
+ info!("{prefix}: delete vanished snapshot {}", snapshot.dir());
params
.target
.store
@@ -848,10 +869,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
}
Err(err) => {
errors = true;
- info!(
- "Encountered errors while syncing namespace {} - {err}",
- &namespace,
- );
+ info!("Encountered errors while syncing namespace {namespace} - {err}");
}
};
}
@@ -876,6 +894,7 @@ async fn pull_group_do(
target_namespace: &BackupNamespace,
progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, ()> {
+ let prefix = format!("Group {group}");
let (owner, _lock_guard) =
match params
.target
@@ -884,8 +903,7 @@ async fn pull_group_do(
{
Ok(result) => result,
Err(err) => {
- info!("sync group {group} failed - group lock failed: {err}");
- info!("create_locked_backup_group failed");
+ info!("{prefix}: creating locked backup group failed: {err}");
return Err(());
}
};
@@ -893,7 +911,7 @@ async fn pull_group_do(
if params.owner != owner {
// only the owner is allowed to create additional snapshots
info!(
- "sync group {group} failed - owner check failed ({} != {owner})",
+ "{prefix}: owner check failed: ({} != {owner})",
params.owner,
);
return Err(());
@@ -905,7 +923,7 @@ async fn pull_group_do(
Ok(sync_stats)
}
Err(err) => {
- info!("sync group {group} failed - {err}");
+ info!("{prefix}: pulling group failed: {err:#}");
Err(())
}
}
@@ -938,7 +956,7 @@ pub(crate) async fn pull_ns(
list.sort_unstable();
info!(
- "found {} groups to sync (out of {unfiltered_count} total)",
+ "Found {} groups to sync (out of {unfiltered_count} total)",
list.len()
);
@@ -990,7 +1008,7 @@ pub(crate) async fn pull_ns(
if !local_group.apply_filters(¶ms.group_filter) {
continue;
}
- info!("delete vanished group '{local_group}'");
+ info!("Delete vanished group '{local_group}'");
let delete_stats_result = params
.target
.store
@@ -999,7 +1017,7 @@ pub(crate) async fn pull_ns(
match delete_stats_result {
Ok(stats) => {
if !stats.all_removed() {
- info!("kept some protected snapshots of group '{local_group}'");
+ info!("Kept some protected snapshots of group '{local_group}'");
sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 0,
@@ -1022,7 +1040,7 @@ pub(crate) async fn pull_ns(
Ok(())
});
if let Err(err) = result {
- info!("error during cleanup: {err}");
+ info!("Error during cleanup: {err}");
errors = true;
};
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index e5afbea23..66f2edf4e 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -135,13 +135,13 @@ impl SyncSourceReader for RemoteSourceReader {
Some(HttpError { code, message }) => match *code {
StatusCode::NOT_FOUND => {
info!(
- "skipping snapshot {} - vanished since start of sync",
+ "Snapshot {}: skipped because vanished since start of sync",
&self.dir
);
return Ok(None);
}
_ => {
- bail!("HTTP error {code} - {message}");
+ bail!("Snapshot {}: HTTP error {code} - {message}", &self.dir);
}
},
None => {
@@ -175,7 +175,8 @@ impl SyncSourceReader for RemoteSourceReader {
bail!("Atomic rename file {to_path:?} failed - {err}");
}
info!(
- "got backup log file {client_log_name}",
+ "Snapshot {snapshot}: got backup log file {client_log_name}",
+ snapshot = &self.dir,
client_log_name = client_log_name.deref()
);
}
@@ -383,7 +384,7 @@ impl SyncSource for RemoteSource {
let snapshot = item.backup;
// in-progress backups can't be synced
if item.size.is_none() {
- info!("skipping snapshot {snapshot} - in-progress backup");
+ info!("Snapshot {snapshot}: skipped because backup in progress");
return None;
}
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox-backup 6/7] server: sync: allow pushing groups concurrently
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (4 preceding siblings ...)
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 5/7] server: pull: prefix log messages and add error context Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 7/7] server: push: prefix log messages and add additional logging Christian Ebner
2025-04-04 18:01 ` [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Max Carrara
7 siblings, 0 replies; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Improve the throughput for sync jobs in push direction by pushing up
to a configured number of backup groups concurrently, creating
multiple futures, each connecting to the remote target and pushing a
group to it.
The store progress and sync group housekeeping are placed behind an
atomically reference-counted mutex to allow concurrent status updates.
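Not part of the patch: a minimal sketch of sharing the progress and
group bookkeeping between concurrent futures via `Arc<Mutex<_>>`, with
a simplified stand-in for `StoreProgress` and `tokio::spawn` instead
of the `FuturesUnordered` scheme the sync code actually uses:
```
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Simplified stand-in for pbs-datastore's StoreProgress, for illustration only.
#[derive(Clone, Debug, Default)]
struct StoreProgress {
    done_groups: u64,
    done_snapshots: u64,
}

#[tokio::main]
async fn main() {
    let progress = Arc::new(Mutex::new(StoreProgress::default()));
    let synced_groups = Arc::new(Mutex::new(HashSet::<String>::new()));

    let mut tasks = Vec::new();
    for group in ["vm/100", "ct/200", "ct/300"] {
        let progress = progress.clone();
        let synced_groups = synced_groups.clone();
        tasks.push(tokio::spawn(async move {
            // Remember the group so a later remove-vanished pass can skip it.
            synced_groups.lock().unwrap().insert(group.to_string());
            // Update the shared counters; the lock is only held briefly and
            // never across an await point.
            let mut progress = progress.lock().unwrap();
            progress.done_groups += 1;
            progress.done_snapshots += 1;
        }));
    }
    for task in tasks {
        task.await.unwrap();
    }
    println!("{:?}", progress.lock().unwrap());
}
```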
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
src/server/push.rs | 94 +++++++++++++++++++++++++++++++++-------------
1 file changed, 68 insertions(+), 26 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 878b0f6d6..4188955f5 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Context, Error};
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info, warn};
@@ -535,41 +535,46 @@ pub(crate) async fn push_namespace(
let mut errors = false;
// Remember synced groups, remove others when the remove vanished flag is set
- let mut synced_groups = HashSet::new();
- let mut progress = StoreProgress::new(list.len() as u64);
+ let synced_groups = Arc::new(Mutex::new(HashSet::new()));
+ let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
fetch_target_groups(params, &target_namespace).await?;
+ let not_owned_target_groups = Arc::new(not_owned_target_groups);
+
+ let mut pusher = FuturesUnordered::new();
+ let mut group_futures_iter = list.iter().map(|group| {
+ push_group_do(
+ params,
+ namespace,
+ group,
+ progress.clone(),
+ synced_groups.clone(),
+ not_owned_target_groups.clone(),
+ )
+ });
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
-
- if not_owned_target_groups.contains(&group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- continue;
+ for _ in 0..params.parallel_groups.unwrap_or(1) {
+ if let Some(future) = group_futures_iter.next() {
+ pusher.push(future);
}
- synced_groups.insert(group.clone());
+ }
- match push_group(params, namespace, &group, &mut progress).await {
+ while let Some(result) = pusher.next().await {
+ match result {
Ok(sync_stats) => stats.add(sync_stats),
- Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- errors = true;
- }
+ Err(()) => errors |= true,
+ };
+ if let Some(future) = group_futures_iter.next() {
+ pusher.push(future);
}
}
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
- if synced_groups.contains(&target_group) {
+ if synced_groups.lock().unwrap().contains(&target_group) {
continue;
}
if !target_group.apply_filters(¶ms.group_filter) {
@@ -601,6 +606,8 @@ pub(crate) async fn push_namespace(
}
}
+ let progress = progress.lock().unwrap().clone();
+
Ok((progress, stats, errors))
}
@@ -648,6 +655,37 @@ async fn forget_target_snapshot(
Ok(())
}
+async fn push_group_do(
+ params: &PushParameters,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ progress: Arc<Mutex<StoreProgress>>,
+ synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+ not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+ if not_owned_target_groups.contains(&group) {
+ warn!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ );
+ progress.lock().unwrap().done_groups += 1;
+ return Ok(SyncStats::default());
+ }
+
+ synced_groups.lock().unwrap().insert(group.clone());
+ match push_group(params, namespace, group, progress.clone()).await {
+ Ok(sync_stats) => {
+ progress.lock().unwrap().done_groups += 1;
+ Ok(sync_stats)
+ }
+ Err(err) => {
+ warn!("Group {group}: Encountered errors: {err:#}");
+ warn!("Failed to push group {group} to remote!");
+ Err(())
+ }
+ }
+}
+
/// Push group including all snaphshots to target
///
/// Iterate over all snapshots in the group and push them to the target.
@@ -661,7 +699,7 @@ pub(crate) async fn push_group(
params: &PushParameters,
namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ store_progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -716,7 +754,8 @@ pub(crate) async fn push_group(
transfer_last_skip_info.reset();
}
- progress.group_snapshots = snapshots.len() as u64;
+ let mut local_progress = store_progress.lock().unwrap().clone();
+ local_progress.group_snapshots = snapshots.len() as u64;
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -725,8 +764,11 @@ pub(crate) async fn push_group(
push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
fetch_previous_manifest = true;
- progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: {progress}");
+ store_progress.lock().unwrap().done_snapshots += 1;
+ local_progress.done_snapshots = pos as u64 + 1;
+ // Update done groups progress by other parallel running pushes
+ local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+ info!("Percentage done: {local_progress}");
// stop on error
let sync_stats = result?;
--
2.39.5
* [pbs-devel] [PATCH v4 proxmox-backup 7/7] server: push: prefix log messages and add additional logging
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (5 preceding siblings ...)
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 6/7] server: sync: allow pushing groups concurrently Christian Ebner
@ 2025-04-04 13:49 ` Christian Ebner
2025-04-04 18:01 ` [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Max Carrara
7 siblings, 0 replies; 14+ messages in thread
From: Christian Ebner @ 2025-04-04 13:49 UTC (permalink / raw)
To: pbs-devel
Pushing groups and therefore also snapshots in parallel leads to
unordered log outputs, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix push job log messages with the corresponding group
or snapshot.
Also, be more verbose for push syncs, adding additional log output
for the groups, snapshots and archives being pushed.
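Not part of the patch: a minimal sketch of the prefixing scheme with
the `tracing` macros (assumes the `tracing` and `tracing-subscriber`
crates; group, snapshot and archive names are placeholders):
```
use tracing::info;

fn main() {
    tracing_subscriber::fmt().init();

    let group = "vm/100";
    let snapshot = "vm/100/2025-01-15T12:30:06Z";

    // Group-level messages carry a "Group ..." prefix ...
    let prefix = format!("Group {group}");
    info!("{prefix}: start push");

    // ... while snapshot- and archive-level messages carry the snapshot
    // (and archive) context, so concurrent output stays attributable.
    let prefix = format!("Snapshot {snapshot}: archive drive-scsi0.img.fidx");
    info!("{prefix}: uploaded 171.851 MiB (111.223 MiB/s)");
}
```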
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
src/server/push.rs | 49 ++++++++++++++++++++++++++++++++++++----------
1 file changed, 39 insertions(+), 10 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 4188955f5..17fc411fe 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -25,6 +25,8 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, StoreProgress};
+use proxmox_human_byte::HumanByte;
+
use super::sync::{
check_namespace_depth_limit, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
SyncSource, SyncStats,
@@ -701,6 +703,7 @@ pub(crate) async fn push_group(
group: &BackupGroup,
store_progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Group {group}");
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -746,11 +749,11 @@ pub(crate) async fn push_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ info!("{prefix}: {already_synced_skip_info}");
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ info!("{prefix}: {transfer_last_skip_info}");
transfer_last_skip_info.reset();
}
@@ -760,6 +763,7 @@ pub(crate) async fn push_group(
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+ info!("Snapshot {source_snapshot}: start sync");
let result =
push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
fetch_previous_manifest = true;
@@ -768,10 +772,11 @@ pub(crate) async fn push_group(
local_progress.done_snapshots = pos as u64 + 1;
// Update done groups progress by other parallel running pushes
local_progress.done_groups = store_progress.lock().unwrap().done_groups;
- info!("Percentage done: {local_progress}");
// stop on error
let sync_stats = result?;
+ info!("Snapshot {source_snapshot}: sync done");
+ info!("Percentage done: {local_progress}");
stats.add(sync_stats);
}
@@ -782,7 +787,7 @@ pub(crate) async fn push_group(
}
if snapshot.protected {
info!(
- "Kept protected snapshot {name} on remote",
+ "{prefix}: Kept protected snapshot {name} on remote",
name = snapshot.backup
);
continue;
@@ -790,14 +795,14 @@ pub(crate) async fn push_group(
match forget_target_snapshot(params, &target_namespace, &snapshot.backup).await {
Ok(()) => {
info!(
- "Removed vanished snapshot {name} from remote",
+ "{prefix}: Removed vanished snapshot {name} from remote",
name = snapshot.backup
);
}
Err(err) => {
- warn!("Encountered errors: {err:#}");
+ warn!("{prefix}: Encountered errors: {err:#}");
warn!(
- "Failed to remove vanished snapshot {name} from remote!",
+ "{prefix}: Failed to remove vanished snapshot {name} from remote!",
name = snapshot.backup
);
}
@@ -825,6 +830,7 @@ pub(crate) async fn push_snapshot(
snapshot: &BackupDir,
fetch_previous_manifest: bool,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Snapshot {snapshot}");
let mut stats = SyncStats::default();
let target_ns = params.map_to_target(namespace)?;
let backup_dir = params
@@ -840,8 +846,8 @@ pub(crate) async fn push_snapshot(
Ok((manifest, _raw_size)) => manifest,
Err(err) => {
// No manifest in snapshot or failed to read, warn and skip
- log::warn!("Encountered errors: {err:#}");
- log::warn!("Failed to load manifest for '{snapshot}'!");
+ warn!("{prefix}: Encountered errors: {err:#}");
+ warn!("{prefix}: Failed to load manifest!");
return Ok(stats);
}
};
@@ -863,7 +869,7 @@ pub(crate) async fn push_snapshot(
if fetch_previous_manifest {
match backup_writer.download_previous_manifest().await {
Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
- Err(err) => log::info!("Could not download previous manifest - {err}"),
+ Err(err) => info!("{prefix}: Could not download previous manifest - {err}"),
}
};
@@ -892,12 +898,21 @@ pub(crate) async fn push_snapshot(
path.push(&entry.filename);
if path.try_exists()? {
let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+ info!("{prefix}: sync archive {archive_name}");
+ let prefix = format!("Snapshot {snapshot}: archive {archive_name}");
match archive_name.archive_type() {
ArchiveType::Blob => {
let file = std::fs::File::open(&path)?;
let backup_stats = backup_writer
.upload_blob(file, archive_name.as_ref())
.await?;
+ info!(
+ "{prefix}: uploaded {} ({}/s)",
+ HumanByte::from(backup_stats.size),
+ HumanByte::new_binary(
+ backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+ ),
+ );
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
bytes: backup_stats.size as usize,
@@ -927,6 +942,13 @@ pub(crate) async fn push_snapshot(
known_chunks.clone(),
)
.await?;
+ info!(
+ "{prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ );
stats.add(sync_stats);
}
ArchiveType::FixedIndex => {
@@ -952,6 +974,13 @@ pub(crate) async fn push_snapshot(
known_chunks.clone(),
)
.await?;
+ info!(
+ "{prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ );
stats.add(sync_stats);
}
}
--
2.39.5
* Re: [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (6 preceding siblings ...)
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 7/7] server: push: prefix log messages and add additional logging Christian Ebner
@ 2025-04-04 18:01 ` Max Carrara
2025-04-05 9:31 ` Christian Ebner
7 siblings, 1 reply; 14+ messages in thread
From: Max Carrara @ 2025-04-04 18:01 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
> Syncing contents from/to a remote source via a sync job suffers from
> low throughput on high latency networks because of limitations by the
> HTTP/2 connection, as described in [0]. To improve, syncing multiple
> groups in parallel by establishing multiple reader instances has been
> suggested.
>
> This patch series implements the functionality by adding the sync job
> configuration property `parallel-groups`, allowing to define the
> number of concurrent groups pull/push futures to be instantiated and
> executed for each job.
> The property is currently not exposed on the UI, as intended to be
> set in the config directly for now.
>
> Examplary configuration:
> ```
> sync: s-8764c440-3a6c
> ns
> owner root@pam
> remote local
> remote-ns
> remote-store push-target-store
> remove-vanished false
> store datastore
> sync-direction push
> parallel-groups 4
> ```
>
> Since log messages are now also written concurrently, prefix logs
> related to groups, snapshots and archives with their respective
> context prefix and add context to error messages.
>
> Further, improve logging especially for sync jobs in push direction,
> which only displayed limited information so far.
>
> [0] https://bugzilla.proxmox.com/show_bug.cgi?id=4182
So, I've given the code a good look -- unfortunately it's too late to do
any additional testing, but I wanted to shoot this out regardless in the
meantime.
Code Review
===========
As always, the code quality is pristine -- I like that you're factoring
things out into little helper functions where applicable instead of
letting the existing methods grow. Very nice. Also applies cleanly
and is formatted correctly, naturally. Really can't complain, the
changes are straightforward and easy to follow.
There's only a couple little things I spotted; see my comments inline.
Regarding that large comment about mutexes and atomics: That's something
I just wanted to mention, so just to make it clear, you don't need to
apply my suggestion :P It's probably something we should have a look at
tree-wide for other data structures, too.
Splendid work as always, anyhow!
For now, until I get to test this, consider:
Reviewed-by: Max Carrara <m.carrara@proxmox.com>
>
> proxmox:
>
> Christian Ebner (1):
> pbs api types: add 'parallel-groups' to sync job config
>
> pbs-api-types/src/jobs.rs | 14 ++++++++++++++
> 1 file changed, 14 insertions(+)
>
> proxmox-backup:
>
> Christian Ebner (6):
> client: backup writer: fix upload stats size and rate for push sync
> api: config/sync: add optional `parallel-groups` property
> fix #4182: server: sync: allow pulling groups concurrently
> server: pull: prefix log messages and add error context
> server: sync: allow pushing groups concurrently
> server: push: prefix log messages and add additional logging
>
> pbs-client/src/backup_stats.rs | 20 +--
> pbs-client/src/backup_writer.rs | 4 +-
> pbs-datastore/src/store_progress.rs | 2 +-
> src/api2/config/sync.rs | 10 ++
> src/api2/pull.rs | 9 +-
> src/api2/push.rs | 9 +-
> src/server/pull.rs | 209 +++++++++++++++++-----------
> src/server/push.rs | 147 ++++++++++++++-----
> src/server/sync.rs | 10 +-
> 9 files changed, 287 insertions(+), 133 deletions(-)
* Re: [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
@ 2025-04-04 18:01 ` Max Carrara
0 siblings, 0 replies; 14+ messages in thread
From: Max Carrara @ 2025-04-04 18:01 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
> Currently, the logical size of the uploaded chunks is used for size
> and upload rate calculation in case of sync jobs in push direction,
> leading to incorrect values of the actual transferred size and rate.
>
> Use the compressed chunk size instead, by returning the more verbose
> `UploadStats` on `upload_index_chunk_info` calls and use its
> compressed size for the transferred `bytes` of `SyncStats` instead,
> being finally used to display the upload size and calculate the rate
> for the push sync job.
>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> changes since version 3:
> - rebased onto current master
>
> pbs-client/src/backup_stats.rs | 20 ++++++++++----------
> pbs-client/src/backup_writer.rs | 4 ++--
> src/server/push.rs | 2 +-
> 3 files changed, 13 insertions(+), 13 deletions(-)
>
> diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
> index f0563a001..edf7ef3c4 100644
> --- a/pbs-client/src/backup_stats.rs
> +++ b/pbs-client/src/backup_stats.rs
> @@ -15,16 +15,16 @@ pub struct BackupStats {
> }
>
> /// Extended backup run statistics and archive checksum
> -pub(crate) struct UploadStats {
> - pub(crate) chunk_count: usize,
> - pub(crate) chunk_reused: usize,
> - pub(crate) chunk_injected: usize,
> - pub(crate) size: usize,
> - pub(crate) size_reused: usize,
> - pub(crate) size_injected: usize,
> - pub(crate) size_compressed: usize,
> - pub(crate) duration: Duration,
> - pub(crate) csum: [u8; 32],
> +pub struct UploadStats {
> + pub chunk_count: usize,
> + pub chunk_reused: usize,
> + pub chunk_injected: usize,
> + pub size: usize,
> + pub size_reused: usize,
> + pub size_injected: usize,
> + pub size_compressed: usize,
> + pub duration: Duration,
> + pub csum: [u8; 32],
> }
>
> impl UploadStats {
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index 325425069..d93b3e7e8 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -275,7 +275,7 @@ impl BackupWriter {
> archive_name: &BackupArchiveName,
> stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
> options: UploadOptions,
> - ) -> Result<BackupStats, Error> {
> + ) -> Result<UploadStats, Error> {
> let mut param = json!({ "archive-name": archive_name });
> let prefix = if let Some(size) = options.fixed_size {
> param["size"] = size.into();
> @@ -359,7 +359,7 @@ impl BackupWriter {
> .post(&format!("{prefix}_close"), Some(param))
> .await?;
>
> - Ok(upload_stats.to_backup_stats())
> + Ok(upload_stats)
> }
>
> pub async fn upload_stream(
> diff --git a/src/server/push.rs b/src/server/push.rs
> index 0db3dff30..f00a6b1e0 100644
> --- a/src/server/push.rs
> +++ b/src/server/push.rs
> @@ -1016,7 +1016,7 @@ async fn push_index(
>
> Ok(SyncStats {
> chunk_count: upload_stats.chunk_count as usize,
> - bytes: upload_stats.size as usize,
> + bytes: upload_stats.size_compressed as usize,
The `as usize` cast here is unnecessary, as `size_compressed` is already
`usize`. The same goes for `chunk_count`. Can perhaps be fixed in a
follow-up, though (sketched below the hunk). Tiny thing.
> elapsed: upload_stats.duration,
> removed: None,
> })
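For reference, a rough sketch of what that follow-up could look like (toy
stand-ins only; the real `UploadStats`/`SyncStats` in the tree have more
fields than shown here):
```
// Hypothetical stand-ins, just to show the shape of the cleanup.
struct UploadStats { chunk_count: usize, size_compressed: usize }
struct SyncStats { chunk_count: usize, bytes: usize }

fn to_sync_stats(upload_stats: UploadStats) -> SyncStats {
    SyncStats {
        // Both fields are already `usize`, so the `as usize` casts can go.
        chunk_count: upload_stats.chunk_count,
        bytes: upload_stats.size_compressed,
    }
}

fn main() {
    let stats = to_sync_stats(UploadStats { chunk_count: 4, size_compressed: 1024 });
    assert_eq!((stats.chunk_count, stats.bytes), (4, 1024));
}
```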
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
@ 2025-04-04 18:02 ` Max Carrara
2025-04-07 7:21 ` Fabian Grünbichler
0 siblings, 1 reply; 14+ messages in thread
From: Max Carrara @ 2025-04-04 18:02 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
> Currently, a sync job sequentially pulls the backup groups and the
> snapshots contained within them, therefore being limited in download
> speed by the http2 connection of the source reader instance in case
> of remote syncs. High latency networks suffer from limited download
> speed.
>
> Improve the throughput by allowing to pull up to a configured number
> of backup groups concurrently, by creating tasks connecting and
> pulling from the remote source in parallel.
>
> Make the error handling and accounting logic for each group pull
> reusable by moving it into its own helper function, returning the
> future.
>
> The store progress is placed behind an atomic reference counted mutex
> to allow for concurrent access of status updates.
Yeah, so... I've got some thoughts about this:
First of all, I think that that's *fine* here, as the
`Arc<Mutex<StoreProgress>>` probably isn't going to face that much lock
contention or something anyway. So to get that out of the way, IMO we
can keep that here as it is right now.
But in the future I do think that we should check the locations where we
have that kind of concurrent data access / modification, because I feel
the number of mutexes is only going to keep growing. That's not a
bad thing per se, but it does come with a few risks / drawbacks (e.g. a
higher risk of deadlocks).
Without going too deep into the whole "how to avoid deadlocks" discussion
and other things, here's an alternative I want to propose that could
perhaps be done in (or as part of a) different series, since it's a bit
out of scope for this one here. (Though, if you do wanna do it in this
one, I certainly won't complain!)
First, since `StoreProgress` only contains four `usize`s, it should be
fairly easy to convert the ones being modified into `AtomicUsize`s and
perhaps add helper methods to increase their respective values;
something like this:
#[derive(Debug, Default)]
/// Tracker for progress of operations iterating over `Datastore` contents.
pub struct StoreProgress {
/// Completed groups
pub done_groups: AtomicUsize,
/// Total groups
pub total_groups: u64,
/// Completed snapshots within current group
pub done_snapshots: AtomicUsize,
/// Total snapshots in current group
pub group_snapshots: u64,
}
// [...]
impl StoreProgress {
pub fn add_done_group(&self) {
let _ = self.done_groups.fetch_add(1, Ordering::Relaxed);
}
// [...]
}
(of course, what it all should look like in detail is up to bikeshedding :P)
Something like that would probably be nicer here, because:
- You won't need to wrap `StoreProgress` within an `Arc<Mutex<T>>`
anymore -- a shared reference is enough, since ...
- Operations on atomics take &self (that's the whole point of them ofc ;p )
This means that:
- Cloning an `Arc<T>` is not necessary anymore
--> should be roughly two fewer atomic ops per `Arc` used
(on create/clone and drop for each `Arc`)
- Locking the `Mutex<T>` is also not necessary anymore, which means
--> should be two fewer atomic ops for each call to `.lock()`
(acquire and release)
In turn, this is replaced by a single atomic call with
`Ordering::Relaxed` (which is fine for counters [0]). So, something like
progress.lock().unwrap().done_groups += 1;
would just become
progress.add_done_group();
which is also quite neat.
Note however that we might have to split that struct into a "local" and
"shared" version or whatever in order to adapt it all to the current
code (a locally-used struct ofc doesn't need atomics).
Again, I think what you're doing here is perfectly fine; I just think
that we should have a look at all of those concurrent data accesses and
see whether we can slim some stuff down or perhaps have some kind of
statically enforced mutex ordering for deadlock prevention [1].
[0]: https://doc.rust-lang.org/nomicon/atomics.html#relaxed
[1]: https://www.youtube.com/watch?v=Ba7fajt4l1M
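To make that a bit more concrete, here is a tiny, self-contained toy sketch
(not code from this series; `Progress` and `add_done_group` are made-up
stand-ins, and it assumes tokio with the `macros` feature plus a runtime):
```
use std::sync::atomic::{AtomicUsize, Ordering};

/// Toy stand-in for the progress tracker discussed above.
#[derive(Debug, Default)]
struct Progress {
    done_groups: AtomicUsize,
}

impl Progress {
    fn add_done_group(&self) {
        // Relaxed is sufficient for a plain counter, see [0].
        self.done_groups.fetch_add(1, Ordering::Relaxed);
    }
}

#[tokio::main]
async fn main() {
    let progress = Progress::default();

    // The concurrent futures only need a shared `&Progress` --
    // no Arc<Mutex<_>>, no cloning and no locking involved.
    let a = async { progress.add_done_group() };
    let b = async { progress.add_done_group() };
    let c = async { progress.add_done_group() };
    tokio::join!(a, b, c);

    assert_eq!(progress.done_groups.load(Ordering::Relaxed), 3);
}
```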
>
> Link to issue in bugtracker:
> https://bugzilla.proxmox.com/show_bug.cgi?id=4182
Should be:
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
.. judging from our recent history at least. Can be adapted when
applying the patch, though.
>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> changes since version 3:
> - rebased onto current master
>
> pbs-datastore/src/store_progress.rs | 2 +-
> src/server/pull.rs | 111 ++++++++++++++++++----------
> 2 files changed, 72 insertions(+), 41 deletions(-)
>
> diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
> index a32bb9a9d..8afa60ace 100644
> --- a/pbs-datastore/src/store_progress.rs
> +++ b/pbs-datastore/src/store_progress.rs
> @@ -1,4 +1,4 @@
> -#[derive(Debug, Default)]
> +#[derive(Clone, Debug, Default)]
> /// Tracker for progress of operations iterating over `Datastore` contents.
> pub struct StoreProgress {
> /// Completed groups
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 46c3d8dc5..a484957ce 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
> use std::time::SystemTime;
>
> use anyhow::{bail, format_err, Error};
> +use futures::stream::FuturesUnordered;
> +use futures::StreamExt;
> use proxmox_human_byte::HumanByte;
> use tracing::info;
>
> @@ -512,7 +514,7 @@ async fn pull_group(
> params: &PullParameters,
> source_namespace: &BackupNamespace,
> group: &BackupGroup,
> - progress: &mut StoreProgress,
> + store_progress: Arc<Mutex<StoreProgress>>,
> ) -> Result<SyncStats, Error> {
> let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
> let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
> @@ -601,7 +603,8 @@ async fn pull_group(
> // start with 65536 chunks (up to 256 GiB)
> let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
> - progress.group_snapshots = list.len() as u64;
> + let mut local_progress = store_progress.lock().unwrap().clone();
> + local_progress.group_snapshots = list.len() as u64;
>
> let mut sync_stats = SyncStats::default();
>
> @@ -618,8 +621,11 @@ async fn pull_group(
> let result =
> pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
>
> - progress.done_snapshots = pos as u64 + 1;
> - info!("percentage done: {progress}");
> + store_progress.lock().unwrap().done_snapshots += 1;
> + // Update done groups progress by other parallel running pulls
> + local_progress.done_groups = store_progress.lock().unwrap().done_groups;
> + local_progress.done_snapshots = pos as u64 + 1;
> + info!("Percentage done: {local_progress}");
>
> let stats = result?; // stop on error
> sync_stats.add(stats);
> @@ -863,6 +869,48 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
> Ok(sync_stats)
> }
>
> +async fn pull_group_do(
> + params: &PullParameters,
> + group: &BackupGroup,
> + namespace: &BackupNamespace,
> + target_namespace: &BackupNamespace,
> + progress: Arc<Mutex<StoreProgress>>,
> +) -> Result<SyncStats, ()> {
> + let (owner, _lock_guard) =
> + match params
> + .target
> + .store
> + .create_locked_backup_group(target_namespace, group, ¶ms.owner)
> + {
> + Ok(result) => result,
> + Err(err) => {
> + info!("sync group {group} failed - group lock failed: {err}");
> + info!("create_locked_backup_group failed");
Since a lot of the surrounding code already does it, it's fine I guess,
but... why do we use `info!` everywhere here for errors? Is there any
reason in particular? 😅
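Purely as an illustration (a toy example, not something this series has to
change; it assumes the tracing and tracing-subscriber crates):
```
use tracing::{error, info};

fn main() {
    tracing_subscriber::fmt::init();
    let (group, err) = ("vm/100", "group lock failed");
    // Current style: the failure is logged at info level ...
    info!("sync group {group} failed - {err}");
    // ... while the error level would make it stand out in the task log.
    error!("sync group {group} failed - {err}");
}
```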
> + return Err(());
> + }
> + };
> +
> + if params.owner != owner {
> + // only the owner is allowed to create additional snapshots
> + info!(
> + "sync group {group} failed - owner check failed ({} != {owner})",
> + params.owner,
> + );
> + return Err(());
> + }
> +
> + match pull_group(params, namespace, group, progress.clone()).await {
> + Ok(sync_stats) => {
> + progress.lock().unwrap().done_groups += 1;
> + Ok(sync_stats)
> + }
> + Err(err) => {
> + info!("sync group {group} failed - {err}");
> + Err(())
> + }
> + }
> +}
> +
> /// Pulls a namespace according to `params`.
> ///
> /// Pulling a namespace consists of the following steps:
> @@ -901,48 +949,29 @@ pub(crate) async fn pull_ns(
> new_groups.insert(group.clone());
> }
>
> - let mut progress = StoreProgress::new(list.len() as u64);
> + let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
> let mut sync_stats = SyncStats::default();
>
> let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
>
> - for (done, group) in list.into_iter().enumerate() {
> - progress.done_groups = done as u64;
> - progress.done_snapshots = 0;
> - progress.group_snapshots = 0;
> + let mut puller = FuturesUnordered::new();
> + let mut group_futures_iter = list
> + .iter()
> + .map(|group| pull_group_do(params, group, namespace, &target_ns, progress.clone()));
>
> - let (owner, _lock_guard) =
> - match params
> - .target
> - .store
> - .create_locked_backup_group(&target_ns, &group, ¶ms.owner)
> - {
> - Ok(result) => result,
> - Err(err) => {
> - info!("sync group {} failed - group lock failed: {err}", &group);
> - errors = true;
> - // do not stop here, instead continue
> - info!("create_locked_backup_group failed");
> - continue;
> - }
> - };
> + for _ in 0..params.parallel_groups.unwrap_or(1) {
> + if let Some(future) = group_futures_iter.next() {
> + puller.push(future);
> + }
> + }
>
> - // permission check
> - if params.owner != owner {
> - // only the owner is allowed to create additional snapshots
> - info!(
> - "sync group {} failed - owner check failed ({} != {owner})",
> - &group, params.owner
> - );
> - errors = true; // do not stop here, instead continue
> - } else {
> - match pull_group(params, namespace, &group, &mut progress).await {
> - Ok(stats) => sync_stats.add(stats),
> - Err(err) => {
> - info!("sync group {} failed - {err}", &group);
> - errors = true; // do not stop here, instead continue
> - }
> - }
> + while let Some(result) = puller.next().await {
> + match result {
> + Ok(stats) => sync_stats.add(stats),
> + Err(()) => errors |= true,
> + };
> + if let Some(future) = group_futures_iter.next() {
> + puller.push(future);
> }
> }
>
> @@ -998,5 +1027,7 @@ pub(crate) async fn pull_ns(
> };
> }
>
> + let progress = progress.lock().unwrap().clone();
> +
> Ok((progress, sync_stats, errors))
> }
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs
2025-04-04 18:01 ` [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Max Carrara
@ 2025-04-05 9:31 ` Christian Ebner
2025-04-09 10:22 ` Max Carrara
0 siblings, 1 reply; 14+ messages in thread
From: Christian Ebner @ 2025-04-05 9:31 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Max Carrara
Cc: Thomas Lamprecht
On 4/4/25 20:01, Max Carrara wrote:
> On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
>> Syncing contents from/to a remote source via a sync job suffers from
>> low throughput on high latency networks because of limitations by the
>> HTTP/2 connection, as described in [0]. To improve, syncing multiple
>> groups in parallel by establishing multiple reader instances has been
>> suggested.
>>
>> This patch series implements the functionality by adding the sync job
>> configuration property `parallel-groups`, allowing to define the
>> number of concurrent groups pull/push futures to be instantiated and
>> executed for each job.
>> The property is currently not exposed on the UI, as intended to be
>> set in the config directly for now.
>>
>> Examplary configuration:
>> ```
>> sync: s-8764c440-3a6c
>> ns
>> owner root@pam
>> remote local
>> remote-ns
>> remote-store push-target-store
>> remove-vanished false
>> store datastore
>> sync-direction push
>> parallel-groups 4
>> ```
>>
>> Since log messages are now also written concurrently, prefix logs
>> related to groups, snapshots and archives with their respective
>> context prefix and add context to error messages.
>>
>> Further, improve logging especially for sync jobs in push direction,
>> which only displayed limited information so far.
>>
>> [0] https://bugzilla.proxmox.com/show_bug.cgi?id=4182
>
> So, I've given the code a good look -- unfortunately it's too late to do
> any additional testing, but I wanted to shoot this out regardless in the
> meantime.
>
> Code Review
> ===========
>
> As always, the code quality is pristine -- I like that you're factoring
> things out into little helper functions where applicable instead of
> letting the existing methods grow. Very nice. Also applies cleanly
> and is formatted correctly, naturally. Really can't complain, the
> changes are straightforward and easy to follow.
>
> There's only a couple little things I spotted; see my comments inline.
>
> Regarding that large comment about mutexes and atomics: That's something
> I just wanted to mention, so just to make it clear, you don't need to
> apply my suggestion :P It's probably something we should have a look at
> tree-wide for other data structures, too.
>
> Splendid work as always, anyhow!
>
> For now, until I get to test this, consider:
>
> Reviewed-by: Max Carrara <m.carrara@proxmox.com>
Thanks for your efforts on this one, appreciated!
There are, however, two things which make me hesitate to take this
patch series further in its current approach:
- Thomas raised valid concerns about the feasibility of adding such
parallelism parameters just for the sake of a quick fix [0].
- There was a user report about sync jobs being slow over a VPN
connection, which led to networking adjustments that significantly
increased the throughput on his side [1]. He documented this in the
bugtracker issue upon my request [2].
So, for now, I would rather investigate whether congestion control
settings can help increase sync job performance than add the parallel
group sync.
What do you think? (Also CC'ing Thomas to ask for his opinion.)
[0]
https://lore.proxmox.com/pbs-devel/e5a2dcac-630e-4797-bbbf-f38bc260c2ca@proxmox.com/
[1] https://forum.proxmox.com/threads/164450/post-761198
[2] https://bugzilla.proxmox.com/show_bug.cgi?id=4182#c12
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently
2025-04-04 18:02 ` Max Carrara
@ 2025-04-07 7:21 ` Fabian Grünbichler
0 siblings, 0 replies; 14+ messages in thread
From: Fabian Grünbichler @ 2025-04-07 7:21 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On April 4, 2025 8:02 pm, Max Carrara wrote:
> On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
>> Currently, a sync job sequentially pulls the backup groups and the
>> snapshots contained within them, therefore being limited in download
>> speed by the http2 connection of the source reader instance in case
>> of remote syncs. High latency networks suffer from limited download
>> speed.
>>
>> Improve the throughput by allowing to pull up to a configured number
>> of backup groups concurrently, by creating tasks connecting and
>> pulling from the remote source in parallel.
>>
>> Make the error handling and accounting logic for each group pull
>> reusable by moving it into its own helper function, returning the
>> future.
>>
>> The store progress is placed behind an atomic reference counted mutex
>> to allow for concurrent access of status updates.
>
> Yeah, so... I've got some thoughts about this:
>
> First of all, I think that that's *fine* here, as the
> `Arc<Mutex<StoreProgress>>` probably isn't going to face that much lock
> contention or something anyway. So to get that out of the way, IMO we
> can keep that here as it is right now.
>
> But in the future I do think that we should check the locations where we
> have that kind of concurrent data access / modification, because I feel
> the amount of mutexes is only going to continue growing. That's not a
> bad thing per se, but it does come with a few risks / drawbacks (e.g.
> higher risk for deadlocks).
>
> Without going too deep into the whole "how to avoid deadlocks" discussion
> and other things, here's an alternative I want to propose that could
> perhaps be done in (or as part of a) different series, since it's a bit
> out of scope for this one here. (Though, if you do wanna do it in this
> one, I certainly won't complain!)
>
> First, since `StoreProgress` only contains four `usize`s, it should be
> fairly easy to convert the ones being modified into `AtomicUsize`s and
> perhaps add helper methods to increase their respective values;
> something like this:
>
> #[derive(Debug, Default)]
> /// Tracker for progress of operations iterating over `Datastore` contents.
> pub struct StoreProgress {
> /// Completed groups
> pub done_groups: AtomicUsize,
> /// Total groups
> pub total_groups: u64,
> /// Completed snapshots within current group
> pub done_snapshots: AtomicUsize,
> /// Total snapshots in current group
> pub group_snapshots: u64,
> }
>
> // [...]
>
> impl StoreProgress {
> pub fn add_done_group(&self) {
> let _ = self.done_groups.fetch_add(1, Ordering::Relaxed);
> }
>
> // [...]
> }
>
> (of course, what it all should look like in detail is up to bikeshedding :P)
>
> Something like that would probably be nicer here, because:
>
> - You won't need to wrap `StoreProgress` within an `Arc<Mutex<T>>`
> anymore -- a shared reference is enough, since ...
> - Operations on atomics take &self (that's the whole point of them ofc ;p )
>
> This means that:
> - Cloning an `Arc<T>` is not necessary anymore
> --> should be roughly two fewer atomic ops per `Arc` used
> (on create/clone and drop for each `Arc`)
> - Locking the `Mutex<T>` is also not necessary anymore, which means
> --> should be two fewer atomic ops for each call to `.lock()`
> (acquire and release)
>
> In turn, this is replaced by a single atomic call with
> `Ordering::Relaxed` (which is fine for counters [0]). So, something like
>
> progress.lock().unwrap().done_groups += 1;
>
> would just become
>
> progress.add_done_group();
>
> which is also quite neat.
>
> Note however that we might have to split that struct into a "local" and
> "shared" version or whatever in order to adapt it all to the current
> code (a locally-used struct ofc doesn't need atomics).
>
> Again, I think what you're doing here is perfectly fine; I just think
> that we should have a look at all of those concurrent data accesses and
> see whether we can slim some stuff down or perhaps have some kind of
> statically enforced mutex ordering for deadlock prevention [1].
>
> [0]: https://doc.rust-lang.org/nomicon/atomics.html#relaxed
> [1]: https://www.youtube.com/watch?v=Ba7fajt4l1M
we do have a similar construct in the client (although still with Arc<>,
maybe we could eliminate it there? ;)):
https://git.proxmox.com/?p=proxmox-backup.git;a=blob;f=pbs-client/src/backup_stats.rs;h=f0563a0011b1117e32027a8a88f9d6f65db591f0;hb=HEAD#l45
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs
2025-04-05 9:31 ` Christian Ebner
@ 2025-04-09 10:22 ` Max Carrara
0 siblings, 0 replies; 14+ messages in thread
From: Max Carrara @ 2025-04-09 10:22 UTC (permalink / raw)
To: Christian Ebner, Proxmox Backup Server development discussion
Cc: Thomas Lamprecht
On Sat Apr 5, 2025 at 11:31 AM CEST, Christian Ebner wrote:
> On 4/4/25 20:01, Max Carrara wrote:
> > On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
> >> Syncing contents from/to a remote source via a sync job suffers from
> >> low throughput on high latency networks because of limitations by the
> >> HTTP/2 connection, as described in [0]. To improve, syncing multiple
> >> groups in parallel by establishing multiple reader instances has been
> >> suggested.
> >>
> >> This patch series implements the functionality by adding the sync job
> >> configuration property `parallel-groups`, allowing to define the
> >> number of concurrent groups pull/push futures to be instantiated and
> >> executed for each job.
> >> The property is currently not exposed on the UI, as intended to be
> >> set in the config directly for now.
> >>
> >> Examplary configuration:
> >> ```
> >> sync: s-8764c440-3a6c
> >> ns
> >> owner root@pam
> >> remote local
> >> remote-ns
> >> remote-store push-target-store
> >> remove-vanished false
> >> store datastore
> >> sync-direction push
> >> parallel-groups 4
> >> ```
> >>
> >> Since log messages are now also written concurrently, prefix logs
> >> related to groups, snapshots and archives with their respective
> >> context prefix and add context to error messages.
> >>
> >> Further, improve logging especially for sync jobs in push direction,
> >> which only displayed limited information so far.
> >>
> >> [0] https://bugzilla.proxmox.com/show_bug.cgi?id=4182
> >
> > So, I've given the code a good look -- unfortunately it's too late to do
> > any additional testing, but I wanted to shoot this out regardless in the
> > meantime.
> >
> > Code Review
> > ===========
> >
> > As always, the code quality is pristine -- I like that you're factoring
> > things out into little helper functions where applicable instead of
> > letting the existing methods grow. Very nice. Also applies cleanly
> > and is formatted correctly, naturally. Really can't complain, the
> > changes are straightforward and easy to follow.
> >
> > There's only a couple little things I spotted; see my comments inline.
> >
> > Regarding that large comment about mutexes and atomics: That's something
> > I just wanted to mention, so just to make it clear, you don't need to
> > apply my suggestion :P It's probably something we should have a look at
> > tree-wide for other data structures, too.
> >
> > Splendid work as always, anyhow!
> >
> > For now, until I get to test this, consider:
> >
> > Reviewed-by: Max Carrara <m.carrara@proxmox.com>
>
> Thanks for your efforts on this one, appreciated!
>
> There are however 2 things which make me hesitate with bringing this
> patch series further in its current approach:
>
> - Thomas raised valid concerns about the feasibility of adding such
> parallelism parameters just for the sake of a quick fix [0].
>
> - There was a user report about sync jobs being slow over a VPN
> connection, which resulted in networking adjustments which did
> significantly increase the throughput on his side [1]. He documented
> this in the bugtracker issue upon my request [2].
>
> So I would like to rather investigate if congestion control settings can
> help increase performance for the sync jobs rather than adding the
> parallel group sync for now.
>
> What do you think? (CC'ing also Thomas asking for his opinion).
>
> [0]
> https://lore.proxmox.com/pbs-devel/e5a2dcac-630e-4797-bbbf-f38bc260c2ca@proxmox.com/
> [1] https://forum.proxmox.com/threads/164450/post-761198
> [2] https://bugzilla.proxmox.com/show_bug.cgi?id=4182#c12
Huh! That's very interesting! Yeah, seems perfectly reasonable to focus
on congestion control settings instead, then.
If you recall, Gabriel and I had cooked up a scheduler prototype a
couple of months ago. We had actually managed to get parallel backup jobs
going, e.g. with the maximum number of jobs being adjustable. My plan was
to extend this to pretty much any type of job, but there were a *lot* of
open questions and design decisions to be made in general there--so we
didn't continue iterating on it, as there were more important things to
address.
So, should we unbury that prototype at some point in the future, perhaps
we can incorporate some of the things from this series here. I can't give
you an estimate on when that will happen though, as Gabriel is working
on SDN and I'm working on Storage atm.
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 14+ messages in thread
end of thread, other threads:[~2025-04-09 10:23 UTC | newest]
Thread overview: 14+ messages
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2025-04-04 18:01 ` Max Carrara
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 3/7] api: config/sync: add optional `parallel-groups` property Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
2025-04-04 18:02 ` Max Carrara
2025-04-07 7:21 ` Fabian Grünbichler
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 5/7] server: pull: prefix log messages and add error context Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 6/7] server: sync: allow pushing groups concurrently Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 7/7] server: push: prefix log messages and add additional logging Christian Ebner
2025-04-04 18:01 ` [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Max Carrara
2025-04-05 9:31 ` Christian Ebner
2025-04-09 10:22 ` Max Carrara