From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server
Date: Thu, 12 Mar 2026 14:52:03 +0100
Message-ID: <20260312135229.420729-3-l.wagner@proxmox.com>
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>
The code is left unchanged; improvements will follow in follow-up commits.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Cargo.toml | 2 +
proxmox-parallel-handler/Cargo.toml | 15 ++
proxmox-parallel-handler/debian/changelog | 5 +
proxmox-parallel-handler/debian/control | 36 ++++
proxmox-parallel-handler/debian/copyright | 18 ++
proxmox-parallel-handler/debian/debcargo.toml | 7 +
proxmox-parallel-handler/src/lib.rs | 160 ++++++++++++++++++
7 files changed, 243 insertions(+)
create mode 100644 proxmox-parallel-handler/Cargo.toml
create mode 100644 proxmox-parallel-handler/debian/changelog
create mode 100644 proxmox-parallel-handler/debian/control
create mode 100644 proxmox-parallel-handler/debian/copyright
create mode 100644 proxmox-parallel-handler/debian/debcargo.toml
create mode 100644 proxmox-parallel-handler/src/lib.rs
diff --git a/Cargo.toml b/Cargo.toml
index 1cb5f09e..97593a5d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ members = [
"proxmox-notify",
"proxmox-oci",
"proxmox-openid",
+ "proxmox-parallel-handler",
"proxmox-pgp",
"proxmox-product-config",
"proxmox-rate-limiter",
@@ -162,6 +163,7 @@ proxmox-lang = { version = "1.5", path = "proxmox-lang" }
proxmox-log = { version = "1.0.0", path = "proxmox-log" }
proxmox-login = { version = "1.0.0", path = "proxmox-login" }
proxmox-network-types = { version = "1.0.0", path = "proxmox-network-types" }
+proxmox-parallel-handler = { version = "1.0.0", path = "proxmox-parallel-handler" }
proxmox-pgp = { version = "1.0.0", path = "proxmox-pgp" }
proxmox-product-config = { version = "1.0.0", path = "proxmox-product-config" }
proxmox-config-digest = { version = "1.0.0", path = "proxmox-config-digest" }
diff --git a/proxmox-parallel-handler/Cargo.toml b/proxmox-parallel-handler/Cargo.toml
new file mode 100644
index 00000000..e55e7c63
--- /dev/null
+++ b/proxmox-parallel-handler/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "proxmox-parallel-handler"
+description = "thread pool which runs a closure in parallel"
+version = "1.0.0"
+
+authors.workspace = true
+edition.workspace = true
+exclude.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+crossbeam-channel.workspace = true
diff --git a/proxmox-parallel-handler/debian/changelog b/proxmox-parallel-handler/debian/changelog
new file mode 100644
index 00000000..df00d3f6
--- /dev/null
+++ b/proxmox-parallel-handler/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-parallel-handler (1.0.0-1) unstable; urgency=medium
+
+ * initial version -- imported from proxmox-backup
+
+ -- Proxmox Support Team <support@proxmox.com> Thu, 26 Feb 2026 15:54:07 +0100
diff --git a/proxmox-parallel-handler/debian/control b/proxmox-parallel-handler/debian/control
new file mode 100644
index 00000000..ff356562
--- /dev/null
+++ b/proxmox-parallel-handler/debian/control
@@ -0,0 +1,36 @@
+Source: rust-proxmox-parallel-handler
+Section: rust
+Priority: optional
+Build-Depends: debhelper-compat (= 13),
+ dh-sequence-cargo
+Build-Depends-Arch: cargo:native <!nocheck>,
+ rustc:native <!nocheck>,
+ libstd-rust-dev <!nocheck>,
+ librust-anyhow-1+default-dev <!nocheck>,
+ librust-crossbeam-channel-0.5+default-dev <!nocheck>,
+ librust-thiserror-2+default-dev <!nocheck>
+Maintainer: Proxmox Support Team <support@proxmox.com>
+Standards-Version: 4.7.2
+Vcs-Git: git://git.proxmox.com/git/proxmox.git
+Vcs-Browser: https://git.proxmox.com/?p=proxmox.git
+Homepage: https://proxmox.com
+X-Cargo-Crate: proxmox-parallel-handler
+
+Package: librust-proxmox-parallel-handler-dev
+Architecture: any
+Multi-Arch: same
+Depends:
+ ${misc:Depends},
+ librust-anyhow-1+default-dev,
+ librust-crossbeam-channel-0.5+default-dev,
+ librust-thiserror-2+default-dev
+Provides:
+ librust-proxmox-parallel-handler+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0.0-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0.0+default-dev (= ${binary:Version})
+Description: Thread pool which runs a closure in parallel - Rust source code
+ Source code for Debianized Rust crate "proxmox-parallel-handler"
diff --git a/proxmox-parallel-handler/debian/copyright b/proxmox-parallel-handler/debian/copyright
new file mode 100644
index 00000000..01138fa0
--- /dev/null
+++ b/proxmox-parallel-handler/debian/copyright
@@ -0,0 +1,18 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+
+Files:
+ *
+Copyright: 2026 Proxmox Server Solutions GmbH <support@proxmox.com>
+License: AGPL-3.0-or-later
+ This program is free software: you can redistribute it and/or modify it under
+ the terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or (at your option) any
+ later version.
+ .
+ This program is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+ details.
+ .
+ You should have received a copy of the GNU Affero General Public License along
+ with this program. If not, see <https://www.gnu.org/licenses/>.
diff --git a/proxmox-parallel-handler/debian/debcargo.toml b/proxmox-parallel-handler/debian/debcargo.toml
new file mode 100644
index 00000000..b7864cdb
--- /dev/null
+++ b/proxmox-parallel-handler/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support@proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs
new file mode 100644
index 00000000..75eab184
--- /dev/null
+++ b/proxmox-parallel-handler/src/lib.rs
@@ -0,0 +1,160 @@
+//! A thread pool which run a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, Sender};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error happened, if any
+fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ check_abort(&self.abort)?;
+ match self.input.send(input) {
+ Ok(()) => Ok(()),
+ Err(_) => bail!("send failed - channel closed"),
+ }
+ }
+}
+
+/// A thread pool which run the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+ /// Create a new thread pool, each thread processing incoming data
+ /// with 'handler_fn'.
+ pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ where
+ F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+ {
+ let mut handles = Vec::new();
+ let (input_tx, input_rx) = bounded::<I>(threads);
+
+ let abort = Arc::new(Mutex::new(None));
+
+ for i in 0..threads {
+ let input_rx = input_rx.clone();
+ let abort = Arc::clone(&abort);
+ let handler_fn = handler_fn.clone();
+
+ handles.push(
+ std::thread::Builder::new()
+ .name(format!("{name} ({i})"))
+ .spawn(move || loop {
+ let data = match input_rx.recv() {
+ Ok(data) => data,
+ Err(_) => return,
+ };
+ if let Err(err) = (handler_fn)(data) {
+ let mut guard = abort.lock().unwrap();
+ if guard.is_none() {
+ *guard = Some(err.to_string());
+ }
+ }
+ })
+ .unwrap(),
+ );
+ }
+ Self {
+ handles,
+ name: name.to_string(),
+ input: Some(SendHandle {
+ input: input_tx,
+ abort,
+ }),
+ }
+ }
+
+ /// Returns a cloneable channel to send data to the worker threads
+ pub fn channel(&self) -> SendHandle<I> {
+ self.input.as_ref().unwrap().clone()
+ }
+
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ self.input.as_ref().unwrap().send(input)?;
+ Ok(())
+ }
+
+ /// Wait for worker threads to complete and check for errors
+ pub fn complete(mut self) -> Result<(), Error> {
+ let input = self.input.take().unwrap();
+ let abort = Arc::clone(&input.abort);
+ check_abort(&abort)?;
+ drop(input);
+
+ let msg_list = self.join_threads();
+
+ // an error might be encountered while waiting for the join
+ check_abort(&abort)?;
+
+ if msg_list.is_empty() {
+ return Ok(());
+ }
+ Err(format_err!("{}", msg_list.join("\n")))
+ }
+
+ fn join_threads(&mut self) -> Vec<String> {
+ let mut msg_list = Vec::new();
+
+ let mut i = 0;
+ while let Some(handle) = self.handles.pop() {
+ if let Err(panic) = handle.join() {
+ if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else {
+ msg_list.push(format!("thread {} ({i}) panicked", self.name));
+ }
+ }
+ i += 1;
+ }
+ msg_list
+ }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
--
2.47.3
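
For readers who want to try the pattern in lib.rs without pulling in the
crate, here is a minimal std-only sketch of the same idea: a bounded channel
feeding a fixed set of worker threads, a shared "abort" slot that keeps only
the first handler error, and a complete() step that closes the channel and
joins the workers. `MiniPool` and its `String` error type are hypothetical
names chosen for this illustration and are not part of the patch. The sketch
also swaps crossbeam-channel for std::sync::mpsc, so the receiver is shared
behind a mutex instead of being cloned, and it reports panics with a generic
message rather than collecting them per thread.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

/// Hypothetical std-only stand-in for the pattern used by `ParallelHandler`.
pub struct MiniPool<I> {
    handles: Vec<JoinHandle<()>>,
    input: Option<SyncSender<I>>,
    abort: Arc<Mutex<Option<String>>>,
}

/// Returns the first recorded handler error, if any.
fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), String> {
    let guard = abort.lock().unwrap();
    if let Some(msg) = &*guard {
        return Err(msg.clone());
    }
    Ok(())
}

impl<I: Send + 'static> MiniPool<I> {
    /// Spawn `threads` workers, each running `handler_fn` on incoming items.
    pub fn new<F>(threads: usize, handler_fn: F) -> Self
    where
        F: Fn(I) -> Result<(), String> + Send + Clone + 'static,
    {
        let (tx, rx) = sync_channel::<I>(threads);
        // std's Receiver cannot be cloned, so the workers share it behind a mutex.
        let rx: Arc<Mutex<Receiver<I>>> = Arc::new(Mutex::new(rx));
        let abort = Arc::new(Mutex::new(None));

        let handles = (0..threads)
            .map(|_| {
                let rx = Arc::clone(&rx);
                let abort = Arc::clone(&abort);
                let handler_fn = handler_fn.clone();
                std::thread::spawn(move || loop {
                    // The lock guard is released at the end of this statement,
                    // so the handler below runs without holding the lock.
                    let received = rx.lock().unwrap().recv();
                    let data = match received {
                        Ok(data) => data,
                        Err(_) => return, // sender dropped, no more work
                    };
                    if let Err(err) = handler_fn(data) {
                        let mut guard = abort.lock().unwrap();
                        if guard.is_none() {
                            *guard = Some(err); // only the first error is kept
                        }
                    }
                })
            })
            .collect();

        Self { handles, input: Some(tx), abort }
    }

    /// Queue one item; refuses new work once a handler has failed.
    pub fn send(&self, input: I) -> Result<(), String> {
        check_abort(&self.abort)?;
        let tx = self.input.as_ref().unwrap();
        tx.send(input)
            .map_err(|_| "send failed - channel closed".to_string())
    }

    /// Close the channel, join all workers and report the first error.
    pub fn complete(mut self) -> Result<(), String> {
        check_abort(&self.abort)?;
        drop(self.input.take());
        for handle in self.handles.drain(..) {
            handle.join().map_err(|_| "worker panicked".to_string())?;
        }
        // a handler may have failed while we were waiting for the join
        check_abort(&self.abort)
    }
}
```

A caller would build the pool with a closure, call send() once per item and
finish with complete(). As in the imported code, the second check_abort()
after joining matters, because an item that was still queued when
complete() was called can fail after the first check.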
Thread overview: 31+ messages
2026-03-12 13:52 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 01/26] sys: procfs: don't read from sysfs during unit tests Lukas Wagner
2026-03-12 13:52 ` Lukas Wagner [this message]
2026-03-12 13:52 ` [PATCH proxmox 03/26] parallel-handler: introduce custom error type Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 04/26] parallel-handler: add documentation Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 05/26] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 06/26] disks: import from Proxmox Backup Server Lukas Wagner
2026-03-16 13:13 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 07/26] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 08/26] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 09/26] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 10/26] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 11/26] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-16 13:25 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 12/26] time: use u64 parse helper from nom Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 14/26] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-16 13:27 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox-backup 15/26] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 16/26] node status panel: add `children` property Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 17/26] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 18/26] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 19/26] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 20/26] metric collection: fix minor typo in error message Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 22/26] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 23/26] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 24/26] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 25/26] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 26/26] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner
2026-03-16 13:42 ` [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Arthur Bied-Charreton