From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel
Date: Fri, 17 Apr 2026 11:26:16 +0200
Message-ID: <20260417092621.455374-11-c.ebner@proxmox.com>
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>

Currently, a sync job pulls the backup groups and the snapshots
contained within them sequentially. For remote syncs, it is therefore
limited in download speed by the single HTTP/2 connection of the
source reader instance, and on high-latency networks it additionally
suffers from head-of-line blocking.

Improve the throughput by pulling up to a configured number of backup
groups in parallel: a bounded join set spawns up to the configured
number of tokio tasks, which concurrently pull from the remote source.
Since these are dedicated tasks, they can run independently and in
parallel on the tokio runtime.
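
The actual BoundedJoinSet is introduced in patch 04/15 of this series;
purely as an illustration of the technique (not the series'
implementation), bounding concurrency over a plain tokio JoinSet could
look like this:

    use tokio::task::JoinSet;

    // Illustration only: before spawning another task, drain finished
    // ones until the number of in-flight tasks drops below the limit.
    async fn spawn_bounded<F>(
        set: &mut JoinSet<anyhow::Result<()>>,
        limit: usize,
        fut: F,
    ) -> anyhow::Result<()>
    where
        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        while set.len() >= limit {
            if let Some(joined) = set.join_next().await {
                joined??; // surface both join errors and task errors
            }
        }
        set.spawn(fut);
        Ok(())
    }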

Store progress output is now prefixed with the group being pulled,
since the snapshot count differs from group to group. To update the
output on a per-group level, the shared group progress count is passed
around as an atomic counter, and the store progress is accounted both
globally and per group.
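
A condensed sketch of that counter pattern, simplified and ignoring
the snapshot-level progress the patch also tracks (SharedDone below is
a stand-in for the SharedGroupProgress type added in sync.rs):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    struct SharedDone(AtomicUsize); // stand-in for SharedGroupProgress

    #[tokio::main]
    async fn main() {
        let done = Arc::new(SharedDone(AtomicUsize::new(0)));
        let total = 4;
        let mut set = tokio::task::JoinSet::new();
        for group in 0..total {
            let done = Arc::clone(&done);
            set.spawn(async move {
                // ... pull the group's snapshots here ...
                let n = done.0.fetch_add(1, Ordering::AcqRel) + 1;
                println!("group {group}: {n}/{total} groups done");
            });
        }
        // wait for all remaining workers to finish
        while set.join_next().await.is_some() {}
    }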

Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BoundedJoinSet implementation, refactored accordingly

 src/server/pull.rs | 70 ++++++++++++++++++++++++++++++++--------------
 src/server/sync.rs | 33 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 21 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5beca6b8d..611441d2a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -26,6 +26,7 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress};
+use pbs_tools::bounded_join_set::BoundedJoinSet;
 use pbs_tools::sha::sha256;
 
 use super::sync::{
@@ -34,6 +35,7 @@ use super::sync::{
     SkipReason, SyncSource, SyncSourceReader, SyncStats,
 };
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::server::sync::SharedGroupProgress;
 use crate::tools::parallel_handler::ParallelHandler;
 
 pub(crate) struct PullTarget {
@@ -619,7 +621,7 @@ async fn pull_group(
     params: Arc<PullParameters>,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    shared_group_progress: Arc<SharedGroupProgress>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -782,7 +784,8 @@ async fn pull_group(
         }
     }
 
-    progress.group_snapshots = list.len() as u64;
+    let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+    local_progress.group_snapshots = list.len() as u64;
 
     let mut sync_stats = SyncStats::default();
 
@@ -805,8 +808,10 @@ async fn pull_group(
         )
         .await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        // Update done groups progress by other parallel running pulls
+        local_progress.done_groups = shared_group_progress.load_done();
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("percentage done: group {group}: {local_progress}");
 
         let stats = result?; // stop on error
         sync_stats.add(stats);
@@ -1058,7 +1063,7 @@ async fn lock_and_pull_group(
     group: &BackupGroup,
     namespace: &BackupNamespace,
     target_namespace: &BackupNamespace,
-    progress: &mut StoreProgress,
+    shared_group_progress: Arc<SharedGroupProgress>,
 ) -> Result<SyncStats, Error> {
     let (owner, _lock_guard) =
         match params
@@ -1083,7 +1088,7 @@ async fn lock_and_pull_group(
         return Err(format_err!("owner check failed"));
     }
 
-    match pull_group(params, namespace, group, progress).await {
+    match pull_group(params, namespace, group, shared_group_progress).await {
         Ok(stats) => Ok(stats),
         Err(err) => {
             info!("sync group {group} failed - {err:#}");
@@ -1135,25 +1140,48 @@ async fn pull_ns(
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
+    let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
 
-        match lock_and_pull_group(
-            Arc::clone(&params),
-            &group,
-            namespace,
-            &target_ns,
-            &mut progress,
-        )
-        .await
-        {
-            Ok(stats) => sync_stats.add(stats),
-            Err(_err) => errors = true,
+    let mut process_results = |results| {
+        for result in results {
+            match result {
+                Ok(stats) => {
+                    sync_stats.add(stats);
+                    progress.done_groups = shared_group_progress.increment_done();
+                }
+                Err(_err) => errors = true,
+            }
         }
+    };
+
+    for group in list.into_iter() {
+        let namespace = namespace.clone();
+        let target_ns = target_ns.clone();
+        let params = Arc::clone(&params);
+        let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let results = group_workers
+            .spawn_task(async move {
+                lock_and_pull_group(
+                    Arc::clone(&params),
+                    &group,
+                    &namespace,
+                    &target_ns,
+                    group_progress_cloned,
+                )
+                .await
+            })
+            .await
+            .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+        process_results(results);
     }
 
+    let results = group_workers
+        .join_active()
+        .await
+        .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+    process_results(results);
+
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
             for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 9e6aeb9b0..e88418442 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -4,6 +4,7 @@ use std::collections::HashMap;
 use std::io::{Seek, Write};
 use std::ops::Deref;
 use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
@@ -12,6 +13,7 @@ use futures::{future::FutureExt, select};
 use hyper::http::StatusCode;
 use pbs_config::BackupLockGuard;
 use serde_json::json;
+use tokio::task::JoinSet;
 use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
@@ -792,3 +794,34 @@ pub(super) fn exclude_not_verified_or_encrypted(
 
     false
 }
+
+/// Track group progress during parallel push/pull in sync jobs
+pub(crate) struct SharedGroupProgress {
+    done: AtomicUsize,
+    total: usize,
+}
+
+impl SharedGroupProgress {
+    /// Create a new instance to track group progress with expected total number of groups
+    pub(crate) fn with_total_groups(total: usize) -> Self {
+        Self {
+            done: AtomicUsize::new(0),
+            total,
+        }
+    }
+
+    /// Return current counter value for done groups
+    pub(crate) fn load_done(&self) -> u64 {
+        self.done.load(Ordering::Acquire) as u64
+    }
+
+    /// Increment counter for done groups and return new value
+    pub(crate) fn increment_done(&self) -> u64 {
+        self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
+    }
+
+    /// Return the number of total backup groups
+    pub(crate) fn total_groups(&self) -> u64 {
+        self.total as u64
+    }
+}
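
To actually pull groups in parallel, the sync job's new
`worker-threads` setting (introduced in patches 01 and 06 of this
series) has to be raised above the default of a single worker (see the
unwrap_or(1) above). A hypothetical example, assuming a sync job with
the id 'pull-remote':

    # pull up to 4 backup groups concurrently
    proxmox-backup-manager sync-job update pull-remote --worker-threads 4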
-- 
2.47.3

Thread overview: 16+ messages
2026-04-17  9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` Christian Ebner [this message]
2026-04-17  9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner
