From: "Max Carrara" <m.carrara@proxmox.com>
To: "Proxmox Backup Server development discussion"
<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [PATCH v4 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently
Date: Fri, 04 Apr 2025 20:02:04 +0200
Message-ID: <D8Y1V8BY8F6I.25I1HV13SAN63@proxmox.com>
In-Reply-To: <20250404134936.425392-5-c.ebner@proxmox.com>
On Fri Apr 4, 2025 at 3:49 PM CEST, Christian Ebner wrote:
> Currently, a sync job sequentially pulls the backup groups and the
> snapshots contained within them, therefore being limited in download
> speed by the http2 connection of the source reader instance in case
> of remote syncs. High latency networks suffer from limited download
> speed.
>
> Improve the throughput by allowing to pull up to a configured number
> of backup groups concurrently, by creating tasks connecting and
> pulling from the remote source in parallel.
>
> Make the error handling and accounting logic for each group pull
> reusable by moving it into its own helper function, returning the
> future.
>
> The store progress is placed behind an atomic reference counted mutex
> to allow for concurrent access of status updates.
Yeah, so... I've got some thoughts about this:

First of all, I think that that's *fine* here, as the
`Arc<Mutex<StoreProgress>>` probably isn't going to face that much lock
contention or something anyway. So to get that out of the way, IMO we
can keep that here as it is right now.

But in the future I do think that we should check the locations where we
have that kind of concurrent data access / modification, because I feel
the amount of mutexes is only going to continue growing. That's not a
bad thing per se, but it does come with a few risks / drawbacks (e.g.
higher risk for deadlocks).

Without going too deep into the whole "how to avoid deadlocks" discussion
and other things, here's an alternative I want to propose that could
perhaps be done in (or as part of) a different series, since it's a bit
out of scope for this one here. (Though, if you do wanna do it in this
one, I certainly won't complain!)
First, since `StoreProgress` only contains four `u64`s, it should be
fairly easy to convert the ones being modified into `AtomicU64`s and
perhaps add helper methods to increase their respective values;
something like this:

    use std::sync::atomic::{AtomicU64, Ordering};

    #[derive(Debug, Default)]
    /// Tracker for progress of operations iterating over `Datastore` contents.
    pub struct StoreProgress {
        /// Completed groups
        pub done_groups: AtomicU64,
        /// Total groups
        pub total_groups: u64,
        /// Completed snapshots within current group
        pub done_snapshots: AtomicU64,
        /// Total snapshots in current group
        pub group_snapshots: u64,
    }

    // [...]

    impl StoreProgress {
        /// Atomically count one more completed group
        pub fn add_done_group(&self) {
            let _ = self.done_groups.fetch_add(1, Ordering::Relaxed);
        }

        // [...]
    }
(of course, what it all should look like in detail is up to bikeshedding :P)

Something like that would probably be nicer here, because:

- You won't need to wrap `StoreProgress` within an `Arc<Mutex<T>>`
  anymore -- a shared reference is enough, since ...
- Operations on atomics take &self (that's the whole point of them ofc ;p )

This means that:

- Cloning an `Arc<T>` is not necessary anymore
  --> should be approx. two atomic ops less times the amount of `Arc`s used
      (on create/clone and drop for each `Arc`)
- Locking the `Mutex<T>` is also not necessary anymore, which means
  --> should be two atomic ops less for each call to `.lock()`
      (acquire and release)
In turn, this is replaced by a single atomic call with
`Ordering::Relaxed` (which is fine for counters [0]). So, something like

    progress.lock().unwrap().done_groups += 1;

would just become

    progress.add_done_group();

which is also quite neat.
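
To make the comparison above a bit more concrete, here is a minimal,
self-contained sketch of the atomic variant being shared by plain
reference across scoped threads. `SharedProgress`, `add_done_snapshot`
and `simulate_pull` are hypothetical names used for illustration only,
not part of the patch or of `pbs-datastore`. `AtomicU64` is assumed
because the diff casts the counters to `u64`, and OS threads merely
stand in for the async tasks of the real sync job.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

/// Hypothetical stand-in for the counters of `StoreProgress` that get modified.
#[derive(Debug, Default)]
struct SharedProgress {
    done_groups: AtomicU64,
    done_snapshots: AtomicU64,
}

impl SharedProgress {
    /// Count one more completed group; takes `&self`, so no lock is needed.
    fn add_done_group(&self) {
        self.done_groups.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one more completed snapshot.
    fn add_done_snapshot(&self) {
        self.done_snapshots.fetch_add(1, Ordering::Relaxed);
    }
}

/// Simulates `workers` concurrent group pulls, each "pulling"
/// `snapshots_per_group` snapshots, and returns the final
/// (done_groups, done_snapshots) counts.
fn simulate_pull(workers: u64, snapshots_per_group: u64) -> (u64, u64) {
    let progress = SharedProgress::default();

    // Scoped threads may borrow `progress` directly: no `Arc`, no `Mutex`.
    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| {
                for _ in 0..snapshots_per_group {
                    progress.add_done_snapshot();
                }
                progress.add_done_group();
            });
        }
    });

    // All threads are joined when the scope ends, so the loads see every update.
    (
        progress.done_groups.load(Ordering::Relaxed),
        progress.done_snapshots.load(Ordering::Relaxed),
    )
}
```

Calling `simulate_pull(4, 3)` counts 4 groups and 12 snapshots in total,
regardless of how the threads interleave, since each `fetch_add` is a
single indivisible read-modify-write.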
Note however that we might have to split that struct into a "local" and
"shared" version or whatever in order to adapt it all to the current
code (a locally-used struct ofc doesn't need atomics).
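
A rough sketch of what such a split could look like, under the
assumption that only `done_groups` needs to be shared between
concurrent pulls while the snapshot counters stay local to each group
(which is roughly what `local_progress` does in the patch).
`SharedProgress`, `LocalProgress` and `local` are hypothetical names,
not existing API:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical plain variant for code that uses the progress locally.
#[derive(Clone, Debug, Default, PartialEq)]
struct LocalProgress {
    done_groups: u64,
    total_groups: u64,
    done_snapshots: u64,
    group_snapshots: u64,
}

/// Hypothetical shared variant: only the concurrently modified counter is atomic.
#[derive(Debug, Default)]
struct SharedProgress {
    done_groups: AtomicU64,
    total_groups: u64,
}

impl SharedProgress {
    fn new(total_groups: u64) -> Self {
        Self {
            done_groups: AtomicU64::new(0),
            total_groups,
        }
    }

    /// Count one more completed group.
    fn add_done_group(&self) {
        self.done_groups.fetch_add(1, Ordering::Relaxed);
    }

    /// Build a local view for logging: group counts come from the shared
    /// state, snapshot counts from the caller's own group.
    fn local(&self, done_snapshots: u64, group_snapshots: u64) -> LocalProgress {
        LocalProgress {
            done_groups: self.done_groups.load(Ordering::Relaxed),
            total_groups: self.total_groups,
            done_snapshots,
            group_snapshots,
        }
    }
}
```

A group pull would then call something like `shared.local(pos + 1, len)`
right before logging, instead of cloning the whole struct out of a mutex.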
Again, I think what you're doing here is perfectly fine; I just think
that we should have a look at all of those concurrent data accesses and
see whether we can slim some stuff down or perhaps have some kind of
statically enforced mutex ordering for deadlock prevention [1].
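
For illustration, one possible way to enforce a lock order at compile
time is to hand out a "level token" with each lock and to require a
token of a lower level when taking the next one. This is only a sketch
under stated assumptions and not necessarily the approach described in
[1]. All names (`OrderedMutex`, `Token`, `Before`, the level types) are
made up for this example, and nothing here stops a thread from creating
a second root token, so it is a convention aid rather than a full
guarantee.

```rust
use std::marker::PhantomData;
use std::sync::{Mutex, MutexGuard};

// Hypothetical lock levels: lower levels must be locked before higher ones.
struct Root;
struct ProgressLevel;
struct ChunkLevel;

/// `A: Before<B>` means: while holding a token of level `A`, level `B` may be locked.
trait Before<B> {}
impl Before<ProgressLevel> for Root {}
impl Before<ChunkLevel> for Root {}
impl Before<ChunkLevel> for ProgressLevel {}

/// Proof of the highest level locked so far.
struct Token<'a, L>(PhantomData<&'a mut L>);

impl Token<'static, Root> {
    /// Starting point of a lock chain (assumed to be created once per task).
    fn root() -> Self {
        Token(PhantomData)
    }
}

/// A mutex tagged with its level `L`.
struct OrderedMutex<L, T> {
    inner: Mutex<T>,
    _level: PhantomData<L>,
}

impl<L, T> OrderedMutex<L, T> {
    fn new(value: T) -> Self {
        Self {
            inner: Mutex::new(value),
            _level: PhantomData,
        }
    }

    /// Locks the mutex; only compiles if the token's level comes before `L`.
    /// The parent token stays mutably borrowed while the guard is alive.
    fn lock<'a, P: Before<L>>(
        &'a self,
        _token: &'a mut Token<'_, P>,
    ) -> (MutexGuard<'a, T>, Token<'a, L>) {
        (self.inner.lock().unwrap(), Token(PhantomData))
    }
}

/// Takes both locks in the allowed order and returns the updated values.
fn lock_in_order() -> (u64, usize) {
    let progress = OrderedMutex::<ProgressLevel, u64>::new(0);
    let chunks = OrderedMutex::<ChunkLevel, Vec<u32>>::new(Vec::new());

    let mut root = Token::root();
    let (mut done, mut token) = progress.lock(&mut root);
    let (mut seen, _token) = chunks.lock(&mut token);
    // Locking `chunks` first and then `progress` with the resulting token
    // would fail to compile: `ChunkLevel` does not implement `Before<ProgressLevel>`.

    *done += 1;
    seen.push(7);
    (*done, seen.len())
}
```

The trade-off is that every lock site has to thread a token through,
which is why this is probably only worth it for code paths that take
several locks at once.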
[0]: https://doc.rust-lang.org/nomicon/atomics.html#relaxed
[1]: https://www.youtube.com/watch?v=Ba7fajt4l1M
>
> Link to issue in bugtracker:
> https://bugzilla.proxmox.com/show_bug.cgi?id=4182
Should be:
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
.. judging from our recent history at least. Can be adapted when
applying the patch, though.
>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> changes since version 3:
> - rebased onto current master
>
> pbs-datastore/src/store_progress.rs | 2 +-
> src/server/pull.rs | 111 ++++++++++++++++++----------
> 2 files changed, 72 insertions(+), 41 deletions(-)
>
> diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
> index a32bb9a9d..8afa60ace 100644
> --- a/pbs-datastore/src/store_progress.rs
> +++ b/pbs-datastore/src/store_progress.rs
> @@ -1,4 +1,4 @@
> -#[derive(Debug, Default)]
> +#[derive(Clone, Debug, Default)]
> /// Tracker for progress of operations iterating over `Datastore` contents.
> pub struct StoreProgress {
> /// Completed groups
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 46c3d8dc5..a484957ce 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
> use std::time::SystemTime;
>
> use anyhow::{bail, format_err, Error};
> +use futures::stream::FuturesUnordered;
> +use futures::StreamExt;
> use proxmox_human_byte::HumanByte;
> use tracing::info;
>
> @@ -512,7 +514,7 @@ async fn pull_group(
> params: &PullParameters,
> source_namespace: &BackupNamespace,
> group: &BackupGroup,
> - progress: &mut StoreProgress,
> + store_progress: Arc<Mutex<StoreProgress>>,
> ) -> Result<SyncStats, Error> {
> let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
> let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
> @@ -601,7 +603,8 @@ async fn pull_group(
> // start with 65536 chunks (up to 256 GiB)
> let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
> - progress.group_snapshots = list.len() as u64;
> + let mut local_progress = store_progress.lock().unwrap().clone();
> + local_progress.group_snapshots = list.len() as u64;
>
> let mut sync_stats = SyncStats::default();
>
> @@ -618,8 +621,11 @@ async fn pull_group(
> let result =
> pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
>
> - progress.done_snapshots = pos as u64 + 1;
> - info!("percentage done: {progress}");
> + store_progress.lock().unwrap().done_snapshots += 1;
> + // Update done groups progress by other parallel running pulls
> + local_progress.done_groups = store_progress.lock().unwrap().done_groups;
> + local_progress.done_snapshots = pos as u64 + 1;
> + info!("Percentage done: {local_progress}");
>
> let stats = result?; // stop on error
> sync_stats.add(stats);
> @@ -863,6 +869,48 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
> Ok(sync_stats)
> }
>
> +async fn pull_group_do(
> + params: &PullParameters,
> + group: &BackupGroup,
> + namespace: &BackupNamespace,
> + target_namespace: &BackupNamespace,
> + progress: Arc<Mutex<StoreProgress>>,
> +) -> Result<SyncStats, ()> {
> + let (owner, _lock_guard) =
> + match params
> + .target
> + .store
> + .create_locked_backup_group(target_namespace, group, &params.owner)
> + {
> + Ok(result) => result,
> + Err(err) => {
> + info!("sync group {group} failed - group lock failed: {err}");
> + info!("create_locked_backup_group failed");
Since a lot of the surrounding code already does it, it's fine I guess,
but... why do we use `info!` everywhere here for errors? Is there any
reason in particular? 😅
> + return Err(());
> + }
> + };
> +
> + if params.owner != owner {
> + // only the owner is allowed to create additional snapshots
> + info!(
> + "sync group {group} failed - owner check failed ({} != {owner})",
> + params.owner,
> + );
> + return Err(());
> + }
> +
> + match pull_group(params, namespace, group, progress.clone()).await {
> + Ok(sync_stats) => {
> + progress.lock().unwrap().done_groups += 1;
> + Ok(sync_stats)
> + }
> + Err(err) => {
> + info!("sync group {group} failed - {err}");
> + Err(())
> + }
> + }
> +}
> +
> /// Pulls a namespace according to `params`.
> ///
> /// Pulling a namespace consists of the following steps:
> @@ -901,48 +949,29 @@ pub(crate) async fn pull_ns(
> new_groups.insert(group.clone());
> }
>
> - let mut progress = StoreProgress::new(list.len() as u64);
> + let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
> let mut sync_stats = SyncStats::default();
>
> let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
>
> - for (done, group) in list.into_iter().enumerate() {
> - progress.done_groups = done as u64;
> - progress.done_snapshots = 0;
> - progress.group_snapshots = 0;
> + let mut puller = FuturesUnordered::new();
> + let mut group_futures_iter = list
> + .iter()
> + .map(|group| pull_group_do(params, group, namespace, &target_ns, progress.clone()));
>
> - let (owner, _lock_guard) =
> - match params
> - .target
> - .store
> - .create_locked_backup_group(&target_ns, &group, &params.owner)
> - {
> - Ok(result) => result,
> - Err(err) => {
> - info!("sync group {} failed - group lock failed: {err}", &group);
> - errors = true;
> - // do not stop here, instead continue
> - info!("create_locked_backup_group failed");
> - continue;
> - }
> - };
> + for _ in 0..params.parallel_groups.unwrap_or(1) {
> + if let Some(future) = group_futures_iter.next() {
> + puller.push(future);
> + }
> + }
>
> - // permission check
> - if params.owner != owner {
> - // only the owner is allowed to create additional snapshots
> - info!(
> - "sync group {} failed - owner check failed ({} != {owner})",
> - &group, params.owner
> - );
> - errors = true; // do not stop here, instead continue
> - } else {
> - match pull_group(params, namespace, &group, &mut progress).await {
> - Ok(stats) => sync_stats.add(stats),
> - Err(err) => {
> - info!("sync group {} failed - {err}", &group);
> - errors = true; // do not stop here, instead continue
> - }
> - }
> + while let Some(result) = puller.next().await {
> + match result {
> + Ok(stats) => sync_stats.add(stats),
> + Err(()) => errors |= true,
> + };
> + if let Some(future) = group_futures_iter.next() {
> + puller.push(future);
> }
> }
>
> @@ -998,5 +1027,7 @@ pub(crate) async fn pull_ns(
> };
> }
>
> + let progress = progress.lock().unwrap().clone();
> +
> Ok((progress, sync_stats, errors))
> }