From mboxrd@z Thu Jan 1 00:00:00 1970
Date: Mon, 20 Apr 2026 13:15:08 +0200
From: Fabian Grünbichler
Subject: Re: [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit
To: Christian Ebner, pbs-devel@lists.proxmox.com
References: <20260417092621.455374-1-c.ebner@proxmox.com> <20260417092621.455374-5-c.ebner@proxmox.com>
In-Reply-To: <20260417092621.455374-5-c.ebner@proxmox.com>
Message-Id: <1776682693.qvzn009bkr.astroid@yuna.none>
List-Id: Proxmox Backup Server development discussion
On April 17, 2026 11:26 am, Christian Ebner wrote:
> The BoundedJoinSet allows to run tasks concurrently via a JoinSet,
> but constrains the number of concurrent tasks to be run at once by an
> upper limit.

constrains the number of concurrent tasks by an upper limit.

"concurrent tasks" already states that they are at the same time ;)

> 
> In contrast to the ParallelHandler implementation, which is purely
> sync implementation and does not provide easy handling for returned
> results, rhis allows to execute tasks in an async context with straight

which is *a* purely

s/rhis/this

> forward handling of results, as required for e.g. pulling/pushing of
> backup groups in parallel for sync jobs. Also, log context is easily
> preserved, which is of importance for task logging.
> 
> Signed-off-by: Christian Ebner
> ---
> changes since version 5:
> - not present in previous version, refactored logic from previous
>   GroupWorker implementation.
> 
>  pbs-tools/src/bounded_join_set.rs | 69 +++++++++++++++++++++++++++++++
>  pbs-tools/src/lib.rs              |  1 +
>  2 files changed, 70 insertions(+)
>  create mode 100644 pbs-tools/src/bounded_join_set.rs
> 
> diff --git a/pbs-tools/src/bounded_join_set.rs b/pbs-tools/src/bounded_join_set.rs
> new file mode 100644
> index 000000000..01b27b2a6
> --- /dev/null
> +++ b/pbs-tools/src/bounded_join_set.rs
> @@ -0,0 +1,69 @@
> +//! JoinSet with an upper bound of concurrent tasks.
> +//!
> +//! Allows to run up to the configured number of tasks concurrently in an async
> +//! context.
> +
> +use std::future::Future;
> +
> +use tokio::task::{JoinError, JoinSet};
> +
> +use proxmox_log::LogContext;
> +
> +/// Run up to preconfigured number of futures concurrently on tokio tasks.
> +pub struct BoundedJoinSet<T> {
> +    // upper bound for concurrent task execution
> +    max_tasks: usize,
> +    // handles to currently active tasks
> +    workers: JoinSet<T>,

the tasks might also no longer be active - what they have in common is
that they've been spawned ;)

> +}
> +
> +impl<T: Send + 'static> BoundedJoinSet<T> {
> +    /// Create a new join set with up to `max_task` concurrently executed tasks.
> +    pub fn new(max_tasks: usize) -> Self {
> +        Self {
> +            max_tasks,
> +            workers: JoinSet::new(),
> +        }
> +    }
> +
> +    /// Spawn the given task on the workers, waiting until there is capacity to do so.
> +    ///
> +    /// If there is no capacity, this will await until there is so, returning the results
> +    /// for the finished task(s) providing the now free running slot in order of completion
> +    /// or a `JoinError` if joining failed.
> +    pub async fn spawn_task<F>(&mut self, task: F) -> Result<Vec<T>, JoinError>
> +    where
> +        F: Future<Output = T>,
> +        F: Send + 'static,
> +    {
> +        let mut results = Vec::with_capacity(self.workers.len());
> +
> +        while self.workers.len() >= self.max_tasks {
> +            // capacity reached, wait for an active task to complete
> +            if let Some(result) = self.workers.join_next().await {
> +                results.push(result?);
> +            }
> +        }

by virtue of its design, there can only ever be a single result returned
here (because the join set can only be at capacity, not over), right?
should we assert this here and encode it in the return value? or did you
actually intend for this to return all completed tasks? in that case, we
need a loop with try_join_next in addition to the blocking call.. that
might actually be beneficial, to log results early..

similar question - do we want to be able to retrieve individual results
as they become available, without the need to spawn new tasks? e.g., have
a `join_next` that the sync jobs can call in a loop once they've spawned
all the groups they want to spawn?
that would make `join_active` below not used at the moment, though it
might still be helpful for some future use case?

> +
> +        match LogContext::current() {
> +            Some(context) => self.workers.spawn(context.scope(task)),
> +            None => self.workers.spawn(task),
> +        };
> +
> +        Ok(results)
> +    }
> +
> +    /// Wait on all active tasks to run to completion.
> +    ///
> +    /// Returns the results for each task in order of completion or a `JoinError`
> +    /// if joining failed.
> +    pub async fn join_active(&mut self) -> Result<Vec<T>, JoinError> {

the active here is a misnomer as well.. for a join_set this is called
join_all (modulo the panic behaviour, which we do not want here, so
maybe we do need a different name..)

> +        let mut results = Vec::with_capacity(self.workers.len());
> +
> +        while let Some(result) = self.workers.join_next().await {
> +            results.push(result?);
> +        }
> +
> +        Ok(results)
> +    }
> +}
> diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
> index 1e3972c92..dc55366b6 100644
> --- a/pbs-tools/src/lib.rs
> +++ b/pbs-tools/src/lib.rs
> @@ -1,4 +1,5 @@
>  pub mod async_lru_cache;
> +pub mod bounded_join_set;
>  pub mod buffered_logger;
>  pub mod cert;
>  pub mod crypt_config;
> --
> 2.47.3
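P.S. a rough, untested sketch of the "drain non-blocking first, then block
for exactly one slot" shape suggested above - illustrated with plain std
threads and an mpsc channel rather than tokio, so `BoundedPool` and every
name in it are made up for this example, not part of the patch:

```rust
use std::sync::mpsc;
use std::thread;

// hypothetical std-only analogue of a bounded join set: workers report
// completion over a channel, `active` counts spawned-but-not-collected tasks
struct BoundedPool {
    max_tasks: usize,
    active: usize,
    done_tx: mpsc::Sender<u64>,
    done_rx: mpsc::Receiver<u64>,
}

impl BoundedPool {
    fn new(max_tasks: usize) -> Self {
        let (done_tx, done_rx) = mpsc::channel();
        Self { max_tasks, active: 0, done_tx, done_rx }
    }

    fn spawn_task<F>(&mut self, task: F) -> Vec<u64>
    where
        F: FnOnce() -> u64 + Send + 'static,
    {
        let mut results = Vec::new();
        // non-blocking drain: pick up everything that finished in the
        // meantime, so results can be logged early
        while let Ok(r) = self.done_rx.try_recv() {
            results.push(r);
            self.active -= 1;
        }
        // still at capacity after draining -> block for exactly one slot
        if self.active >= self.max_tasks {
            if let Ok(r) = self.done_rx.recv() {
                results.push(r);
                self.active -= 1;
            }
        }
        let tx = self.done_tx.clone();
        thread::spawn(move || {
            let _ = tx.send(task());
        });
        self.active += 1;
        results
    }

    // collect all remaining results once nothing new will be spawned
    fn join_all(&mut self) -> Vec<u64> {
        let mut results = Vec::new();
        while self.active > 0 {
            match self.done_rx.recv() {
                Ok(r) => {
                    results.push(r);
                    self.active -= 1;
                }
                Err(_) => break,
            }
        }
        results
    }
}
```

the same shape would apply to the tokio version: a `try_join_next` loop
followed by a single blocking `join_next().await` when still at capacity.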