all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v6 04/15] tools: add bounded join set to run concurrent tasks bound by limit
Date: Fri, 17 Apr 2026 11:26:10 +0200	[thread overview]
Message-ID: <20260417092621.455374-5-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260417092621.455374-1-c.ebner@proxmox.com>

The BoundedJoinSet allows to run tasks concurrently via a JoinSet,
but constrains the number of concurrent tasks to be run at once by an
upper limit.

In contrast to the ParallelHandler implementation, which is purely
sync implementation and does not provide easy handling for returned
results, rhis allows to execute tasks in an async context with straight
forward handling of results, as required for e.g. pulling/pushing of
backup groups in parallel for sync jobs. Also, log context is easily
preserved, which is of importance for task logging.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- not present in previous version, refactored logic from previous
  GroupWorker implementation.

 pbs-tools/src/bounded_join_set.rs | 69 +++++++++++++++++++++++++++++++
 pbs-tools/src/lib.rs              |  1 +
 2 files changed, 70 insertions(+)
 create mode 100644 pbs-tools/src/bounded_join_set.rs

diff --git a/pbs-tools/src/bounded_join_set.rs b/pbs-tools/src/bounded_join_set.rs
new file mode 100644
index 000000000..01b27b2a6
--- /dev/null
+++ b/pbs-tools/src/bounded_join_set.rs
@@ -0,0 +1,69 @@
+//! JoinSet with an upper bound of concurrent tasks.
+//!
+//! Allows to run up to the configured number of tasks concurrently in an async
+//! context.
+
+use std::future::Future;
+
+use tokio::task::{JoinError, JoinSet};
+
+use proxmox_log::LogContext;
+
+/// Run up to preconfigured number of futures concurrently on tokio tasks.
+pub struct BoundedJoinSet<T> {
+    // upper bound for concurrent task execution
+    max_tasks: usize,
+    // handles to currently active tasks
+    workers: JoinSet<T>,
+}
+
+impl<T: Send + 'static> BoundedJoinSet<T> {
+    /// Create a new join set with up to `max_task` concurrently executed tasks.
+    pub fn new(max_tasks: usize) -> Self {
+        Self {
+            max_tasks,
+            workers: JoinSet::new(),
+        }
+    }
+
+    /// Spawn the given task on the workers, waiting until there is capacity to do so.
+    ///
+    /// If there is no capacity, this will await until there is so, returning the results
+    /// for the finished task(s) providing the now free running slot in order of completion
+    /// or a `JoinError` if joining failed.
+    pub async fn spawn_task<F>(&mut self, task: F) -> Result<Vec<T>, JoinError>
+    where
+        F: Future<Output = T>,
+        F: Send + 'static,
+    {
+        let mut results = Vec::with_capacity(self.workers.len());
+
+        while self.workers.len() >= self.max_tasks {
+            // capacity reached, wait for an active task to complete
+            if let Some(result) = self.workers.join_next().await {
+                results.push(result?);
+            }
+        }
+
+        match LogContext::current() {
+            Some(context) => self.workers.spawn(context.scope(task)),
+            None => self.workers.spawn(task),
+        };
+
+        Ok(results)
+    }
+
+    /// Wait on all active tasks to run to completion.
+    ///
+    /// Returns the results for each task in order of completion or a `JoinError`
+    /// if joining failed.
+    pub async fn join_active(&mut self) -> Result<Vec<T>, JoinError> {
+        let mut results = Vec::with_capacity(self.workers.len());
+
+        while let Some(result) = self.workers.join_next().await {
+            results.push(result?);
+        }
+
+        Ok(results)
+    }
+}
diff --git a/pbs-tools/src/lib.rs b/pbs-tools/src/lib.rs
index 1e3972c92..dc55366b6 100644
--- a/pbs-tools/src/lib.rs
+++ b/pbs-tools/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod async_lru_cache;
+pub mod bounded_join_set;
 pub mod buffered_logger;
 pub mod cert;
 pub mod crypt_config;
-- 
2.47.3





  parent reply	other threads:[~2026-04-17  9:26 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-17  9:26 [PATCH proxmox{,-backup} v6 00/15] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox v6 01/15] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 02/15] tools: group and sort module imports Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 03/15] tools: implement buffered logger for concurrent log messages Christian Ebner
2026-04-17  9:26 ` Christian Ebner [this message]
2026-04-17  9:26 ` [PATCH proxmox-backup v6 05/15] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 06/15] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 07/15] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 08/15] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 09/15] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 10/15] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 11/15] server: pull: prefix log messages and add error context Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 12/15] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 13/15] server: sync: allow pushing groups concurrently Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 14/15] server: push: prefix log messages and add additional logging Christian Ebner
2026-04-17  9:26 ` [PATCH proxmox-backup v6 15/15] ui: expose group worker setting in sync job edit window Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260417092621.455374-5-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal