public inbox for pdm-devel@lists.proxmox.com
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox 04/26] parallel-handler: add documentation
Date: Thu, 12 Mar 2026 14:52:05 +0100	[thread overview]
Message-ID: <20260312135229.420729-5-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>

Add some module-level as well as function/type documentation.
---
 proxmox-parallel-handler/src/lib.rs | 120 ++++++++++++++++++++++++----
 1 file changed, 104 insertions(+), 16 deletions(-)

diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs
index 4c2ac118..38f6a48a 100644
--- a/proxmox-parallel-handler/src/lib.rs
+++ b/proxmox-parallel-handler/src/lib.rs
@@ -1,18 +1,56 @@
-//! A thread pool which run a closure in parallel.
+//! A thread pool that runs a closure in parallel across multiple worker threads.
+//!
+//! This crate provides [`ParallelHandler`], a simple thread pool that distributes work items of
+//! type `I` to a fixed number of worker threads, each executing the same handler closure. Work is
+//! submitted through a bounded [`crossbeam_channel`].
+//!
+//! If any worker's handler returns an error, the pool is marked as failed and subsequent
+//! [`send`](ParallelHandler::send) calls will return the first recorded error. After all items
+//! have been submitted, call [`complete`](ParallelHandler::complete) to join the worker threads
+//! and surface any errors (including thread panics).
+//!
+//! # Example
+//!
+//! ```
+//! use proxmox_parallel_handler::ParallelHandler;
+//!
+//! let pool = ParallelHandler::new("example", 4, |value: u64| {
+//!     println!("processing {value}");
+//!     Ok(())
+//! });
+//!
+//! for i in 0..100 {
+//!     pool.send(i)?;
+//! }
+//!
+//! pool.complete()?;
+//! # Ok::<(), proxmox_parallel_handler::Error>(())
+//! ```
 
 use std::sync::{Arc, Mutex};
 use std::thread::JoinHandle;
 
 use crossbeam_channel::{bounded, Sender};
 
+/// Errors returned by [`ParallelHandler`] and [`SendHandle`] operations.
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
+    /// The internal channel has been closed.
+    ///
+    /// This typically means the worker threads have already shut down, either because
+    /// [`ParallelHandler::complete`] was called or the pool was dropped.
     #[error("send failed - channel closed")]
     ChannelClosed,
 
+    /// A worker thread's handler closure returned an error.
+    ///
+    /// Contains the formatted error message from the first handler that failed.
+    /// Once a handler fails, all subsequent [`send`](SendHandle::send) calls will
+    /// return this error.
     #[error("handler failed: {0}")]
     HandlerFailed(String),
 
+    /// A worker thread panicked.
     #[error("thread {name} panicked")]
     ThreadPanicked {
         /// The name of the thread.
@@ -22,13 +60,17 @@ pub enum Error {
     },
 }
 
-/// A handle to send data to the worker thread (implements clone)
+/// A cloneable handle for sending work items to a [`ParallelHandler`]'s worker threads.
+///
+/// Obtained via [`ParallelHandler::channel`]. Multiple clones of the same `SendHandle` share the
+/// underlying channel and abort state, so they can be used from different threads or tasks to
+/// submit work concurrently.
 pub struct SendHandle<I> {
     input: Sender<I>,
     abort: Arc<Mutex<Option<String>>>,
 }
 
-/// Returns the first error happened, if any
+/// Returns the first error that occurred, if any.
 fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
     let guard = abort.lock().unwrap();
     if let Some(err_msg) = &*guard {
@@ -38,21 +80,35 @@ fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
 }
 
 impl<I: Send> SendHandle<I> {
-    /// Send data to the worker threads
+    /// Send a work item to the worker threads.
+    ///
+    /// The item is placed into the bounded channel and will be picked up by the next idle
+    /// worker. If all workers are busy, this call blocks until a worker becomes available.
+    ///
+    /// # Errors
+    ///
+    ///  - [`Error::HandlerFailed`] if any worker has already returned an error.
+    ///  - [`Error::ChannelClosed`] if the channel has been closed (e.g. the pool was dropped).
     pub fn send(&self, input: I) -> Result<(), Error> {
         check_abort(&self.abort)?;
         self.input.send(input).map_err(|_| Error::ChannelClosed)
     }
 }
 
-/// A thread pool which run the supplied closure
+/// A thread pool that runs the supplied closure on each work item in parallel.
 ///
-/// The send command sends data to the worker threads. If one handler
-/// returns an error, we mark the channel as failed and it is no
-/// longer possible to send data.
+/// `ParallelHandler` spawns a fixed number of worker threads at construction time. Each thread
+/// receives work items of type `I` through a shared bounded channel and processes them with a
+/// cloned copy of the handler closure.
 ///
