public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently
Date: Mon,  9 Mar 2026 17:20:48 +0100	[thread overview]
Message-ID: <20260309162050.1047341-11-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260309162050.1047341-1-c.ebner@proxmox.com>

Improve the throughput for sync jobs in push direction by allowing
to push up to a configured number of backup groups concurrently, by
creating multiple futures, each connecting and pushing a group to
the remote target.

The store progress and sync group housekeeping are placed behind an
atomic reference-counted mutex to allow for concurrent access of
status updates.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/push.rs | 97 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 72 insertions(+), 25 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 5828f2ed1..e7d56cc2a 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -34,6 +34,7 @@ use super::sync::{
     SyncSource, SyncStats,
 };
 use crate::api2::config::remote;
+use crate::server::sync::{GroupWorkerSet, SharedGroupProgress};
 
 /// Target for backups to be pushed to
 pub(crate) struct PushTarget {
@@ -550,41 +551,58 @@ pub(crate) async fn push_namespace(
 
     let mut errors = false;
     // Remember synced groups, remove others when the remove vanished flag is set
-    let mut synced_groups = HashSet::new();
+    let synced_groups = Arc::new(Mutex::new(HashSet::new()));
     let mut progress = StoreProgress::new(list.len() as u64);
     let mut stats = SyncStats::default();
 
     let (owned_target_groups, not_owned_target_groups) =
         fetch_target_groups(&params, &target_namespace).await?;
+    let not_owned_target_groups = Arc::new(not_owned_target_groups);
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut group_workers = GroupWorkerSet::with_capacity(params.worker_threads.unwrap_or(1));
+    let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
 
-        if not_owned_target_groups.contains(&group) {
-            warn!(
-                "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-                params.target.remote_user(),
-            );
-            continue;
-        }
-        synced_groups.insert(group.clone());
-
-        match push_group(Arc::clone(&params), namespace, &group, &mut progress).await {
-            Ok(sync_stats) => stats.add(sync_stats),
-            Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!("Failed to push group {group} to remote!");
-                errors = true;
+    let mut process_results = |results| {
+        for result in results {
+            match result {
+                Ok(sync_stats) => {
+                    stats.add(sync_stats);
+                    progress.done_groups = shared_group_progress.increment_done();
+                }
+                Err(()) => errors = true,
             }
         }
+    };
+
+    for group in list.into_iter() {
+        let namespace = namespace.clone();
+        let params = Arc::clone(&params);
+        let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
+        let synced_groups = Arc::clone(&synced_groups);
+        let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let results = group_workers
+            .spawn_task(async move {
+                push_group_do(
+                    params,
+                    &namespace,
+                    &group,
+                    group_progress_cloned,
+                    synced_groups,
+                    not_owned_target_groups,
+                )
+                .await
+            })
+            .await;
+        process_results(results);
     }
 
+    let results = group_workers.join_active().await;
+    process_results(results);
+
     if params.remove_vanished {
         // only ever allow to prune owned groups on target
         for target_group in owned_target_groups {
-            if synced_groups.contains(&target_group) {
+            if synced_groups.lock().unwrap().contains(&target_group) {
                 continue;
             }
             if !target_group.apply_filters(&params.group_filter) {
@@ -663,6 +681,32 @@ async fn forget_target_snapshot(
     Ok(())
 }
 
+async fn push_group_do(
+    params: Arc<PushParameters>,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    shared_group_progress: Arc<SharedGroupProgress>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    if not_owned_target_groups.contains(group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        shared_group_progress.increment_done();
+        return Ok(SyncStats::default());
+    }
+
+    synced_groups.lock().unwrap().insert(group.clone());
+    push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+        .await
+        .map_err(|err| {
+            warn!("Group {group}: Encountered errors: {err:#}");
+            warn!("Failed to push group {group} to remote!");
+        })
+}
+
 /// Push group including all snaphshots to target
 ///
 /// Iterate over all snapshots in the group and push them to the target.
@@ -676,7 +720,7 @@ pub(crate) async fn push_group(
     params: Arc<PushParameters>,
     namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    shared_group_progress: Arc<SharedGroupProgress>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -744,7 +788,8 @@ pub(crate) async fn push_group(
         transfer_last_skip_info.reset();
     }
 
-    progress.group_snapshots = snapshots.len() as u64;
+    let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+    local_progress.group_snapshots = snapshots.len() as u64;
 
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -758,8 +803,10 @@ pub(crate) async fn push_group(
         .await;
         fetch_previous_manifest = true;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: {progress}");
+        // Update done groups progress by other parallel running pushes
+        local_progress.done_groups = shared_group_progress.load_done();
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: group {group}: {local_progress}");
 
         // stop on error
         let sync_stats = result?;
-- 
2.47.3





  parent reply	other threads:[~2026-03-09 16:22 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` Christian Ebner [this message]
2026-03-09 16:20 ` [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260309162050.1047341-11-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal