From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 38FA11FF13B for ; Wed, 22 Apr 2026 15:18:47 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1AAAC1F36F; Wed, 22 Apr 2026 15:18:43 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v8 03/10] tools: add bounded join set to run concurrent tasks bound by limit Date: Wed, 22 Apr 2026 15:18:13 +0200 Message-ID: <20260422131820.769620-4-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260422131820.769620-1-c.ebner@proxmox.com> References: <20260422131820.769620-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1776863830076 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.071 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs] Message-ID-Hash: 5IWGVYZRRQ3H25XBJ4VRETBAWNCUTIEV X-Message-ID-Hash: 5IWGVYZRRQ3H25XBJ4VRETBAWNCUTIEV X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: The BoundedJoinSet allows running tasks concurrently via a JoinSet, but constrains the number of concurrent tasks by an upper limit. In contrast to the ParallelHandler implementation, which is purely synchronous and does not provide easy handling of returned results, this allows executing tasks in an async context with straightforward handling of results, as required e.g. for pulling/pushing backup groups in parallel in sync jobs. Also, the log context is easily preserved, which is important for task logging. Co-developed-by: Fabian Grünbichler Signed-off-by: Christian Ebner --- pbs-tools/src/bounded_join_set.rs | 81 +++++++++++++++++++++++++++++++ pbs-tools/src/lib.rs | 1 + 2 files changed, 82 insertions(+) create mode 100644 pbs-tools/src/bounded_join_set.rs diff --git a/pbs-tools/src/bounded_join_set.rs b/pbs-tools/src/bounded_join_set.rs new file mode 100644 index 000000000..07500c66b --- /dev/null +++ b/pbs-tools/src/bounded_join_set.rs @@ -0,0 +1,81 @@ +//! JoinSet with an upper bound of concurrent tasks. +//! +//! Allows running up to the configured number of tasks concurrently in an async +//! context. + +use std::future::Future; + +use tokio::task::{JoinError, JoinSet}; + +use proxmox_log::LogContext; + +/// Run up to a preconfigured number of futures concurrently on tokio tasks.
+pub struct BoundedJoinSet<T> {
+    /// Upper bound for concurrent task execution, always at least 1
+    max_tasks: usize,
+    /// Handles to currently spawned tasks
+    workers: JoinSet<T>,
+}
+
+impl<T: Send + 'static> BoundedJoinSet<T> {
+    /// Create a new join set running at most `max_tasks` tasks concurrently (0 is treated as 1).
+    pub fn new(max_tasks: usize) -> Self {
+        Self {
+            max_tasks: max_tasks.max(1),
+            workers: JoinSet::new(),
+        }
+    }
+
+    /// Spawn the given task in the current `LogContext` (if any), waiting for capacity first.
+    ///
+    /// Returns the results of all tasks which finished before the new task was spawned
+    /// (including those awaited to free up a slot), in order of completion. On a `JoinError`
+    /// the new task is not spawned and the results collected so far are discarded.
+    pub async fn spawn_task<F>(&mut self, task: F) -> Result<Vec<T>, JoinError>
+    where
+        F: Future<Output = T>,
+        F: Send + 'static,
+    {
+        let mut results = Vec::with_capacity(self.workers.len());
+
+        // Collect results of already finished tasks without waiting
+        while let Some(result) = self.workers.try_join_next() {
+            results.push(result?);
+        }
+
+        while self.workers.len() >= self.max_tasks {
+            // capacity reached, wait for a task to finish; set is non-empty since max_tasks >= 1
+            if let Some(result) = self.workers.join_next().await {
+                results.push(result?);
+            }
+        }
+
+        match LogContext::current() {
+            Some(context) => self.workers.spawn(context.scope(task)),
+            None => self.workers.spawn(task),
+        };
+
+        Ok(results)
+    }
+
+    /// Waits until one of the tasks in the set completes and returns its output.
+    ///
+    /// Returns `None` if the set is empty.
+    pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
+        self.workers.join_next().await
+    }
+
+    /// Wait on all spawned tasks to run to completion.
+    ///
+    /// Returns the results of all tasks in order of completion. On the first `JoinError`,
+    /// the results collected so far are discarded and the remaining tasks stay in the set.
+    pub async fn join_spawned_tasks(&mut self) -> Result<Vec<T>, JoinError> {
+        let mut results = Vec::with_capacity(self.workers.len());
+
+        while let Some(result) = self.workers.join_next().await {
+            results.push(result?);
+        }
+
+        Ok(results)
+    }
+}
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index 1e3972c92..dc55366b6 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod async_lru_cache;
+pub mod bounded_join_set;
 pub mod buffered_logger;
 pub mod cert;
 pub mod crypt_config;
-- 
2.47.3