From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v4 proxmox-backup 6/7] server: sync: allow pushing groups concurrently
Date: Fri, 4 Apr 2025 15:49:35 +0200 [thread overview]
Message-ID: <20250404134936.425392-7-c.ebner@proxmox.com> (raw)
In-Reply-To: <20250404134936.425392-1-c.ebner@proxmox.com>
Improve the throughput of sync jobs in push direction by allowing up
to a configured number of backup groups to be pushed concurrently, by
creating multiple futures, each connecting and pushing a group to
the remote target.
The store progress and the synced-groups housekeeping are placed behind
an atomically reference-counted mutex (`Arc<Mutex<_>>`) to allow
concurrent access for status updates.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- rebased onto current master
src/server/push.rs | 94 +++++++++++++++++++++++++++++++++-------------
1 file changed, 68 insertions(+), 26 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 878b0f6d6..4188955f5 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -4,7 +4,7 @@ use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Context, Error};
-use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::stream::{self, FuturesUnordered, StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info, warn};
@@ -535,41 +535,46 @@ pub(crate) async fn push_namespace(
let mut errors = false;
// Remember synced groups, remove others when the remove vanished flag is set
- let mut synced_groups = HashSet::new();
- let mut progress = StoreProgress::new(list.len() as u64);
+ let synced_groups = Arc::new(Mutex::new(HashSet::new()));
+ let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
fetch_target_groups(params, &target_namespace).await?;
+ let not_owned_target_groups = Arc::new(not_owned_target_groups);
+
+ let mut pusher = FuturesUnordered::new();
+ let mut group_futures_iter = list.iter().map(|group| {
+ push_group_do(
+ params,
+ namespace,
+ group,
+ progress.clone(),
+ synced_groups.clone(),
+ not_owned_target_groups.clone(),
+ )
+ });
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
-
- if not_owned_target_groups.contains(&group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- continue;
+ for _ in 0..params.parallel_groups.unwrap_or(1) {
+ if let Some(future) = group_futures_iter.next() {
+ pusher.push(future);
}
- synced_groups.insert(group.clone());
+ }
- match push_group(params, namespace, &group, &mut progress).await {
+ while let Some(result) = pusher.next().await {
+ match result {
Ok(sync_stats) => stats.add(sync_stats),
- Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- errors = true;
- }
+ Err(()) => errors |= true,
+ };
+ if let Some(future) = group_futures_iter.next() {
+ pusher.push(future);
}
}
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
- if synced_groups.contains(&target_group) {
+ if synced_groups.lock().unwrap().contains(&target_group) {
continue;
}
if !target_group.apply_filters(¶ms.group_filter) {
@@ -601,6 +606,8 @@ pub(crate) async fn push_namespace(
}
}
+ let progress = progress.lock().unwrap().clone();
+
Ok((progress, stats, errors))
}
@@ -648,6 +655,37 @@ async fn forget_target_snapshot(
Ok(())
}
+/// Push a single backup group as one unit of work of the concurrent group push.
+///
+/// Groups contained in `not_owned_target_groups` are skipped with a warning. All other groups
+/// are recorded in `synced_groups`, so the `remove_vanished` pass does not prune them, and are
+/// then pushed via `push_group`.
+///
+/// The shared `done_groups` counter is advanced exactly once per group, whether the group was
+/// skipped, pushed successfully or failed. This lets the overall progress reach 100%, and it
+/// matches the sequential implementation, which counted failed groups as processed.
+///
+/// Errors are logged here; the caller only learns that the group failed via `Err(())`.
+async fn push_group_do(
+    params: &PushParameters,
+    namespace: &BackupNamespace,
+    group: &BackupGroup,
+    progress: Arc<Mutex<StoreProgress>>,
+    synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+    not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+    // `group` is already a `&BackupGroup`; borrowing it again would look up a `&&BackupGroup`.
+    if not_owned_target_groups.contains(group) {
+        warn!(
+            "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+            params.target.remote_user(),
+        );
+        progress.lock().unwrap().done_groups += 1;
+        return Ok(SyncStats::default());
+    }
+
+    // Lock guards are temporaries dropped at the end of each statement, so no std mutex is held
+    // across the `.await` below.
+    synced_groups.lock().unwrap().insert(group.clone());
+    let result = push_group(params, namespace, group, Arc::clone(&progress)).await;
+
+    // Count the group as processed regardless of the outcome.
+    progress.lock().unwrap().done_groups += 1;
+
+    result.map_err(|err| {
+        warn!("Group {group}: Encountered errors: {err:#}");
+        warn!("Failed to push group {group} to remote!");
+    })
+}
+
/// Push group including all snaphshots to target
///
/// Iterate over all snapshots in the group and push them to the target.
@@ -661,7 +699,7 @@ pub(crate) async fn push_group(
params: &PushParameters,
namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ store_progress: Arc<Mutex<StoreProgress>>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -716,7 +754,8 @@ pub(crate) async fn push_group(
transfer_last_skip_info.reset();
}
- progress.group_snapshots = snapshots.len() as u64;
+ let mut local_progress = store_progress.lock().unwrap().clone();
+ local_progress.group_snapshots = snapshots.len() as u64;
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -725,8 +764,11 @@ pub(crate) async fn push_group(
push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
fetch_previous_manifest = true;
- progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: {progress}");
+ store_progress.lock().unwrap().done_snapshots += 1;
+ local_progress.done_snapshots = pos as u64 + 1;
+ // Update done groups progress by other parallel running pushes
+ local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+ info!("Percentage done: {local_progress}");
// stop on error
let sync_stats = result?;
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-04-04 13:50 UTC|newest]
Thread overview: 14+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-04-04 13:49 [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2025-04-04 18:01 ` Max Carrara
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 3/7] api: config/sync: add optional `parallel-groups` property Christian Ebner
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
2025-04-04 18:02 ` Max Carrara
2025-04-07 7:21 ` Fabian Grünbichler
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 5/7] server: pull: prefix log messages and add error context Christian Ebner
2025-04-04 13:49 ` Christian Ebner [this message]
2025-04-04 13:49 ` [pbs-devel] [PATCH v4 proxmox-backup 7/7] server: push: prefix log messages and add additional logging Christian Ebner
2025-04-04 18:01 ` [pbs-devel] [PATCH v4 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Max Carrara
2025-04-05 9:31 ` Christian Ebner
2025-04-09 10:22 ` Max Carrara
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250404134936.425392-7-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal