From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id D2DE81FF18C for ; Thu, 12 Mar 2026 14:53:25 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 2F3D317689; Thu, 12 Mar 2026 14:53:15 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH proxmox 03/26] parallel-handler: introduce custom error type Date: Thu, 12 Mar 2026 14:52:04 +0100 Message-ID: <20260312135229.420729-4-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com> References: <20260312135229.420729-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1773323522045 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.047 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_MSPIKE_H2 0.001 Average reputation (+2) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: PL5JBXGKKQ22EURVWD6CXKULUJIQ6EMD X-Message-ID-Hash: PL5JBXGKKQ22EURVWD6CXKULUJIQ6EMD X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Derive a custom error type using `thiserror`. 
The handler functions still return anyhow::Error, since switching them to the new error type would involve bigger changes in the callers. Signed-off-by: Lukas Wagner --- proxmox-parallel-handler/Cargo.toml | 1 + proxmox-parallel-handler/src/lib.rs | 67 +++++++++++++++++++---------- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/proxmox-parallel-handler/Cargo.toml b/proxmox-parallel-handler/Cargo.toml index e55e7c63..5fe67889 100644 --- a/proxmox-parallel-handler/Cargo.toml +++ b/proxmox-parallel-handler/Cargo.toml @@ -13,3 +13,4 @@ repository.workspace = true [dependencies] anyhow.workspace = true crossbeam-channel.workspace = true +thiserror.workspace = true diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs index 75eab184..4c2ac118 100644 --- a/proxmox-parallel-handler/src/lib.rs +++ b/proxmox-parallel-handler/src/lib.rs @@ -3,9 +3,25 @@ use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; -use anyhow::{bail, format_err, Error}; use crossbeam_channel::{bounded, Sender}; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("send failed - channel closed")] + ChannelClosed, + + #[error("handler failed: {0}")] + HandlerFailed(String), + + #[error("thread {name} panicked")] + ThreadPanicked { + /// The name of the thread. + name: String, + /// The panic message extracted from the panic payload. 
+ message: Option, + }, +} + /// A handle to send data to the worker thread (implements clone) pub struct SendHandle { input: Sender, @@ -16,7 +32,7 @@ pub struct SendHandle { fn check_abort(abort: &Mutex>) -> Result<(), Error> { let guard = abort.lock().unwrap(); if let Some(err_msg) = &*guard { - return Err(format_err!("{}", err_msg)); + return Err(Error::HandlerFailed(err_msg.clone())); } Ok(()) } @@ -25,10 +41,7 @@ impl SendHandle { /// Send data to the worker threads pub fn send(&self, input: I) -> Result<(), Error> { check_abort(&self.abort)?; - match self.input.send(input) { - Ok(()) => Ok(()), - Err(_) => bail!("send failed - channel closed"), - } + self.input.send(input).map_err(|_| Error::ChannelClosed) } } @@ -42,7 +55,6 @@ impl SendHandle { /// outstanding errors. pub struct ParallelHandler { handles: Vec>, - name: String, input: Option>, } @@ -60,7 +72,7 @@ impl ParallelHandler { /// with 'handler_fn'. pub fn new(name: &str, threads: usize, handler_fn: F) -> Self where - F: Fn(I) -> Result<(), Error> + Send + Clone + 'static, + F: Fn(I) -> Result<(), anyhow::Error> + Send + Clone + 'static, { let mut handles = Vec::new(); let (input_tx, input_rx) = bounded::(threads); @@ -83,7 +95,7 @@ impl ParallelHandler { if let Err(err) = (handler_fn)(data) { let mut guard = abort.lock().unwrap(); if guard.is_none() { - *guard = Some(err.to_string()); + *guard = Some(format!("{err:#}")); } } }) @@ -92,7 +104,6 @@ impl ParallelHandler { } Self { handles, - name: name.to_string(), input: Some(SendHandle { input: input_tx, abort, @@ -118,32 +129,44 @@ impl ParallelHandler { check_abort(&abort)?; drop(input); - let msg_list = self.join_threads(); + let mut msg_list = self.join_threads(); // an error might be encountered while waiting for the join check_abort(&abort)?; - if msg_list.is_empty() { - return Ok(()); + if let Some(e) = msg_list.pop() { + // Any error here is due to a thread panicking - let's just report the + // last panic that occurred. 
+ Err(e) + } else { + Ok(()) } - Err(format_err!("{}", msg_list.join("\n"))) } - fn join_threads(&mut self) -> Vec { + fn join_threads(&mut self) -> Vec { let mut msg_list = Vec::new(); - let mut i = 0; while let Some(handle) = self.handles.pop() { + let thread_name = handle.thread().name().unwrap_or("").to_string(); + if let Err(panic) = handle.join() { - if let Some(panic_msg) = panic.downcast_ref::<&str>() { - msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name)); - } else if let Some(panic_msg) = panic.downcast_ref::() { - msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name)); + if let Some(message) = panic.downcast_ref::<&str>() { + msg_list.push(Error::ThreadPanicked { + name: thread_name, + message: Some(message.to_string()), + }); + } else if let Some(message) = panic.downcast_ref::() { + msg_list.push(Error::ThreadPanicked { + name: thread_name, + message: Some(message.to_string()), + }); } else { - msg_list.push(format!("thread {} ({i}) panicked", self.name)); + msg_list.push(Error::ThreadPanicked { + name: thread_name, + message: None, + }); } } - i += 1; } msg_list } -- 2.47.3