From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: Christian Ebner <c.ebner@proxmox.com>, pbs-devel@lists.proxmox.com
Subject: Re: [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel
Date: Mon, 23 Mar 2026 13:36:37 +0100	[thread overview]
Message-ID: <1774267967.mydro9oosc.astroid@yuna.none> (raw)
In-Reply-To: <20260309162050.1047341-8-c.ebner@proxmox.com>

On March 9, 2026 5:20 pm, Christian Ebner wrote:
> Currently, a sync job sequentially pulls the backup groups and the
> snapshots contained within them. It is therefore limited in download
> speed by the single HTTP/2 connection of the source reader instance
> in case of remote syncs. On high-latency networks, this suffers from
> limited download speed due to head-of-line blocking.
> 
> Improve the throughput by pulling up to a configured number of
> backup groups in parallel, using a tokio task set which concurrently
> pulls from the remote source. Since these are dedicated tasks, they
> can run independently and in parallel on the tokio runtime.
> 
> Store progress output is now prefixed by the group, since the
> snapshot count depends on the group being pulled. To update the
> output on a per-group level, the shared group progress count is
> passed as an atomic counter, and store progress is accounted both
> globally and per group.
> 
> Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
>  src/server/pull.rs | 69 +++++++++++++++++++++++++-------------
>  src/server/sync.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 128 insertions(+), 23 deletions(-)
> 
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 3d7d47b9c..b11e93e6c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs

[..]

> diff --git a/src/server/sync.rs b/src/server/sync.rs
> index 9e6aeb9b0..17d736c41 100644
> --- a/src/server/sync.rs
> +++ b/src/server/sync.rs
> @@ -1,9 +1,11 @@
>  //! Sync datastore contents from source to target, either in push or pull direction
>  
>  use std::collections::HashMap;
> +use std::future::Future;
>  use std::io::{Seek, Write};
>  use std::ops::Deref;
>  use std::path::{Path, PathBuf};
> +use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::Duration;
>  
> @@ -12,9 +14,11 @@ use futures::{future::FutureExt, select};
>  use hyper::http::StatusCode;
>  use pbs_config::BackupLockGuard;
>  use serde_json::json;
> +use tokio::task::JoinSet;
>  use tracing::{info, warn};
>  
>  use proxmox_human_byte::HumanByte;
> +use proxmox_log::LogContext;
>  use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>  
> @@ -792,3 +796,81 @@ pub(super) fn exclude_not_verified_or_encrypted(
>  
>      false
>  }
> +
> +/// Process up to preconfigured number of group sync tasks concurrently,
> +/// running on different threads when possible.
> +pub(crate) struct GroupWorkerSet<T> {
> +    capacity: usize,
> +    workers: JoinSet<T>,
> +}
> +
> +impl<T: Send + 'static> GroupWorkerSet<T> {
> +    /// Create a new worker set which allows to run up to `capacity` concurrent tasks.
> +    pub(crate) fn with_capacity(capacity: usize) -> Self {
> +        Self {
> +            capacity,
> +            workers: JoinSet::new(),
> +        }
> +    }

capacity is a bit confusing here, as it could also mean "initial" or
expected number of workers, not a hard limit.. the capacity is also not
an optional aspect used for optimization, but a required part of the
setup..

why not call this `new` and let it take a `max_tasks` and rename the
field accordingly?

> +
> +    /// Spawn the given task on the workers, waiting until there is capacity to do so.
> +    pub(crate) async fn spawn_task<F>(&mut self, task: F) -> Vec<T>
> +    where
> +        F: Future<Output = T>,
> +        F: Send + 'static,
> +    {
> +        let mut results = Vec::with_capacity(self.workers.len());
> +        while self.workers.len() >= self.capacity {
> +            // capacity reached, wait for an active task to complete
> +            if let Some(result) = self.workers.join_next().await {
> +                results.push(result.unwrap());

should we handle a JoinError here in a meaningful fashion?

> +            }
> +        }
> +
> +        match LogContext::current() {
> +            Some(context) => self.workers.spawn(context.scope(task)),
> +            None => self.workers.spawn(task),
> +        };
> +        results
> +    }
> +
> +    /// Wait on all active tasks to run to completion.
> +    pub(crate) async fn join_active(&mut self) -> Vec<T> {
> +        let mut results = Vec::with_capacity(self.workers.len());
> +        while let Some(result) = self.workers.join_next().await {
> +            results.push(result.unwrap());

same here

> +        }
> +        results
> +    }
> +}

IMHO this could be a stand-alone patch, with some arguments/rationale
why we need it ;) we already have very similar code using regular
threads in ParallelHandler..

also, none of the code above is sync specific, this is a generic
"JoinSet with capacity" wrapper, and as such we could also put it in a
more generic place *if* we really want to have two implementations..

> +
> +/// Track group progress during parallel push/pull in sync jobs
> +pub(crate) struct SharedGroupProgress {
> +    done: AtomicUsize,
> +    total: usize,
> +}
> +
> +impl SharedGroupProgress {
> +    /// Create a new instance to track group progress with expected total number of groups
> +    pub(crate) fn with_total_groups(total: usize) -> Self {
> +        Self {
> +            done: AtomicUsize::new(0),
> +            total,
> +        }
> +    }
> +
> +    /// Return current counter value for done groups
> +    pub(crate) fn load_done(&self) -> u64 {
> +        self.done.load(Ordering::Acquire) as u64
> +    }
> +
> +    /// Increment counter for done groups and return new value
> +    pub(crate) fn increment_done(&self) -> u64 {
> +        self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
> +    }
> +
> +    /// Return the number of total backup groups
> +    pub(crate) fn total_groups(&self) -> u64 {
> +        self.total as u64
> +    }
> +}

see cover letter reply for this part..
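
For context, a self-contained sketch of how such a shared done-counter
behaves across parallel tasks (mirroring the struct above, but using
plain std threads instead of tokio tasks to keep it minimal):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Minimal stand-in for the patch's SharedGroupProgress: a shared
// atomic done-counter next to a fixed total number of groups.
struct SharedGroupProgress {
    done: AtomicUsize,
    total: usize,
}

impl SharedGroupProgress {
    fn with_total_groups(total: usize) -> Self {
        Self {
            done: AtomicUsize::new(0),
            total,
        }
    }

    /// Increment the done counter and return the new value.
    fn increment_done(&self) -> u64 {
        self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
    }

    fn load_done(&self) -> u64 {
        self.done.load(Ordering::Acquire) as u64
    }
}

fn main() {
    let progress = Arc::new(SharedGroupProgress::with_total_groups(8));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let p = Arc::clone(&progress);
            thread::spawn(move || {
                // each parallel "group task" bumps the shared counter
                let done = p.increment_done();
                println!("synced group {done}/{}", p.total);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(progress.load_done(), 8);
}
```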

> -- 
> 2.47.3
> 
Thread overview:
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-03-23 12:36   ` Fabian Grünbichler [this message]
2026-03-09 16:20 ` [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window Christian Ebner
2026-03-23 12:37 ` [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Fabian Grünbichler
