From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox v2 01/25] parallel-handler: import code from Proxmox Backup Server
Date: Thu, 19 Mar 2026 10:45:21 +0100
Message-ID: <20260319094617.169594-2-l.wagner@proxmox.com>
In-Reply-To: <20260319094617.169594-1-l.wagner@proxmox.com>
The code is imported unchanged; improvements will follow in follow-up commits.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Arthur Bied-Charreton <a.bied-charreton@proxmox.com>
Tested-by: Arthur Bied-Charreton <a.bied-charreton@proxmox.com>
---
Cargo.toml | 2 +
proxmox-parallel-handler/Cargo.toml | 15 ++
proxmox-parallel-handler/debian/changelog | 5 +
proxmox-parallel-handler/debian/control | 36 ++++
proxmox-parallel-handler/debian/copyright | 18 ++
proxmox-parallel-handler/debian/debcargo.toml | 7 +
proxmox-parallel-handler/src/lib.rs | 160 ++++++++++++++++++
7 files changed, 243 insertions(+)
create mode 100644 proxmox-parallel-handler/Cargo.toml
create mode 100644 proxmox-parallel-handler/debian/changelog
create mode 100644 proxmox-parallel-handler/debian/control
create mode 100644 proxmox-parallel-handler/debian/copyright
create mode 100644 proxmox-parallel-handler/debian/debcargo.toml
create mode 100644 proxmox-parallel-handler/src/lib.rs
diff --git a/Cargo.toml b/Cargo.toml
index e66ffb78..784ae27a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ members = [
"proxmox-notify",
"proxmox-oci",
"proxmox-openid",
+ "proxmox-parallel-handler",
"proxmox-pgp",
"proxmox-product-config",
"proxmox-rate-limiter",
@@ -162,6 +163,7 @@ proxmox-lang = { version = "1.5", path = "proxmox-lang" }
proxmox-log = { version = "1.0.0", path = "proxmox-log" }
proxmox-login = { version = "1.0.0", path = "proxmox-login" }
proxmox-network-types = { version = "1.0.0", path = "proxmox-network-types" }
+proxmox-parallel-handler = { version = "1.0.0", path = "proxmox-parallel-handler" }
proxmox-pgp = { version = "1.0.0", path = "proxmox-pgp" }
proxmox-product-config = { version = "1.0.0", path = "proxmox-product-config" }
proxmox-config-digest = { version = "1.0.0", path = "proxmox-config-digest" }
diff --git a/proxmox-parallel-handler/Cargo.toml b/proxmox-parallel-handler/Cargo.toml
new file mode 100644
index 00000000..e55e7c63
--- /dev/null
+++ b/proxmox-parallel-handler/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "proxmox-parallel-handler"
+description = "thread pool which runs a closure in parallel"
+version = "1.0.0"
+
+authors.workspace = true
+edition.workspace = true
+exclude.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+crossbeam-channel.workspace = true
diff --git a/proxmox-parallel-handler/debian/changelog b/proxmox-parallel-handler/debian/changelog
new file mode 100644
index 00000000..df00d3f6
--- /dev/null
+++ b/proxmox-parallel-handler/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-parallel-handler (1.0.0-1) unstable; urgency=medium
+
+ * initial version -- imported from proxmox-backup
+
+ -- Proxmox Support Team <support@proxmox.com> Thu, 26 Feb 2026 15:54:07 +0100
diff --git a/proxmox-parallel-handler/debian/control b/proxmox-parallel-handler/debian/control
new file mode 100644
index 00000000..ff356562
--- /dev/null
+++ b/proxmox-parallel-handler/debian/control
@@ -0,0 +1,36 @@
+Source: rust-proxmox-parallel-handler
+Section: rust
+Priority: optional
+Build-Depends: debhelper-compat (= 13),
+ dh-sequence-cargo
+Build-Depends-Arch: cargo:native <!nocheck>,
+ rustc:native <!nocheck>,
+ libstd-rust-dev <!nocheck>,
+ librust-anyhow-1+default-dev <!nocheck>,
+ librust-crossbeam-channel-0.5+default-dev <!nocheck>,
+ librust-thiserror-2+default-dev <!nocheck>
+Maintainer: Proxmox Support Team <support@proxmox.com>
+Standards-Version: 4.7.2
+Vcs-Git: git://git.proxmox.com/git/proxmox.git
+Vcs-Browser: https://git.proxmox.com/?p=proxmox.git
+Homepage: https://proxmox.com
+X-Cargo-Crate: proxmox-parallel-handler
+
+Package: librust-proxmox-parallel-handler-dev
+Architecture: any
+Multi-Arch: same
+Depends:
+ ${misc:Depends},
+ librust-anyhow-1+default-dev,
+ librust-crossbeam-channel-0.5+default-dev,
+ librust-thiserror-2+default-dev
+Provides:
+ librust-proxmox-parallel-handler+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0.0-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0.0+default-dev (= ${binary:Version})
+Description: Thread pool which runs a closure in parallel - Rust source code
+ Source code for Debianized Rust crate "proxmox-parallel-handler"
diff --git a/proxmox-parallel-handler/debian/copyright b/proxmox-parallel-handler/debian/copyright
new file mode 100644
index 00000000..01138fa0
--- /dev/null
+++ b/proxmox-parallel-handler/debian/copyright
@@ -0,0 +1,18 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+
+Files:
+ *
+Copyright: 2026 Proxmox Server Solutions GmbH <support@proxmox.com>
+License: AGPL-3.0-or-later
+ This program is free software: you can redistribute it and/or modify it under
+ the terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or (at your option) any
+ later version.
+ .
+ This program is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+ details.
+ .
+ You should have received a copy of the GNU Affero General Public License along
+ with this program. If not, see <https://www.gnu.org/licenses/>.
diff --git a/proxmox-parallel-handler/debian/debcargo.toml b/proxmox-parallel-handler/debian/debcargo.toml
new file mode 100644
index 00000000..b7864cdb
--- /dev/null
+++ b/proxmox-parallel-handler/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support@proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs
new file mode 100644
index 00000000..75eab184
--- /dev/null
+++ b/proxmox-parallel-handler/src/lib.rs
@@ -0,0 +1,160 @@
+//! A thread pool which runs a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, Sender};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error that happened, if any
+fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ check_abort(&self.abort)?;
+ match self.input.send(input) {
+ Ok(()) => Ok(()),
+ Err(_) => bail!("send failed - channel closed"),
+ }
+ }
+}
+
+/// A thread pool which runs the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+ /// Create a new thread pool, each thread processing incoming data
+ /// with 'handler_fn'.
+ pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ where
+ F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+ {
+ let mut handles = Vec::new();
+ let (input_tx, input_rx) = bounded::<I>(threads);
+
+ let abort = Arc::new(Mutex::new(None));
+
+ for i in 0..threads {
+ let input_rx = input_rx.clone();
+ let abort = Arc::clone(&abort);
+ let handler_fn = handler_fn.clone();
+
+ handles.push(
+ std::thread::Builder::new()
+ .name(format!("{name} ({i})"))
+ .spawn(move || loop {
+ let data = match input_rx.recv() {
+ Ok(data) => data,
+ Err(_) => return,
+ };
+ if let Err(err) = (handler_fn)(data) {
+ let mut guard = abort.lock().unwrap();
+ if guard.is_none() {
+ *guard = Some(err.to_string());
+ }
+ }
+ })
+ .unwrap(),
+ );
+ }
+ Self {
+ handles,
+ name: name.to_string(),
+ input: Some(SendHandle {
+ input: input_tx,
+ abort,
+ }),
+ }
+ }
+
+ /// Returns a cloneable channel to send data to the worker threads
+ pub fn channel(&self) -> SendHandle<I> {
+ self.input.as_ref().unwrap().clone()
+ }
+
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ self.input.as_ref().unwrap().send(input)?;
+ Ok(())
+ }
+
+ /// Wait for worker threads to complete and check for errors
+ pub fn complete(mut self) -> Result<(), Error> {
+ let input = self.input.take().unwrap();
+ let abort = Arc::clone(&input.abort);
+ check_abort(&abort)?;
+ drop(input);
+
+ let msg_list = self.join_threads();
+
+ // an error might be encountered while waiting for the join
+ check_abort(&abort)?;
+
+ if msg_list.is_empty() {
+ return Ok(());
+ }
+ Err(format_err!("{}", msg_list.join("\n")))
+ }
+
+ fn join_threads(&mut self) -> Vec<String> {
+ let mut msg_list = Vec::new();
+
+ let mut i = 0;
+ while let Some(handle) = self.handles.pop() {
+ if let Err(panic) = handle.join() {
+ if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else {
+ msg_list.push(format!("thread {} ({i}) panicked", self.name));
+ }
+ }
+ i += 1;
+ }
+ msg_list
+ }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
--
2.47.3
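
For readers skimming the series, here is a rough, std-only sketch of the pattern that lib.rs implements: a bounded channel feeding a fixed set of worker threads, with the first handler error stored in a shared slot that makes later sends fail. It is not the crate's code. `MiniPool` and `sum_in_parallel` are hypothetical names, errors are plain `String`s instead of `anyhow::Error`, and `std::sync::mpsc::sync_channel` is swapped in for crossbeam-channel, which means the receiver has to be shared behind a mutex because the std receiver cannot be cloned.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

/// Hypothetical std-only stand-in for the pool added by the patch.
struct MiniPool<I> {
    handles: Vec<JoinHandle<()>>,
    input: Option<SyncSender<I>>,
    abort: Arc<Mutex<Option<String>>>,
}

impl<I: Send + 'static> MiniPool<I> {
    fn new<F>(threads: usize, handler_fn: F) -> Self
    where
        F: Fn(I) -> Result<(), String> + Send + Clone + 'static,
    {
        // Bounded queue: senders block once `threads` items are pending.
        let (tx, rx) = sync_channel::<I>(threads);
        let rx = Arc::new(Mutex::new(rx));
        let abort = Arc::new(Mutex::new(None));
        let mut handles = Vec::new();

        for _ in 0..threads {
            let rx = Arc::clone(&rx);
            let abort = Arc::clone(&abort);
            let handler_fn = handler_fn.clone();
            handles.push(std::thread::spawn(move || loop {
                // Hold the receiver lock only while waiting for one item.
                let received = {
                    let guard = rx.lock().unwrap();
                    guard.recv()
                };
                let data = match received {
                    Ok(data) => data,
                    Err(_) => return, // all senders are gone
                };
                if let Err(err) = handler_fn(data) {
                    // Remember only the first error.
                    let mut guard = abort.lock().unwrap();
                    if guard.is_none() {
                        *guard = Some(err);
                    }
                }
            }));
        }

        Self { handles, input: Some(tx), abort }
    }

    /// Refuse new work once any handler has failed.
    fn send(&self, input: I) -> Result<(), String> {
        if let Some(msg) = &*self.abort.lock().unwrap() {
            return Err(msg.clone());
        }
        self.input
            .as_ref()
            .unwrap()
            .send(input)
            .map_err(|_| "send failed - channel closed".to_string())
    }

    /// Close the channel, join all workers, then report the stored error.
    fn complete(mut self) -> Result<(), String> {
        drop(self.input.take());
        for handle in self.handles.drain(..) {
            handle.join().map_err(|_| "worker panicked".to_string())?;
        }
        let first_error = self.abort.lock().unwrap().take();
        match first_error {
            Some(msg) => Err(msg),
            None => Ok(()),
        }
    }
}

/// Hypothetical caller: sum the values on four workers, rejecting zero.
fn sum_in_parallel(values: &[u64]) -> Result<u64, String> {
    let total = Arc::new(Mutex::new(0u64));
    let pool = MiniPool::new(4, {
        let total = Arc::clone(&total);
        move |n: u64| {
            if n == 0 {
                return Err("zero is not allowed".to_string());
            }
            *total.lock().unwrap() += n;
            Ok(())
        }
    });
    for &n in values {
        pool.send(n)?;
    }
    pool.complete()?;
    let sum = *total.lock().unwrap();
    Ok(sum)
}

fn main() {
    println!("{:?}", sum_in_parallel(&[1, 2, 3, 4]));
    println!("{:?}", sum_in_parallel(&[1, 0, 3]));
}
```

As in the patch, the caller only learns about a handler failure on a later send or on complete(), so complete() has to be called to be sure no error was missed. Unlike the patch, this sketch has no Drop impl that joins the workers and it does not extract panic messages.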
Thread overview: 25+ messages
2026-03-19 9:45 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} v2 00/25] metric collection for the PDM host Lukas Wagner
2026-03-19 9:45 ` Lukas Wagner [this message]
2026-03-19 9:45 ` [PATCH proxmox v2 02/25] parallel-handler: introduce custom error type Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 03/25] parallel-handler: add documentation Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 04/25] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 06/25] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 07/25] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 08/25] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 09/25] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 10/25] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox v2 11/25] time: use u64 parse helper from nom Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox-backup v2 12/25] tools: move ParallelHandler to new proxmox-parallel-handler crate Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox-backup v2 13/25] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox-backup v2 14/25] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox-yew-comp v2 15/25] node status panel: add `children` property Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox-yew-comp v2 16/25] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-19 9:45 ` [PATCH proxmox-yew-comp v2 17/25] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 18/25] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 19/25] metric collection: fix minor typo in error message Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 20/25] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 21/25] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 22/25] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 23/25] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 24/25] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-19 9:45 ` [PATCH datacenter-manager v2 25/25] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner