From mboxrd@z Thu Jan 1 00:00:00 1970
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently
Date: Fri, 17 Apr 2026 11:26:19 +0200
Message-ID: <20260417092621.455374-14-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>
References: <20260417092621.455374-1-c.ebner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit

Improve the throughput of sync jobs in push direction over high latency
connections by allowing up to a configured number of backup groups to be
pushed concurrently.

Just like for pull sync jobs, use a bounded join set to run up to the
configured number of group worker tokio tasks in parallel, each
connecting to and pushing a group to the remote target. The store
progress and the sync group housekeeping are placed behind an atomic
reference counted mutex to allow concurrent status updates.
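For illustration only, the concurrency scheme described above can be
sketched roughly as follows. This is a standalone approximation, not the
patch itself: it uses a plain tokio JoinSet bounded by a Semaphore and an
AtomicU64 progress counter instead of the BoundedJoinSet and
SharedGroupProgress helpers the patch relies on, and every name in it
(push_one_group, parallel_groups, ...) is hypothetical. It assumes the
tokio crate with the "full" feature set.

    // Sketch: bound how many group pushes run at once and share a
    // completion counter between the worker tasks.
    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    use tokio::{sync::Semaphore, task::JoinSet};

    // Stand-in for pushing all snapshots of one group to the remote target.
    async fn push_one_group(_group: String) -> Result<u64, String> {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        Ok(1) // pretend one snapshot was transferred
    }

    #[tokio::main]
    async fn main() {
        let groups: Vec<String> = (0..8).map(|i| format!("vm/{i}")).collect();
        let total_groups = groups.len() as u64;

        let parallel_groups = 3; // corresponds to the configured worker count
        let semaphore = Arc::new(Semaphore::new(parallel_groups));
        let done_groups = Arc::new(AtomicU64::new(0));

        let mut workers = JoinSet::new();
        for group in groups {
            let semaphore = Arc::clone(&semaphore);
            let done_groups = Arc::clone(&done_groups);
            workers.spawn(async move {
                // Acquiring a permit bounds how many pushes run concurrently.
                let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
                let result = push_one_group(group).await;
                // Publish group completion so overall progress can be logged
                // from any task.
                let done = done_groups.fetch_add(1, Ordering::SeqCst) + 1;
                println!("groups done: {done}/{total_groups}");
                result
            });
        }

        // Collect per-group results, continuing past individual failures.
        let mut errors = false;
        let mut transferred = 0;
        while let Some(joined) = workers.join_next().await {
            match joined.expect("worker task panicked") {
                Ok(count) => transferred += count,
                Err(err) => {
                    eprintln!("group push failed: {err}");
                    errors = true;
                }
            }
        }
        println!("transferred {transferred} snapshot(s), errors: {errors}");
    }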
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- uses BoundedJoinSet implementation, refactored accordingly

 src/server/push.rs | 102 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 77 insertions(+), 25 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 14395fe61..9b7fb4522 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -27,6 +27,7 @@ use pbs_datastore::fixed_index::FixedIndexReader;
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{DataStore, StoreProgress};
+use pbs_tools::bounded_join_set::BoundedJoinSet;
 
 use super::sync::{
     check_namespace_depth_limit, exclude_not_verified_or_encrypted,
@@ -34,6 +35,7 @@ use super::sync::{
     SyncSource, SyncStats,
 };
 use crate::api2::config::remote;
+use crate::server::sync::SharedGroupProgress;
 
 /// Target for backups to be pushed to
 pub(crate) struct PushTarget {
@@ -551,41 +553,62 @@ pub(crate) async fn push_namespace(
     let mut errors = false;
     // Remember synced groups, remove others when the remove vanished flag is set
-    let mut synced_groups = HashSet::new();
+    let synced_groups = Arc::new(Mutex::new(HashSet::new()));
     let mut progress = StoreProgress::new(list.len() as u64);
     let mut stats = SyncStats::default();
 
     let (owned_target_groups, not_owned_target_groups) =
         fetch_target_groups(&params, &target_namespace).await?;
+    let not_owned_target_groups = Arc::new(not_owned_target_groups);
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut group_workers = BoundedJoinSet::new(params.worker_threads.unwrap_or(1));
+    let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
 
-        if not_owned_target_groups.contains(&group) {
-            warn!(
-                "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-                params.target.remote_user(),
-            );
-            continue;
-        }
-        synced_groups.insert(group.clone());
-
-        match push_group(Arc::clone(&params), namespace, &group, &mut progress).await {
-            Ok(sync_stats) => stats.add(sync_stats),
-            Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!("Failed to push group {group} to remote!");
-                errors = true;
+    let mut process_results = |results| {
+        for result in results {
+            match result {
+                Ok(sync_stats) => {
+                    stats.add(sync_stats);
+                    progress.done_groups = shared_group_progress.increment_done();
+                }
+                Err(()) => errors = true,
             }
         }
+    };
+
+    for group in list.into_iter() {
+        let namespace = namespace.clone();
+        let params = Arc::clone(&params);
+        let not_owned_target_groups = Arc::clone(&not_owned_target_groups);
+        let synced_groups = Arc::clone(&synced_groups);
+        let group_progress_cloned = Arc::clone(&shared_group_progress);
+        let results = group_workers
+            .spawn_task(async move {
+                push_group_do(
+                    params,
+                    &namespace,
+                    &group,
+                    group_progress_cloned,
+                    synced_groups,
+                    not_owned_target_groups,
+                )
+                .await
+            })
+            .await
+            .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+        process_results(results);
     }
 
+    let results = group_workers
+        .join_active()
+        .await
+        .map_err(|err| format_err!("failed to join on worker task: {err:#}"))?;
+    process_results(results);
+
     if params.remove_vanished {
         // only ever allow to prune owned groups on target
         for target_group in owned_target_groups {
-            if synced_groups.contains(&target_group) {
+            if synced_groups.lock().unwrap().contains(&target_group) {
                 continue;
             }
             if !target_group.apply_filters(&params.group_filter) {
@@ -664,6 +687,32 @@ async fn forget_target_snapshot(
     Ok(())
 }
 
+async fn push_group_do(
+    params: Arc<PushParameters>,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    shared_group_progress: Arc<SharedGroupProgress>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    if not_owned_target_groups.contains(group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        shared_group_progress.increment_done();
+        return Ok(SyncStats::default());
+    }
+
+    synced_groups.lock().unwrap().insert(group.clone());
+    push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+        .await
+        .map_err(|err| {
+            warn!("Group {group}: Encountered errors: {err:#}");
+            warn!("Failed to push group {group} to remote!");
+        })
+}
+
 /// Push group including all snaphshots to target
 ///
 /// Iterate over all snapshots in the group and push them to the target.
@@ -677,7 +726,7 @@ pub(crate) async fn push_group(
     params: Arc<PushParameters>,
     namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    shared_group_progress: Arc<SharedGroupProgress>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -745,7 +794,8 @@ pub(crate) async fn push_group(
         transfer_last_skip_info.reset();
     }
 
-    progress.group_snapshots = snapshots.len() as u64;
+    let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+    local_progress.group_snapshots = snapshots.len() as u64;
 
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -759,8 +809,10 @@ pub(crate) async fn push_group(
             .await;
         fetch_previous_manifest = true;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: {progress}");
+        // Update done groups progress by other parallel running pushes
+        local_progress.done_groups = shared_group_progress.load_done();
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: group {group}: {local_progress}");
 
         // stop on error
         let sync_stats = result?;
-- 
2.47.3