From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently
Date: Mon, 9 Mar 2026 17:20:48 +0100 [thread overview]
Message-ID: <20260309162050.1047341-11-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260309162050.1047341-1-c.ebner@proxmox.com>
Improve the throughput for sync jobs in push direction by allowing
up to a configured number of backup groups to be pushed concurrently,
creating multiple futures, each connecting to and pushing a group to
the remote target.
The store progress and sync group housekeeping are placed behind an
atomic reference counted mutex to allow concurrent status updates
across the parallel group pushes.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/push.rs | 97 ++++++++++++++++++++++++++++++++++------------
1 file changed, 72 insertions(+), 25 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 5828f2ed1..e7d56cc2a 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -34,6 +34,7 @@ use super::sync::{
SyncSource, SyncStats,
};
use crate::api2::config::remote;
+use crate::server::sync::{GroupWorkerSet, SharedGroupProgress};
/// Target for backups to be pushed to
pub(crate) struct PushTarget {
@@ -550,41 +551,58 @@ pub(crate) async fn push_namespace(
let mut errors = false;
// Remember synced groups, remove others when the remove vanished flag is set
- let mut synced_groups = HashSet::new();
+ let synced_groups = Arc::new(Mutex::new(HashSet::new()));
let mut progress = StoreProgress::new(list.len() as u64);
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
fetch_target_groups(¶ms, &target_namespace).await?;
+ let not_owned_target_groups = Arc::new(not_owned_target_groups);
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
+ let mut group_workers = GroupWorkerSet::with_capacity(params.worker_threads.unwrap_or(1));
+ let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
- if not_owned_target_groups.contains(&group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- continue;
- }
- synced_groups.insert(group.clone());
-
- match push_group(Arc::clone(¶ms), namespace, &group, &mut progress).await {
- Ok(sync_stats) => stats.add(sync_stats),
- Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- errors = true;
+ let mut process_results = |results| {
+ for result in results {
+ match result {
+ Ok(sync_stats) => {
+ stats.add(sync_stats);
+ progress.done_groups = shared_group_progress.increment_done();
+ }
+ Err(()) => errors = true,
}
}
+ };
+
+ for group in list.into_iter() {
+ let namespace = namespace.clone();
+ let params = Arc::clone(¶ms);
+ let not_owned_target_groups = Arc::clone(¬_owned_target_groups);
+ let synced_groups = Arc::clone(&synced_groups);
+ let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let results = group_workers
+ .spawn_task(async move {
+ push_group_do(
+ params,
+ &namespace,
+ &group,
+ group_progress_cloned,
+ synced_groups,
+ not_owned_target_groups,
+ )
+ .await
+ })
+ .await;
+ process_results(results);
}
+ let results = group_workers.join_active().await;
+ process_results(results);
+
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
- if synced_groups.contains(&target_group) {
+ if synced_groups.lock().unwrap().contains(&target_group) {
continue;
}
if !target_group.apply_filters(¶ms.group_filter) {
@@ -663,6 +681,32 @@ async fn forget_target_snapshot(
Ok(())
}
+async fn push_group_do(
+ params: Arc<PushParameters>,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ shared_group_progress: Arc<SharedGroupProgress>,
+ synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+ not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+ if not_owned_target_groups.contains(group) {
+ warn!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ );
+ shared_group_progress.increment_done();
+ return Ok(SyncStats::default());
+ }
+
+ synced_groups.lock().unwrap().insert(group.clone());
+ push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+ .await
+ .map_err(|err| {
+ warn!("Group {group}: Encountered errors: {err:#}");
+ warn!("Failed to push group {group} to remote!");
+ })
+}
+
/// Push group including all snaphshots to target
///
/// Iterate over all snapshots in the group and push them to the target.
@@ -676,7 +720,7 @@ pub(crate) async fn push_group(
params: Arc<PushParameters>,
namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -744,7 +788,8 @@ pub(crate) async fn push_group(
transfer_last_skip_info.reset();
}
- progress.group_snapshots = snapshots.len() as u64;
+ let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+ local_progress.group_snapshots = snapshots.len() as u64;
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -758,8 +803,10 @@ pub(crate) async fn push_group(
.await;
fetch_previous_manifest = true;
- progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: {progress}");
+ // Update done groups progress by other parallel running pushes
+ local_progress.done_groups = shared_group_progress.load_done();
+ local_progress.done_snapshots = pos as u64 + 1;
+ info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
--
2.47.3
next prev parent reply other threads:[~2026-03-09 16:22 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` Christian Ebner [this message]
2026-03-09 16:20 ` [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260309162050.1047341-11-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox