From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server
Date: Thu, 12 Mar 2026 14:52:03 +0100 [thread overview]
Message-ID: <20260312135229.420729-3-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>
Code is left unchanged, improvements will follow in followup commits.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Cargo.toml | 2 +
proxmox-parallel-handler/Cargo.toml | 15 ++
proxmox-parallel-handler/debian/changelog | 5 +
proxmox-parallel-handler/debian/control | 36 ++++
proxmox-parallel-handler/debian/copyright | 18 ++
proxmox-parallel-handler/debian/debcargo.toml | 7 +
proxmox-parallel-handler/src/lib.rs | 160 ++++++++++++++++++
7 files changed, 243 insertions(+)
create mode 100644 proxmox-parallel-handler/Cargo.toml
create mode 100644 proxmox-parallel-handler/debian/changelog
create mode 100644 proxmox-parallel-handler/debian/control
create mode 100644 proxmox-parallel-handler/debian/copyright
create mode 100644 proxmox-parallel-handler/debian/debcargo.toml
create mode 100644 proxmox-parallel-handler/src/lib.rs
diff --git a/Cargo.toml b/Cargo.toml
index 1cb5f09e..97593a5d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,6 +33,7 @@ members = [
"proxmox-notify",
"proxmox-oci",
"proxmox-openid",
+ "proxmox-parallel-handler",
"proxmox-pgp",
"proxmox-product-config",
"proxmox-rate-limiter",
@@ -162,6 +163,7 @@ proxmox-lang = { version = "1.5", path = "proxmox-lang" }
proxmox-log = { version = "1.0.0", path = "proxmox-log" }
proxmox-login = { version = "1.0.0", path = "proxmox-login" }
proxmox-network-types = { version = "1.0.0", path = "proxmox-network-types" }
+proxmox-parallel-handler = { version = "1.0.0", path = "proxmox-parallel-handler" }
proxmox-pgp = { version = "1.0.0", path = "proxmox-pgp" }
proxmox-product-config = { version = "1.0.0", path = "proxmox-product-config" }
proxmox-config-digest = { version = "1.0.0", path = "proxmox-config-digest" }
diff --git a/proxmox-parallel-handler/Cargo.toml b/proxmox-parallel-handler/Cargo.toml
new file mode 100644
index 00000000..e55e7c63
--- /dev/null
+++ b/proxmox-parallel-handler/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "proxmox-parallel-handler"
+description = "thread pool which runs a closure in parallel"
+version = "1.0.0"
+
+authors.workspace = true
+edition.workspace = true
+exclude.workspace = true
+homepage.workspace = true
+license.workspace = true
+repository.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+crossbeam-channel.workspace = true
diff --git a/proxmox-parallel-handler/debian/changelog b/proxmox-parallel-handler/debian/changelog
new file mode 100644
index 00000000..df00d3f6
--- /dev/null
+++ b/proxmox-parallel-handler/debian/changelog
@@ -0,0 +1,5 @@
+rust-proxmox-parallel-handler (1.0.0-1) unstable; urgency=medium
+
+ * initial version -- imported from proxmox-backup
+
+ -- Proxmox Support Team <support@proxmox.com> Thu, 26 Feb 2026 15:54:07 +0100
diff --git a/proxmox-parallel-handler/debian/control b/proxmox-parallel-handler/debian/control
new file mode 100644
index 00000000..ff356562
--- /dev/null
+++ b/proxmox-parallel-handler/debian/control
@@ -0,0 +1,36 @@
+Source: rust-proxmox-parallel-handler
+Section: rust
+Priority: optional
+Build-Depends: debhelper-compat (= 13),
+ dh-sequence-cargo
+Build-Depends-Arch: cargo:native <!nocheck>,
+ rustc:native <!nocheck>,
+ libstd-rust-dev <!nocheck>,
+ librust-anyhow-1+default-dev <!nocheck>,
+ librust-crossbeam-channel-0.5+default-dev <!nocheck>,
+ librust-thiserror-2+default-dev <!nocheck>
+Maintainer: Proxmox Support Team <support@proxmox.com>
+Standards-Version: 4.7.2
+Vcs-Git: git://git.proxmox.com/git/proxmox.git
+Vcs-Browser: https://git.proxmox.com/?p=proxmox.git
+Homepage: https://proxmox.com
+X-Cargo-Crate: proxmox-parallel-handler
+
+Package: librust-proxmox-parallel-handler-dev
+Architecture: any
+Multi-Arch: same
+Depends:
+ ${misc:Depends},
+ librust-anyhow-1+default-dev,
+ librust-crossbeam-channel-0.5+default-dev,
+ librust-thiserror-2+default-dev
+Provides:
+ librust-proxmox-parallel-handler+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0+default-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0.0-dev (= ${binary:Version}),
+ librust-proxmox-parallel-handler-1.0.0+default-dev (= ${binary:Version})
+Description: Thread pool which runs a closure in parallel - Rust source code
+ Source code for Debianized Rust crate "proxmox-parallel-handler"
diff --git a/proxmox-parallel-handler/debian/copyright b/proxmox-parallel-handler/debian/copyright
new file mode 100644
index 00000000..01138fa0
--- /dev/null
+++ b/proxmox-parallel-handler/debian/copyright
@@ -0,0 +1,18 @@
+Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
+
+Files:
+ *
+Copyright: 2026 Proxmox Server Solutions GmbH <support@proxmox.com>
+License: AGPL-3.0-or-later
+ This program is free software: you can redistribute it and/or modify it under
+ the terms of the GNU Affero General Public License as published by the Free
+ Software Foundation, either version 3 of the License, or (at your option) any
+ later version.
+ .
+ This program is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+ details.
+ .
+ You should have received a copy of the GNU Affero General Public License along
+ with this program. If not, see <https://www.gnu.org/licenses/>.
diff --git a/proxmox-parallel-handler/debian/debcargo.toml b/proxmox-parallel-handler/debian/debcargo.toml
new file mode 100644
index 00000000..b7864cdb
--- /dev/null
+++ b/proxmox-parallel-handler/debian/debcargo.toml
@@ -0,0 +1,7 @@
+overlay = "."
+crate_src_path = ".."
+maintainer = "Proxmox Support Team <support@proxmox.com>"
+
+[source]
+vcs_git = "git://git.proxmox.com/git/proxmox.git"
+vcs_browser = "https://git.proxmox.com/?p=proxmox.git"
diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs
new file mode 100644
index 00000000..75eab184
--- /dev/null
+++ b/proxmox-parallel-handler/src/lib.rs
@@ -0,0 +1,160 @@
+//! A thread pool which run a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, Sender};
+
+/// A handle to send data to the worker thread (implements clone)
+pub struct SendHandle<I> {
+ input: Sender<I>,
+ abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first error happened, if any
+fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
+ }
+ Ok(())
+}
+
+impl<I: Send> SendHandle<I> {
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ check_abort(&self.abort)?;
+ match self.input.send(input) {
+ Ok(()) => Ok(()),
+ Err(_) => bail!("send failed - channel closed"),
+ }
+ }
+}
+
+/// A thread pool which run the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+ handles: Vec<JoinHandle<()>>,
+ name: String,
+ input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+ fn clone(&self) -> Self {
+ Self {
+ input: self.input.clone(),
+ abort: Arc::clone(&self.abort),
+ }
+ }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+ /// Create a new thread pool, each thread processing incoming data
+ /// with 'handler_fn'.
+ pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ where
+ F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+ {
+ let mut handles = Vec::new();
+ let (input_tx, input_rx) = bounded::<I>(threads);
+
+ let abort = Arc::new(Mutex::new(None));
+
+ for i in 0..threads {
+ let input_rx = input_rx.clone();
+ let abort = Arc::clone(&abort);
+ let handler_fn = handler_fn.clone();
+
+ handles.push(
+ std::thread::Builder::new()
+ .name(format!("{name} ({i})"))
+ .spawn(move || loop {
+ let data = match input_rx.recv() {
+ Ok(data) => data,
+ Err(_) => return,
+ };
+ if let Err(err) = (handler_fn)(data) {
+ let mut guard = abort.lock().unwrap();
+ if guard.is_none() {
+ *guard = Some(err.to_string());
+ }
+ }
+ })
+ .unwrap(),
+ );
+ }
+ Self {
+ handles,
+ name: name.to_string(),
+ input: Some(SendHandle {
+ input: input_tx,
+ abort,
+ }),
+ }
+ }
+
+ /// Returns a cloneable channel to send data to the worker threads
+ pub fn channel(&self) -> SendHandle<I> {
+ self.input.as_ref().unwrap().clone()
+ }
+
+ /// Send data to the worker threads
+ pub fn send(&self, input: I) -> Result<(), Error> {
+ self.input.as_ref().unwrap().send(input)?;
+ Ok(())
+ }
+
+ /// Wait for worker threads to complete and check for errors
+ pub fn complete(mut self) -> Result<(), Error> {
+ let input = self.input.take().unwrap();
+ let abort = Arc::clone(&input.abort);
+ check_abort(&abort)?;
+ drop(input);
+
+ let msg_list = self.join_threads();
+
+ // an error might be encountered while waiting for the join
+ check_abort(&abort)?;
+
+ if msg_list.is_empty() {
+ return Ok(());
+ }
+ Err(format_err!("{}", msg_list.join("\n")))
+ }
+
+ fn join_threads(&mut self) -> Vec<String> {
+ let mut msg_list = Vec::new();
+
+ let mut i = 0;
+ while let Some(handle) = self.handles.pop() {
+ if let Err(panic) = handle.join() {
+ if let Some(panic_msg) = panic.downcast_ref::<&str>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
+ msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
+ } else {
+ msg_list.push(format!("thread {} ({i}) panicked", self.name));
+ }
+ }
+ i += 1;
+ }
+ msg_list
+ }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+ fn drop(&mut self) {
+ drop(self.input.take());
+ while let Some(handle) = self.handles.pop() {
+ let _ = handle.join();
+ }
+ }
+}
--
2.47.3
next prev parent reply other threads:[~2026-03-12 13:53 UTC|newest]
Thread overview: 31+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-03-12 13:52 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 01/26] sys: procfs: don't read from sysfs during unit tests Lukas Wagner
2026-03-12 13:52 ` Lukas Wagner [this message]
2026-03-12 13:52 ` [PATCH proxmox 03/26] parallel-handler: introduce custom error type Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 04/26] parallel-handler: add documentation Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 05/26] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 06/26] disks: import from Proxmox Backup Server Lukas Wagner
2026-03-16 13:13 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 07/26] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 08/26] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 09/26] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 10/26] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 11/26] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-16 13:25 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 12/26] time: use u64 parse helper from nom Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 14/26] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-16 13:27 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox-backup 15/26] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 16/26] node status panel: add `children` property Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 17/26] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 18/26] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 19/26] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 20/26] metric collection: fix minor typo in error message Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 22/26] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 23/26] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 24/26] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 25/26] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 26/26] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner
2026-03-16 13:42 ` [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Arthur Bied-Charreton
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260312135229.420729-3-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.