From mboxrd@z Thu Jan 1 00:00:00 1970
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently
Date: Mon, 9 Mar 2026 17:20:48 +0100
Message-ID: <20260309162050.1047341-11-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260309162050.1047341-1-c.ebner@proxmox.com>
References: <20260309162050.1047341-1-c.ebner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id: Proxmox Backup Server development discussion

Improve the throughput of sync jobs in push direction by allowing up
to a configured number of backup groups to be pushed concurrently,
creating multiple futures, each connecting to and pushing a group to
the remote target.

The store progress and sync group housekeeping are placed behind an
atomic reference counted mutex to allow concurrent status updates.
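
For illustration, the concurrency pattern is roughly the following
(a minimal, hypothetical sketch using a plain tokio JoinSet and
simplified types; the actual implementation uses the GroupWorkerSet
and SharedGroupProgress helpers introduced earlier in this series,
and PushParameters/BackupGroup instead of plain strings):

    // Minimal, self-contained sketch of the pattern (illustrative
    // names only, not the actual proxmox-backup types):
    use std::collections::HashSet;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex};

    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let groups: Vec<String> = (0..10).map(|i| format!("vm/{i}")).collect();
        let worker_threads = 4;

        let synced_groups = Arc::new(Mutex::new(HashSet::new()));
        let done_groups = Arc::new(AtomicU64::new(0));

        let mut workers = JoinSet::new();
        for group in groups {
            // keep at most `worker_threads` group pushes in flight
            while workers.len() >= worker_threads {
                workers.join_next().await;
            }
            let synced_groups = Arc::clone(&synced_groups);
            let done_groups = Arc::clone(&done_groups);
            workers.spawn(async move {
                // stand-in for the actual group push: record the group as
                // synced and bump the shared progress counter
                synced_groups.lock().unwrap().insert(group.clone());
                let done = done_groups.fetch_add(1, Ordering::SeqCst) + 1;
                println!("pushed group {group}, {done} group(s) done");
            });
        }
        // wait for the remaining in-flight pushes to finish
        while workers.join_next().await.is_some() {}
    }

The real code additionally skips groups not owned by the remote user
and propagates per-group errors, but follows the same bounded
spawn/join structure.
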
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/push.rs | 97 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 72 insertions(+), 25 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 5828f2ed1..e7d56cc2a 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -34,6 +34,7 @@ use super::sync::{
     SyncSource, SyncStats,
 };
 use crate::api2::config::remote;
+use crate::server::sync::{GroupWorkerSet, SharedGroupProgress};
 
 /// Target for backups to be pushed to
 pub(crate) struct PushTarget {
@@ -550,41 +551,58 @@ pub(crate) async fn push_namespace(
     let mut errors = false;
 
     // Remember synced groups, remove others when the remove vanished flag is set
-    let mut synced_groups = HashSet::new();
+    let synced_groups = Arc::new(Mutex::new(HashSet::new()));
     let mut progress = StoreProgress::new(list.len() as u64);
     let mut stats = SyncStats::default();
 
     let (owned_target_groups, not_owned_target_groups) =
         fetch_target_groups(&params, &target_namespace).await?;
+    let not_owned_target_groups = Arc::new(not_owned_target_groups);
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut group_workers = GroupWorkerSet::with_capacity(params.worker_threads.unwrap_or(1));
+    let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
 
-        if not_owned_target_groups.contains(&group) {
-            warn!(
-                "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-                params.target.remote_user(),
-            );
-            continue;
-        }
-        synced_groups.insert(group.clone());
-
-        match push_group(Arc::clone(&params), namespace, &group, &mut progress).await {
-            Ok(sync_stats) => stats.add(sync_stats),
-            Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!("Failed to push group {group} to remote!");
-                errors = true;
+    let mut process_results = |results| {
+        for result in results {
+            match result {
+                Ok(sync_stats) => {
+                    stats.add(sync_stats);
+                    progress.done_groups = shared_group_progress.increment_done();
+                }
+                Err(()) => errors = true,
             }
         }
+    };
+
+    for group in list.into_iter() {
+        let namespace = namespace.clone();
+        let params = Arc::clone(&params);
+        let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
+        let synced_groups = Arc::clone(&synced_groups);
+        let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let results = group_workers
+            .spawn_task(async move {
+                push_group_do(
+                    params,
+                    &namespace,
+                    &group,
+                    group_progress_cloned,
+                    synced_groups,
+                    not_owned_target_groups,
+                )
+                .await
+            })
+            .await;
+        process_results(results);
     }
 
+    let results = group_workers.join_active().await;
+    process_results(results);
+
     if params.remove_vanished {
         // only ever allow to prune owned groups on target
         for target_group in owned_target_groups {
-            if synced_groups.contains(&target_group) {
+            if synced_groups.lock().unwrap().contains(&target_group) {
                 continue;
             }
             if !target_group.apply_filters(&params.group_filter) {
@@ -663,6 +681,32 @@ async fn forget_target_snapshot(
     Ok(())
 }
 
+async fn push_group_do(
+    params: Arc<PushParameters>,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    shared_group_progress: Arc<SharedGroupProgress>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    if not_owned_target_groups.contains(group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        shared_group_progress.increment_done();
+        return Ok(SyncStats::default());
+    }
+
+    synced_groups.lock().unwrap().insert(group.clone());
+    push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+        .await
+        .map_err(|err| {
+            warn!("Group {group}: Encountered errors: {err:#}");
+            warn!("Failed to push group {group} to remote!");
+        })
+}
+
 /// Push group including all snaphshots to target
 ///
 /// Iterate over all snapshots in the group and push them to the target.
@@ -676,7 +720,7 @@ pub(crate) async fn push_group(
     params: Arc<PushParameters>,
     namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    shared_group_progress: Arc<SharedGroupProgress>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -744,7 +788,8 @@ pub(crate) async fn push_group(
         transfer_last_skip_info.reset();
     }
 
-    progress.group_snapshots = snapshots.len() as u64;
+    let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+    local_progress.group_snapshots = snapshots.len() as u64;
 
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -758,8 +803,10 @@ pub(crate) async fn push_group(
             .await;
         fetch_previous_manifest = true;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: {progress}");
+        // Update done groups progress by other parallel running pushes
+        local_progress.done_groups = shared_group_progress.load_done();
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: group {group}: {local_progress}");
 
         // stop on error
         let sync_stats = result?;
-- 
2.47.3