all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently
Date: Fri, 17 Apr 2026 11:26:19 +0200	[thread overview]
Message-ID: <20260417092621.455374-14-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>

Improve the throughput over high latency connections for sync jobs in
push direction by allowing to push up to a configured number of
backup groups concurrently. Just like for pull sync jobs, use a
bounded join set to run up to the configured number of group worker
tokio tasks in parallel, each connecting and pushing a group to
the remote target.

The store progress and sync group housekeeping are placed behind an
atomic reference counted mutex to allow for concurrent access of
status updates.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BoundedJoinSet implementation, refactored accordingly

 src/server/push.rs | 102 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 77 insertions(+), 25 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 14395fe61..9b7fb4522 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -27,6 +27,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{DataStore, StoreProgress};
+use pbs_tools::bounded_join_set::BoundedJoinSet;
 
 use super::sync::{
     check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -34,6 +35,7 @@ use super::sync::{
     SyncSource, SyncStats,
 };
 use crate::api2::config::remote;
+use crate::server::sync::SharedGroupProgress;
 
 /// Target for backups to be pushed to
 pub(crate) struct PushTarget {
@@ -551,41 +553,62 @@ pub(crate) async fn push_namespace(
 
     let mut errors = false;
     // Remember synced groups, remove others when the remove vanished flag is set
-    let mut synced_groups = HashSet::new();
+    let synced_groups = Arc::new(Mutex::new(HashSet::new()));
     let mut progress = StoreProgress::new(list.len() as u64);
     let mut stats = SyncStats::default();
 
     let (owned_target_groups, not_owned_target_groups) =
         fetch_target_groups(&params, &target_namespace).await?;
+    let not_owned_target_groups = Arc::new(not_owned_target_groups);
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
+    let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
 
-        if not_owned_target_groups.contains(&group) {
-            warn!(
-                "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-                params.target.remote_user(),
-            );
-            continue;
-        }
-        synced_groups.insert(group.clone());
-
-        match push_group(Arc::clone(&params), namespace, &group, &mut progress).await {
-            Ok(sync_stats) => stats.add(sync_stats),
-            Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!("Failed to push group {group} to remote!");
-                errors = true;
+    let mut process_results = |results| {
+        for result in results {
+            match result {
+                Ok(sync_stats) => {
+                    stats.add(sync_stats);
+                    progress.done_groups = shared_group_progress.increment_done();
+                }
+                Err(()) => errors = true,
             }
         }
+    };
+
+    for group in list.into_iter() {
+        let namespace = namespace.clone();
+        let params = Arc::clone(&params);
+        let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
+        let synced_groups = Arc::clone(&synced_groups);
+        let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let results = group_workers
+            .spawn_task(async move {
+                push_group_do(
+                    params,
+                    &namespace,
+                    &group,
+                    group_progress_cloned,
+                    synced_groups,
+                    not_owned_target_groups,
+                )
+                .await
+            })
+            .await
+            .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+        process_results(results);
     }
 
+    let results = group_workers
+        .join_active()
+        .await
+        .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+    process_results(results);
+
     if params.remove_vanished {
         // only ever allow to prune owned groups on target
         for target_group in owned_target_groups {
-            if synced_groups.contains(&target_group) {
+            if synced_groups.lock().unwrap().contains(&target_group) {
                 continue;
             }
             if !target_group.apply_filters(&params.group_filter) {
@@ -664,6 +687,32 @@ async fn forget_target_snapshot(
     Ok(())
 }
 
+async fn push_group_do(
+    params: Arc<PushParameters>,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    shared_group_progress: Arc<SharedGroupProgress>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    if not_owned_target_groups.contains(group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        shared_group_progress.increment_done();
+        return Ok(SyncStats::default());
+    }
+
+    synced_groups.lock().unwrap().insert(group.clone());
+    push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+        .await
+        .map_err(|err| {
+            warn!("Group {group}: Encountered errors: {err:#}");
+            warn!("Failed to push group {group} to remote!");
+        })
+}
+
 /// Push group including all snaphshots to target
 ///
 /// Iterate over all snapshots in the group and push them to the target.
@@ -677,7 +726,7 @@ pub(crate) async fn push_group(
     params: Arc<PushParameters>,
     namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    shared_group_progress: Arc<SharedGroupProgress>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -745,7 +794,8 @@ pub(crate) async fn push_group(
         transfer_last_skip_info.reset();
     }
 
-    progress.group_snapshots = snapshots.len() as u64;
+    let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+    local_progress.group_snapshots = snapshots.len() as u64;
 
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -759,8 +809,10 @@ pub(crate) async fn push_group(
         .await;
         fetch_previous_manifest = true;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: {progress}");
+        // Update done groups progress by other parallel running pushes
+        local_progress.done_groups = shared_group_progress.load_done();
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: group {group}: {local_progress}");
 
         // stop on error
         let sync_stats = result?;
-- 
2.47.3





  parent reply	other threads:[~2026-04-17  9:27 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-17  9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` Christian Ebner [this message]
2026-04-17  9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260417092621.455374-14-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal