From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox 04/26] parallel-handler: add documentation
Date: Thu, 12 Mar 2026 14:52:05 +0100
Message-ID: <20260312135229.420729-5-l.wagner@proxmox.com>
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>
Add some module-level as well as function/type documentation.
---
proxmox-parallel-handler/src/lib.rs | 120 ++++++++++++++++++++++++----
1 file changed, 104 insertions(+), 16 deletions(-)
diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs
index 4c2ac118..38f6a48a 100644
--- a/proxmox-parallel-handler/src/lib.rs
+++ b/proxmox-parallel-handler/src/lib.rs
@@ -1,18 +1,56 @@
-//! A thread pool which run a closure in parallel.
+//! A thread pool that runs a closure in parallel across multiple worker threads.
+//!
+//! This crate provides [`ParallelHandler`], a simple thread pool that distributes work items of
+//! type `I` to a fixed number of worker threads, each executing the same handler closure. Work is
+//! submitted through a bounded [`crossbeam_channel`].
+//!
+//! If any worker's handler returns an error, the pool is marked as failed and subsequent
+//! [`send`](ParallelHandler::send) calls will return the first recorded error. After all items
+//! have been submitted, call [`complete`](ParallelHandler::complete) to join the worker threads
+//! and surface any errors (including thread panics).
+//!
+//! # Example
+//!
+//! ```
+//! use proxmox_parallel_handler::ParallelHandler;
+//!
+//! let pool = ParallelHandler::new("example", 4, |value: u64| {
+//!     println!("processing {value}");
+//!     Ok(())
+//! });
+//!
+//! for i in 0..100 {
+//!     pool.send(i)?;
+//! }
+//!
+//! pool.complete()?;
+//! # Ok::<(), proxmox_parallel_handler::Error>(())
+//! ```
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use crossbeam_channel::{bounded, Sender};
+/// Errors returned by [`ParallelHandler`] and [`SendHandle`] operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
+ /// The internal channel has been closed.
+ ///
+ /// This typically means the worker threads have already shut down, either because
+ /// [`ParallelHandler::complete`] was called or the pool was dropped.
#[error("send failed - channel closed")]
ChannelClosed,
+ /// A worker thread's handler closure returned an error.
+ ///
+ /// Contains the formatted error message from the first handler that failed.
+ /// Once a handler fails, all subsequent [`send`](SendHandle::send) calls will return
+ /// this error.
#[error("handler failed: {0}")]
HandlerFailed(String),
+ /// A worker thread panicked.
#[error("thread {name} panicked")]
ThreadPanicked {
/// The name of the thread.
@@ -22,13 +60,17 @@ pub enum Error {
},
}
-/// A handle to send data to the worker thread (implements clone)
+/// A cloneable handle for sending work items to a [`ParallelHandler`]'s worker threads.
+///
+/// Obtained via [`ParallelHandler::channel`]. Multiple clones of the same `SendHandle` share the
+/// underlying channel and abort state, so they can be used from different threads or tasks to
+/// submit work concurrently.
pub struct SendHandle<I> {
input: Sender<I>,
abort: Arc<Mutex<Option<String>>>,
}
-/// Returns the first error happened, if any
+/// Returns the first error which happened, if any.
fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
let guard = abort.lock().unwrap();
if let Some(err_msg) = &*guard {
@@ -38,21 +80,35 @@ fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
}
impl<I: Send> SendHandle<I> {
- /// Send data to the worker threads
+ /// Send a work item to the worker threads.
+ ///
+ /// The item is placed into the bounded channel and will be picked up by the next idle
+ /// worker. If all workers are busy, this call blocks until a worker becomes available.
+ ///
+ /// # Errors
+ ///
+ /// - [`Error::HandlerFailed`] if any worker has already returned an error.
+ /// - [`Error::ChannelClosed`] if the channel has been closed (e.g. the pool was dropped).
pub fn send(&self, input: I) -> Result<(), Error> {
check_abort(&self.abort)?;
self.input.send(input).map_err(|_| Error::ChannelClosed)
}
}
-/// A thread pool which run the supplied closure
+/// A thread pool that runs the supplied closure on each work item in parallel.
///
-/// The send command sends data to the worker threads. If one handler
-/// returns an error, we mark the channel as failed and it is no
-/// longer possible to send data.
+/// `ParallelHandler` spawns a fixed number of worker threads at construction time. Each thread
+/// receives work items of type `I` through a shared bounded channel and processes them with a
+/// cloned copy of the handler closure.
///
-/// When done, the 'complete()' method needs to be called to check for
-/// outstanding errors.
+/// # Error handling
+///
+/// If any handler invocation returns an error, the pool records the first error message and
+/// enters a failed state. Subsequent [`send`](Self::send) calls will immediately return
+/// [`Error::HandlerFailed`] rather than enqueueing more work.
+///
+/// If the `ParallelHandler` is dropped without calling `complete`, the [`Drop`] implementation
+/// still joins all threads, but any errors are silently discarded.
pub struct ParallelHandler<I> {
handles: Vec<JoinHandle<()>>,
input: Option<SendHandle<I>>,
@@ -68,8 +124,14 @@ impl<I> Clone for SendHandle<I> {
}
impl<I: Send + 'static> ParallelHandler<I> {
- /// Create a new thread pool, each thread processing incoming data
- /// with 'handler_fn'.
+ /// Create a new thread pool with `threads` workers, each processing incoming data with
+ /// `handler_fn`.
+ ///
+ /// # Parameters
+ ///
+ /// - `name` - A human-readable name used in thread names and error messages.
+ /// - `threads` - The number of worker threads to spawn.
+ /// - `handler_fn` - The closure invoked for every work item.
pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
where
F: Fn(I) -> Result<(), anyhow::Error> + Send + Clone + 'static,
@@ -99,6 +161,8 @@ impl<I: Send + 'static> ParallelHandler<I> {
}
}
})
+ // unwrap is fine, `spawn` only panics if a thread name with null bytes was
+ // set
.unwrap(),
);
}
@@ -111,18 +175,39 @@ impl<I: Send + 'static> ParallelHandler<I> {
}
}
- /// Returns a cloneable channel to send data to the worker threads
+ /// Returns a cloneable [`SendHandle`] that can be used to send work items to the worker
+ /// threads.
+ ///
+ /// This is useful when you need to send items from multiple threads or tasks concurrently.
+ /// Each clone of the returned handle shares the same underlying channel.
pub fn channel(&self) -> SendHandle<I> {
+ // unwrap: fine as long as Self::complete has not been called yet. Since
+ // Self::complete takes self, this cannot happen for any of our callers.
self.input.as_ref().unwrap().clone()
}
- /// Send data to the worker threads
+ /// Send a work item to the worker threads.
+ ///
+ /// Convenience wrapper around the internal [`SendHandle::send`]. Blocks if the bounded
+ /// channel is full (i.e. all workers are busy).
+ ///
+ /// # Errors
+ ///
+ /// - [`Error::HandlerFailed`] if any worker has already returned an error.
+ /// - [`Error::ChannelClosed`] if the channel has been closed.
pub fn send(&self, input: I) -> Result<(), Error> {
+ // unwrap: fine as long as Self::complete has not been called yet. Since
+ // Self::complete takes self, this cannot happen for any of our callers.
self.input.as_ref().unwrap().send(input)?;
Ok(())
}
- /// Wait for worker threads to complete and check for errors
+ /// Close the channel, wait for all worker threads to finish, and check for errors.
+ ///
+ /// # Errors
+ ///
+ /// - [`Error::HandlerFailed`] - if any handler returned an error.
+ /// - [`Error::ThreadPanicked`] - if a worker thread panicked.
pub fn complete(mut self) -> Result<(), Error> {
let input = self.input.take().unwrap();
let abort = Arc::clone(&input.abort);
@@ -172,7 +257,10 @@ impl<I: Send + 'static> ParallelHandler<I> {
}
}
-// Note: We make sure that all threads will be joined
+/// Dropping a `ParallelHandler` closes the channel and joins all worker threads.
+///
+/// Any errors that occurred in handler closures or thread panics are silently discarded.
+/// Prefer calling [`ParallelHandler::complete`] explicitly if you need to observe errors.
impl<I> Drop for ParallelHandler<I> {
fn drop(&mut self) {
drop(self.input.take());
--
2.47.3
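
For readers who want to experiment with the semantics documented in this patch (bounded channel, first error recorded, `send` failing once the pool is in a failed state, `complete` joining the workers), the following is a minimal std-only sketch. It is not the crate's implementation: `MiniPool` is a hypothetical name, it uses `std::sync::mpsc::sync_channel` with a shared receiver instead of `crossbeam_channel`, and it reports errors as plain `String`s rather than through the crate's `Error` type.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical std-only model of the documented behavior (not the real type).
pub struct MiniPool<I> {
    handles: Vec<JoinHandle<()>>,
    input: Option<SyncSender<I>>,
    /// Holds the message of the first handler error, if any.
    abort: Arc<Mutex<Option<String>>>,
}

impl<I: Send + 'static> MiniPool<I> {
    /// Spawn `threads` workers, each running a clone of `handler`.
    pub fn new<F>(threads: usize, handler: F) -> Self
    where
        F: Fn(I) -> Result<(), String> + Send + Clone + 'static,
    {
        // Bounded channel: `send` blocks once `threads` items are queued.
        let (tx, rx) = sync_channel::<I>(threads);
        // std's receiver is single-consumer, so the workers share it via a mutex.
        let rx: Arc<Mutex<Receiver<I>>> = Arc::new(Mutex::new(rx));
        let abort: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
        let mut handles = Vec::with_capacity(threads);

        for _ in 0..threads {
            let rx = Arc::clone(&rx);
            let abort = Arc::clone(&abort);
            let handler = handler.clone();
            handles.push(thread::spawn(move || loop {
                // The receiver lock is released before the handler runs.
                let received = rx.lock().unwrap().recv();
                let item = match received {
                    Ok(item) => item,
                    Err(_) => return, // all senders dropped: shut down
                };
                if let Err(err) = handler(item) {
                    // Only the first error is kept.
                    let mut guard = abort.lock().unwrap();
                    if guard.is_none() {
                        *guard = Some(err);
                    }
                }
            }));
        }

        Self { handles, input: Some(tx), abort }
    }

    /// Enqueue an item; fails fast if a handler has already returned an error.
    pub fn send(&self, item: I) -> Result<(), String> {
        if let Some(msg) = &*self.abort.lock().unwrap() {
            return Err(format!("handler failed: {msg}"));
        }
        self.input
            .as_ref()
            .unwrap()
            .send(item)
            .map_err(|_| "send failed - channel closed".to_string())
    }

    /// Close the channel, join all workers, and report the first error.
    pub fn complete(mut self) -> Result<(), String> {
        drop(self.input.take()); // closing the channel lets the workers exit
        let mut panicked = 0;
        for handle in self.handles.drain(..) {
            if handle.join().is_err() {
                panicked += 1;
            }
        }
        let first_error = self.abort.lock().unwrap().take();
        if let Some(msg) = first_error {
            return Err(format!("handler failed: {msg}"));
        }
        if panicked > 0 {
            return Err(format!("{panicked} worker thread(s) panicked"));
        }
        Ok(())
    }
}
```

In this model, a caller that ignores `send` errors still sees the first handler error from `complete`, which mirrors the recommendation in the `Drop` documentation above to call `complete` explicitly when errors matter.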
Thread overview: 31+ messages
2026-03-12 13:52 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 01/26] sys: procfs: don't read from sysfs during unit tests Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 03/26] parallel-handler: introduce custom error type Lukas Wagner
2026-03-12 13:52 ` Lukas Wagner [this message]
2026-03-12 13:52 ` [PATCH proxmox 05/26] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 06/26] disks: import from Proxmox Backup Server Lukas Wagner
2026-03-16 13:13 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 07/26] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 08/26] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 09/26] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 10/26] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 11/26] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-16 13:25 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 12/26] time: use u64 parse helper from nom Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 14/26] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-16 13:27 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox-backup 15/26] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 16/26] node status panel: add `children` property Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 17/26] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 18/26] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 19/26] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 20/26] metric collection: fix minor typo in error message Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 22/26] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 23/26] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 24/26] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 25/26] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 26/26] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner
2026-03-16 13:42 ` [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Arthur Bied-Charreton