From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id D007A1FF13F for ; Thu, 12 Mar 2026 14:53:27 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4302B17808; Thu, 12 Mar 2026 14:53:16 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH proxmox 04/26] parallel-handler: add documentation Date: Thu, 12 Mar 2026 14:52:05 +0100 Message-ID: <20260312135229.420729-5-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com> References: <20260312135229.420729-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1773323522177 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.047 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_MSPIKE_H2 0.001 Average reputation (+2) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: HSWQNZ3DYLZX4UE56NN7E7CU274WFE4K X-Message-ID-Hash: HSWQNZ3DYLZX4UE56NN7E7CU274WFE4K X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Add some module-level as well as function/type documentation. 
--- proxmox-parallel-handler/src/lib.rs | 120 ++++++++++++++++++++++++---- 1 file changed, 104 insertions(+), 16 deletions(-) diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs index 4c2ac118..38f6a48a 100644 --- a/proxmox-parallel-handler/src/lib.rs +++ b/proxmox-parallel-handler/src/lib.rs @@ -1,18 +1,56 @@ -//! A thread pool which run a closure in parallel. +//! A thread pool that runs a closure in parallel across multiple worker threads. +//! +//! This crate provides [`ParallelHandler`], a simple thread pool that distributes work items of +//! type `I` to a fixed number of worker threads, each executing the same handler closure. Work is +//! submitted through a bounded [`crossbeam_channel`]. +//! +//! If any worker's handler returns an error, the pool is marked as failed and subsequent +//! [`send`](ParallelHandler::send) calls will return the first recorded error. After all items +//! have been submitted, call [`complete`](ParallelHandler::complete) to join the worker threads +//! and surface any errors (including thread panics). +//! +//! # Example +//! +//! ``` +//! use proxmox_parallel_handler::ParallelHandler; +//! +//! let pool = ParallelHandler::new("example", 4, |value: u64| { +//! println!("processing {value}"); +//! Ok(()) +//! }); +//! +//! for i in 0..100 { +//! pool.send(i)?; +//! } +//! +//! pool.complete()?; +//! # Ok::<(), proxmox_parallel_handler::Error>(()) +//! ``` use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use crossbeam_channel::{bounded, Sender}; +/// Errors returned by [`ParallelHandler`] and [`SendHandle`] operations. #[derive(Debug, thiserror::Error)] pub enum Error { + /// The internal channel has been closed. + /// + /// This typically means the worker threads have already shut down, either because + /// [`ParallelHandler::complete`] was called or the pool was dropped. #[error("send failed - channel closed")] ChannelClosed, + /// A worker thread's handler closure returned an error. 
+ /// + /// Contains the formatted error message from the first handler that failed. + /// Once a handler fails, all subsequent [`send`](SendHandle::send) calls will + /// return this error. #[error("handler failed: {0}")] HandlerFailed(String), + /// A worker thread panicked. #[error("thread {name} panicked")] ThreadPanicked { /// The name of the thread. @@ -22,13 +60,17 @@ pub enum Error { }, } -/// A handle to send data to the worker thread (implements clone) +/// A cloneable handle for sending work items to a [`ParallelHandler`]'s worker threads. +/// +/// Obtained via [`ParallelHandler::channel`]. Multiple clones of the same `SendHandle` share the +/// underlying channel and abort state, so they can be used from different threads or tasks to +/// submit work concurrently. pub struct SendHandle { input: Sender, abort: Arc>>, } -/// Returns the first error happened, if any +/// Returns the first error which happened, if any. fn check_abort(abort: &Mutex>) -> Result<(), Error> { let guard = abort.lock().unwrap(); if let Some(err_msg) = &*guard { @@ -38,21 +80,35 @@ fn check_abort(abort: &Mutex>) -> Result<(), Error> { } impl SendHandle { - /// Send data to the worker threads + /// Send a work item to the worker threads. + /// + /// The item is placed into the bounded channel and will be picked up by the next idle + /// worker. If all workers are busy, this call blocks until a worker becomes available. + /// + /// # Errors + /// + /// - [`Error::HandlerFailed`] if any worker has already returned an error + /// - [`Error::ChannelClosed`] if the channel has been closed (e.g. the pool was dropped). pub fn send(&self, input: I) -> Result<(), Error> { check_abort(&self.abort)?; self.input.send(input).map_err(|_| Error::ChannelClosed) } } -/// A thread pool which run the supplied closure +/// A thread pool that runs the supplied closure on each work item in parallel. /// -/// The send command sends data to the worker threads. 
If one handler -/// returns an error, we mark the channel as failed and it is no -/// longer possible to send data. +/// `ParallelHandler` spawns a fixed number of worker threads at construction time. Each thread +/// receives work items of type `I` through a shared bounded channel and processes them with a +/// cloned copy of the handler closure. /// -/// When done, the 'complete()' method needs to be called to check for -/// outstanding errors. +/// # Error handling +/// +/// If any handler invocation returns an error, the pool records the first error message and +/// enters a failed state. Subsequent [`send`](Self::send) calls will immediately return +/// [`Error::HandlerFailed`] rather than enqueueing more work. +/// +/// If the `ParallelHandler` is dropped without calling `complete`, the [`Drop`] implementation +/// still joins all threads, but any errors are silently discarded. pub struct ParallelHandler { handles: Vec>, input: Option>, @@ -68,8 +124,14 @@ impl Clone for SendHandle { } impl ParallelHandler { - /// Create a new thread pool, each thread processing incoming data - /// with 'handler_fn'. + /// Create a new thread pool with `threads` workers, each processing incoming data with + /// `handler_fn`. + /// + /// # Parameters + /// + /// - `name` - A human-readable name used in thread names and error messages. + /// - `threads` - The number of worker threads to spawn. + /// - `handler_fn` - The closure invoked for every work item. pub fn new(name: &str, threads: usize, handler_fn: F) -> Self where F: Fn(I) -> Result<(), anyhow::Error> + Send + Clone + 'static, @@ -99,6 +161,8 @@ impl ParallelHandler { } } }) + // unwrap is fine, `spawn` only panics if a thread name with null bytes is + // set .unwrap(), ); } @@ -111,18 +175,39 @@ impl ParallelHandler { } } - /// Returns a cloneable channel to send data to the worker threads + /// Returns a cloneable [`SendHandle`] that can be used to send work items to the worker + /// threads. 
+ /// + /// This is useful when you need to send items from multiple threads or tasks concurrently. + /// Each clone of the returned handle shares the same underlying channel. pub fn channel(&self) -> SendHandle { + // unwrap: fine as long as Self::complete has not been called yet. Since + // Self::complete takes self, this cannot happen for any of our callers. self.input.as_ref().unwrap().clone() } - /// Send data to the worker threads + /// Send a work item to the worker threads. + /// + /// Convenience wrapper around the internal [`SendHandle::send`]. Blocks if the bounded + /// channel is full (i.e. all workers are busy). + /// + /// # Errors + /// + /// - [`Error::HandlerFailed`] if any worker has already returned an error + /// - [`Error::ChannelClosed`] if the channel has been closed. pub fn send(&self, input: I) -> Result<(), Error> { + // unwrap: fine as long as Self::complete has not been called yet. Since + // Self::complete takes self, this cannot happen for any of our callers. self.input.as_ref().unwrap().send(input)?; Ok(()) } - /// Wait for worker threads to complete and check for errors + /// Close the channel, wait for all worker threads to finish, and check for errors. + /// + /// # Errors + /// + /// - [`Error::HandlerFailed`] - if any handler returned an error. + /// - [`Error::ThreadPanicked`] - if a worker thread panicked. pub fn complete(mut self) -> Result<(), Error> { let input = self.input.take().unwrap(); let abort = Arc::clone(&input.abort); @@ -172,7 +257,10 @@ impl ParallelHandler { } } -// Note: We make sure that all threads will be joined +/// Dropping a `ParallelHandler` closes the channel and joins all worker threads. +/// +/// Any errors that occurred in handler closures or thread panics are silently discarded. +/// Prefer calling [`ParallelHandler::complete`] explicitly if you need to observe errors. impl Drop for ParallelHandler { fn drop(&mut self) { drop(self.input.take()); -- 2.47.3