-/// When done, the 'complete()' method needs to be called to check for
-/// outstanding errors.
+/// # Error handling
+///
+/// If any handler invocation returns an error, the pool records the first error message and
+/// enters a failed state. Subsequent [`send`](Self::send) calls will immediately return
+/// [`Error::HandlerFailed`] rather than enqueueing more work.
+///
+/// If the `ParallelHandler` is dropped without calling `complete`, the [`Drop`] implementation
+/// still joins all threads, but any errors are silently discarded.
 pub struct ParallelHandler<I> {
     handles: Vec<JoinHandle<()>>,
     input: Option<SendHandle<I>>,
@@ -68,8 +124,14 @@ impl<I> Clone for SendHandle<I> {
 }
 
 impl<I: Send + 'static> ParallelHandler<I> {
-    /// Create a new thread pool, each thread processing incoming data
-    /// with 'handler_fn'.
+    /// Create a new thread pool with `threads` workers, each processing incoming data with
+    /// `handler_fn`.
+    ///
+    /// # Parameters
+    ///
+    /// - `name` - A human-readable name used in thread names and error messages.
+    /// - `threads` - The number of worker threads to spawn.
+    /// - `handler_fn` - The closure invoked for every work item.
     pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
     where
         F: Fn(I) -> Result<(), anyhow::Error> + Send + Clone + 'static,
@@ -99,6 +161,8 @@ impl<I: Send + 'static> ParallelHandler<I> {
                             }
                         }
                     })
+                    // unwrap is fine, `spawn` only panics if a thread name with null bytes
+                    // was set
                     .unwrap(),
             );
         }
@@ -111,18 +175,39 @@ impl<I: Send + 'static> ParallelHandler<I> {
         }
     }
 
-    /// Returns a cloneable channel to send data to the worker threads
+    /// Returns a cloneable [`SendHandle`] that can be used to send work items to the worker
+    /// threads.
+    ///
+    /// This is useful when you need to send items from multiple threads or tasks concurrently.
+    /// Each clone of the returned handle shares the same underlying channel.
     pub fn channel(&self) -> SendHandle<I> {
+        // unwrap: fine as long as Self::complete has not been called yet. Since
+        // Self::complete takes self, this cannot happen for any of our callers.
         self.input.as_ref().unwrap().clone()
     }
 
-    /// Send data to the worker threads
+    /// Send a work item to the worker threads.
+    ///
+    /// Convenience wrapper around the internal [`SendHandle::send`]. Blocks if the bounded
+    /// channel is full (i.e. all workers are busy).
+    ///
+    /// # Errors
+    ///
+    ///  - [`Error::HandlerFailed`] if any worker has already returned an error.
+    ///  - [`Error::ChannelClosed`] if the channel has been closed.
     pub fn send(&self, input: I) -> Result<(), Error> {
+        // unwrap: fine as long as Self::complete has not been called yet. Since
+        // Self::complete takes self, this cannot happen for any of our callers.
         self.input.as_ref().unwrap().send(input)?;
         Ok(())
     }
 
-    /// Wait for worker threads to complete and check for errors
+    /// Close the channel, wait for all worker threads to finish, and check for errors.
+    ///
+    /// # Errors
+    ///
+    /// - [`Error::HandlerFailed`] if any handler returned an error.
+    /// - [`Error::ThreadPanicked`] if a worker thread panicked.
     pub fn complete(mut self) -> Result<(), Error> {
         let input = self.input.take().unwrap();
         let abort = Arc::clone(&input.abort);
@@ -172,7 +257,10 @@ impl<I: Send + 'static> ParallelHandler<I> {
     }
 }
 
-// Note: We make sure that all threads will be joined
+/// Dropping a `ParallelHandler` closes the channel and joins all worker threads.
+///
+/// Any errors that occurred in handler closures or thread panics are silently discarded.
+/// Prefer calling [`ParallelHandler::complete`] explicitly if you need to observe errors.
 impl<I> Drop for ParallelHandler<I> {
     fn drop(&mut self) {
         drop(self.input.take());
-- 
2.47.3


Thread overview: 31+ messages
2026-03-12 13:52 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 01/26] sys: procfs: don't read from sysfs during unit tests Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 03/26] parallel-handler: introduce custom error type Lukas Wagner
2026-03-12 13:52 ` Lukas Wagner [this message]
2026-03-12 13:52 ` [PATCH proxmox 05/26] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 06/26] disks: import from Proxmox Backup Server Lukas Wagner
2026-03-16 13:13   ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 07/26] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 08/26] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 09/26] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 10/26] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 11/26] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-16 13:25   ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 12/26] time: use u64 parse helper from nom Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-backup 14/26] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-16 13:27   ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox-backup 15/26] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 16/26] node status panel: add `children` property Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 17/26] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 18/26] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 19/26] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 20/26] metric collection: fix minor typo in error message Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 22/26] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 23/26] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 24/26] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 25/26] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 26/26] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner
2026-03-16 13:42 ` [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Arthur Bied-Charreton
