From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue, 18 Mar 2025 13:24:22 +0100
Message-Id: <20250318122423.385684-7-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250318122423.385684-1-c.ebner@proxmox.com>
References: <20250318122423.385684-1-c.ebner@proxmox.com>
MIME-Version: 1.0
Subject: [pbs-devel] [PATCH v3 proxmox-backup 6/7] server: sync: allow pushing groups concurrently
List-Id: Proxmox Backup Server development discussion <pbs-devel.lists.proxmox.com>
Reply-To: Proxmox Backup Server development discussion <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit

Improve the throughput of sync jobs in push direction by allowing up to
a configured number of backup groups to be pushed concurrently: multiple
futures are created, each connecting to the remote target and pushing
one group. The store progress and the sync group housekeeping are placed
behind an atomic reference counted mutex to allow concurrent status
updates.
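
For illustration only (not part of this patch): a minimal, standalone
sketch of the pattern used below, i.e. a FuturesUnordered set that is
seeded with a bounded number of futures and refilled as futures complete,
with shared state kept behind an Arc<Mutex<..>>. The names push_one and
parallel_groups, and the plain u64 counter standing in for StoreProgress,
are illustrative assumptions, not code from the actual server.

// Illustrative sketch: bounded concurrency with FuturesUnordered,
// refilled as futures complete, shared progress behind Arc<Mutex<_>>.
// `push_one` and the u64 counter are stand-ins for the real push logic.
use std::sync::{Arc, Mutex};

use futures::stream::{FuturesUnordered, StreamExt};

async fn push_one(group: String, progress: Arc<Mutex<u64>>) -> Result<(), ()> {
    // Stand-in for connecting to the remote and pushing one backup group.
    *progress.lock().unwrap() += 1;
    println!("pushed group {group}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let groups: Vec<String> = (0..10).map(|i| format!("vm/{i}")).collect();
    let parallel_groups = 4; // analogous to params.parallel_groups.unwrap_or(1)
    let progress = Arc::new(Mutex::new(0u64));

    let mut pusher = FuturesUnordered::new();
    let mut group_futures_iter = groups
        .into_iter()
        .map(|group| push_one(group, progress.clone()));

    // Seed the set with at most `parallel_groups` in-flight pushes ...
    for _ in 0..parallel_groups {
        if let Some(future) = group_futures_iter.next() {
            pusher.push(future);
        }
    }

    let mut errors = false;
    // ... and start one new push for every push that finishes, so the
    // number of concurrent pushes never exceeds `parallel_groups`.
    while let Some(result) = pusher.next().await {
        errors |= result.is_err();
        if let Some(future) = group_futures_iter.next() {
            pusher.push(future);
        }
    }

    println!(
        "done groups: {}, errors: {errors}",
        *progress.lock().unwrap()
    );
}
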
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- no changes

 src/server/push.rs | 94 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 68 insertions(+), 26 deletions(-)

diff --git a/src/server/push.rs b/src/server/push.rs
index 878b0f6d6..4188955f5 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
 use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Context, Error};
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tracing::{info, warn};
@@ -535,41 +535,46 @@ pub(crate) async fn push_namespace(
     let mut errors = false;
 
     // Remember synced groups, remove others when the remove vanished flag is set
-    let mut synced_groups = HashSet::new();
-    let mut progress = StoreProgress::new(list.len() as u64);
+    let synced_groups = Arc::new(Mutex::new(HashSet::new()));
+    let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
     let mut stats = SyncStats::default();
 
     let (owned_target_groups, not_owned_target_groups) =
         fetch_target_groups(params, &target_namespace).await?;
+    let not_owned_target_groups = Arc::new(not_owned_target_groups);
+
+    let mut pusher = FuturesUnordered::new();
+    let mut group_futures_iter = list.iter().map(|group| {
+        push_group_do(
+            params,
+            namespace,
+            group,
+            progress.clone(),
+            synced_groups.clone(),
+            not_owned_target_groups.clone(),
+        )
+    });
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
-
-        if not_owned_target_groups.contains(&group) {
-            warn!(
-                "Group '{group}' not owned by remote user '{}' on target, skipping upload",
-                params.target.remote_user(),
-            );
-            continue;
+    for _ in 0..params.parallel_groups.unwrap_or(1) {
+        if let Some(future) = group_futures_iter.next() {
+            pusher.push(future);
         }
-        synced_groups.insert(group.clone());
+    }
 
-        match push_group(params, namespace, &group, &mut progress).await {
+    while let Some(result) = pusher.next().await {
+        match result {
             Ok(sync_stats) => stats.add(sync_stats),
-            Err(err) => {
-                warn!("Encountered errors: {err:#}");
-                warn!("Failed to push group {group} to remote!");
-                errors = true;
-            }
+            Err(()) => errors |= true,
+        };
+        if let Some(future) = group_futures_iter.next() {
+            pusher.push(future);
        }
     }
 
     if params.remove_vanished {
         // only ever allow to prune owned groups on target
         for target_group in owned_target_groups {
-            if synced_groups.contains(&target_group) {
+            if synced_groups.lock().unwrap().contains(&target_group) {
                 continue;
             }
             if !target_group.apply_filters(&params.group_filter) {
@@ -601,6 +606,8 @@ pub(crate) async fn push_namespace(
         }
     }
 
+    let progress = progress.lock().unwrap().clone();
+
     Ok((progress, stats, errors))
 }
 
@@ -648,6 +655,37 @@ async fn forget_target_snapshot(
     Ok(())
 }
 
+async fn push_group_do(
+    params: &PushParameters,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    progress: Arc<Mutex<StoreProgress>>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    if not_owned_target_groups.contains(&group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        progress.lock().unwrap().done_groups += 1;
+        return Ok(SyncStats::default());
+    }
+
+    synced_groups.lock().unwrap().insert(group.clone());
+    match push_group(params, namespace, group, progress.clone()).await {
+        Ok(sync_stats) => {
+            progress.lock().unwrap().done_groups += 1;
+            Ok(sync_stats)
+        }
+        Err(err) => {
+            warn!("Group {group}: Encountered errors: {err:#}");
+            warn!("Failed to push group {group} to remote!");
+            Err(())
+        }
+    }
+}
+
 /// Push group including all snaphshots to target
 ///
 /// Iterate over all snapshots in the group and push them to the target.
@@ -661,7 +699,7 @@ pub(crate) async fn push_group(
     params: &PushParameters,
     namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    store_progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -716,7 +754,8 @@ pub(crate) async fn push_group(
         transfer_last_skip_info.reset();
     }
 
-    progress.group_snapshots = snapshots.len() as u64;
+    let mut local_progress = store_progress.lock().unwrap().clone();
+    local_progress.group_snapshots = snapshots.len() as u64;
 
     let mut stats = SyncStats::default();
     let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -725,8 +764,11 @@ pub(crate) async fn push_group(
             push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
         fetch_previous_manifest = true;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("Percentage done: {progress}");
+        store_progress.lock().unwrap().done_snapshots += 1;
+        local_progress.done_snapshots = pos as u64 + 1;
+        // Update done groups progress by other parallel running pushes
+        local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+        info!("Percentage done: {local_progress}");
 
         // stop on error
        let sync_stats = result?;
-- 
2.39.5


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel