From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id DC0371FF13E for ; Fri, 17 Apr 2026 11:27:37 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B8DCF1C2A9; Fri, 17 Apr 2026 11:27:37 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Date: Fri, 17 Apr 2026 11:26:16 +0200 Message-ID: <20260417092621.455374-11-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com> References: <20260417092621.455374-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1776417914663 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.070 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: L7BTG36VMNGYGA6QJ22VAO4ECI7NHLXI X-Message-ID-Hash: L7BTG36VMNGYGA6QJ22VAO4ECI7NHLXI X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Currently, a sync job sequentially pulls the backup groups and the snapshots contained within them. 
It is therefore limited in download speed by the single HTTP/2 connection of the source reader instance in case of remote syncs. For high latency networks, this suffers from limited download speed due to head of line blocking. Improve the throughput by allowing up to a configured number of backup groups to be pulled in parallel, by creating a bounded join set, which concurrently pulls from the remote source with up to the configured number of tokio tasks. Since these are dedicated tasks, they can run independently and in parallel on the tokio runtime. Store progress output is now prefixed by the group as it depends on the group being pulled since the snapshot count differs. To update the output on a per group level, the shared group progress count is passed as an atomic counter, and the store progress is accounted globally as well as per-group. Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182 Signed-off-by: Christian Ebner --- changes since version 5: - uses BoundedJoinSet implementation, refactored accordingly src/server/pull.rs | 70 ++++++++++++++++++++++++++++++++-------------- src/server/sync.rs | 33 ++++++++++++++++++++++ 2 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 5beca6b8d..611441d2a 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -26,6 +26,7 @@ use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{BackupManifest, FileInfo}; use pbs_datastore::read_chunk::AsyncReadChunk; use pbs_datastore::{check_backup_owner, DataStore, DatastoreBackend, StoreProgress}; +use pbs_tools::bounded_join_set::BoundedJoinSet; use pbs_tools::sha::sha256; use super::sync::{ @@ -34,6 +35,7 @@ use super::sync::{ SkipReason, SyncSource, SyncSourceReader, SyncStats, }; use crate::backup::{check_ns_modification_privs, check_ns_privs}; +use crate::server::sync::SharedGroupProgress; use crate::tools::parallel_handler::ParallelHandler; pub(crate) struct PullTarget { @@ -619,7 +621,7 @@ async fn
pull_group( params: Arc, source_namespace: &BackupNamespace, group: &BackupGroup, - progress: &mut StoreProgress, + shared_group_progress: Arc, ) -> Result { let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced); let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast); @@ -782,7 +784,8 @@ async fn pull_group( } } - progress.group_snapshots = list.len() as u64; + let mut local_progress = StoreProgress::new(shared_group_progress.total_groups()); + local_progress.group_snapshots = list.len() as u64; let mut sync_stats = SyncStats::default(); @@ -805,8 +808,10 @@ async fn pull_group( ) .await; - progress.done_snapshots = pos as u64 + 1; - info!("percentage done: {progress}"); + // Update done groups progress by other parallel running pulls + local_progress.done_groups = shared_group_progress.load_done(); + local_progress.done_snapshots = pos as u64 + 1; + info!("percentage done: group {group}: {local_progress}"); let stats = result?; // stop on error sync_stats.add(stats); @@ -1058,7 +1063,7 @@ async fn lock_and_pull_group( group: &BackupGroup, namespace: &BackupNamespace, target_namespace: &BackupNamespace, - progress: &mut StoreProgress, + shared_group_progress: Arc, ) -> Result { let (owner, _lock_guard) = match params @@ -1083,7 +1088,7 @@ async fn lock_and_pull_group( return Err(format_err!("owner check failed")); } - match pull_group(params, namespace, group, progress).await { + match pull_group(params, namespace, group, shared_group_progress).await { Ok(stats) => Ok(stats), Err(err) => { info!("sync group {group} failed - {err:#}"); @@ -1135,25 +1140,48 @@ async fn pull_ns( let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?; - for (done, group) in list.into_iter().enumerate() { - progress.done_groups = done as u64; - progress.done_snapshots = 0; - progress.group_snapshots = 0; + let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len())); + let mut group_workers = 
BoundedJoinSet::new(params.worker_threads.unwrap_or(1)); - match lock_and_pull_group( - Arc::clone(¶ms), - &group, - namespace, - &target_ns, - &mut progress, - ) - .await - { - Ok(stats) => sync_stats.add(stats), - Err(_err) => errors = true, + let mut process_results = |results| { + for result in results { + match result { + Ok(stats) => { + sync_stats.add(stats); + progress.done_groups = shared_group_progress.increment_done(); + } + Err(_err) => errors = true, + } } + }; + + for group in list.into_iter() { + let namespace = namespace.clone(); + let target_ns = target_ns.clone(); + let params = Arc::clone(¶ms); + let group_progress_cloned = Arc::clone(&shared_group_progress); + let results = group_workers + .spawn_task(async move { + lock_and_pull_group( + Arc::clone(¶ms), + &group, + &namespace, + &target_ns, + group_progress_cloned, + ) + .await + }) + .await + .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?; + process_results(results); } + let results = group_workers + .join_active() + .await + .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?; + process_results(results); + if params.remove_vanished { let result: Result<(), Error> = proxmox_lang::try_block!({ for local_group in params.target.store.iter_backup_groups(target_ns.clone())? 
{ diff --git a/src/server/sync.rs b/src/server/sync.rs index 9e6aeb9b0..e88418442 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::io::{Seek, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -12,6 +13,7 @@ use futures::{future::FutureExt, select}; use hyper::http::StatusCode; use pbs_config::BackupLockGuard; use serde_json::json; +use tokio::task::JoinSet; use tracing::{info, warn}; use proxmox_human_byte::HumanByte; @@ -792,3 +794,34 @@ pub(super) fn exclude_not_verified_or_encrypted( false } + +/// Track group progress during parallel push/pull in sync jobs +pub(crate) struct SharedGroupProgress { + done: AtomicUsize, + total: usize, +} + +impl SharedGroupProgress { + /// Create a new instance to track group progress with expected total number of groups + pub(crate) fn with_total_groups(total: usize) -> Self { + Self { + done: AtomicUsize::new(0), + total, + } + } + + /// Return current counter value for done groups + pub(crate) fn load_done(&self) -> u64 { + self.done.load(Ordering::Acquire) as u64 + } + + /// Increment counter for done groups and return new value + pub(crate) fn increment_done(&self) -> u64 { + self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1 + } + + /// Return the number of total backup groups + pub(crate) fn total_groups(&self) -> u64 { + self.total as u64 + } +} -- 2.47.3