From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id D0F641FF13F for ; Thu, 12 Mar 2026 14:53:34 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1F7FA178F9; Thu, 12 Mar 2026 14:53:27 +0100 (CET) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server Date: Thu, 12 Mar 2026 14:52:03 +0100 Message-ID: <20260312135229.420729-3-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com> References: <20260312135229.420729-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1773323521902 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.046 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL RCVD_IN_MSPIKE_H2 0.001 Average reputation (+2) SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 2NDY4TTEVHMCZVMXMJDYKG6HFWXCNU66 X-Message-ID-Hash: 2NDY4TTEVHMCZVMXMJDYKG6HFWXCNU66 X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Code is left unchanged, 
improvements will follow in followup commits. Signed-off-by: Lukas Wagner --- Cargo.toml | 2 + proxmox-parallel-handler/Cargo.toml | 15 ++ proxmox-parallel-handler/debian/changelog | 5 + proxmox-parallel-handler/debian/control | 36 ++++ proxmox-parallel-handler/debian/copyright | 18 ++ proxmox-parallel-handler/debian/debcargo.toml | 7 + proxmox-parallel-handler/src/lib.rs | 160 ++++++++++++++++++ 7 files changed, 243 insertions(+) create mode 100644 proxmox-parallel-handler/Cargo.toml create mode 100644 proxmox-parallel-handler/debian/changelog create mode 100644 proxmox-parallel-handler/debian/control create mode 100644 proxmox-parallel-handler/debian/copyright create mode 100644 proxmox-parallel-handler/debian/debcargo.toml create mode 100644 proxmox-parallel-handler/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1cb5f09e..97593a5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "proxmox-notify", "proxmox-oci", "proxmox-openid", + "proxmox-parallel-handler", "proxmox-pgp", "proxmox-product-config", "proxmox-rate-limiter", @@ -162,6 +163,7 @@ proxmox-lang = { version = "1.5", path = "proxmox-lang" } proxmox-log = { version = "1.0.0", path = "proxmox-log" } proxmox-login = { version = "1.0.0", path = "proxmox-login" } proxmox-network-types = { version = "1.0.0", path = "proxmox-network-types" } +proxmox-parallel-handler = { version = "1.0.0", path = "proxmox-parallel-handler" } proxmox-pgp = { version = "1.0.0", path = "proxmox-pgp" } proxmox-product-config = { version = "1.0.0", path = "proxmox-product-config" } proxmox-config-digest = { version = "1.0.0", path = "proxmox-config-digest" } diff --git a/proxmox-parallel-handler/Cargo.toml b/proxmox-parallel-handler/Cargo.toml new file mode 100644 index 00000000..e55e7c63 --- /dev/null +++ b/proxmox-parallel-handler/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "proxmox-parallel-handler" +description = "thread pool which runs a closure in parallel" +version = "1.0.0" + 
+authors.workspace = true +edition.workspace = true +exclude.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +crossbeam-channel.workspace = true diff --git a/proxmox-parallel-handler/debian/changelog b/proxmox-parallel-handler/debian/changelog new file mode 100644 index 00000000..df00d3f6 --- /dev/null +++ b/proxmox-parallel-handler/debian/changelog @@ -0,0 +1,5 @@ +rust-proxmox-parallel-handler (1.0.0-1) unstable; urgency=medium + + * initial version -- imported from proxmox-backup + + -- Proxmox Support Team Thu, 26 Feb 2026 15:54:07 +0100 diff --git a/proxmox-parallel-handler/debian/control b/proxmox-parallel-handler/debian/control new file mode 100644 index 00000000..ff356562 --- /dev/null +++ b/proxmox-parallel-handler/debian/control @@ -0,0 +1,36 @@ +Source: rust-proxmox-parallel-handler +Section: rust +Priority: optional +Build-Depends: debhelper-compat (= 13), + dh-sequence-cargo +Build-Depends-Arch: cargo:native , + rustc:native , + libstd-rust-dev , + librust-anyhow-1+default-dev , + librust-crossbeam-channel-0.5+default-dev , + librust-thiserror-2+default-dev +Maintainer: Proxmox Support Team +Standards-Version: 4.7.2 +Vcs-Git: git://git.proxmox.com/git/proxmox.git +Vcs-Browser: https://git.proxmox.com/?p=proxmox.git +Homepage: https://proxmox.com +X-Cargo-Crate: proxmox-parallel-handler + +Package: librust-proxmox-parallel-handler-dev +Architecture: any +Multi-Arch: same +Depends: + ${misc:Depends}, + librust-anyhow-1+default-dev, + librust-crossbeam-channel-0.5+default-dev, + librust-thiserror-2+default-dev +Provides: + librust-proxmox-parallel-handler+default-dev (= ${binary:Version}), + librust-proxmox-parallel-handler-1-dev (= ${binary:Version}), + librust-proxmox-parallel-handler-1+default-dev (= ${binary:Version}), + librust-proxmox-parallel-handler-1.0-dev (= ${binary:Version}), + librust-proxmox-parallel-handler-1.0+default-dev (= ${binary:Version}), + 
librust-proxmox-parallel-handler-1.0.0-dev (= ${binary:Version}), + librust-proxmox-parallel-handler-1.0.0+default-dev (= ${binary:Version}) +Description: Thread pool which runs a closure in parallel - Rust source code + Source code for Debianized Rust crate "proxmox-parallel-handler" diff --git a/proxmox-parallel-handler/debian/copyright b/proxmox-parallel-handler/debian/copyright new file mode 100644 index 00000000..01138fa0 --- /dev/null +++ b/proxmox-parallel-handler/debian/copyright @@ -0,0 +1,18 @@ +Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ + +Files: + * +Copyright: 2026 Proxmox Server Solutions GmbH +License: AGPL-3.0-or-later + This program is free software: you can redistribute it and/or modify it under + the terms of the GNU Affero General Public License as published by the Free + Software Foundation, either version 3 of the License, or (at your option) any + later version. + . + This program is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more + details. + . + You should have received a copy of the GNU Affero General Public License along + with this program. If not, see <https://www.gnu.org/licenses/>. diff --git a/proxmox-parallel-handler/debian/debcargo.toml b/proxmox-parallel-handler/debian/debcargo.toml new file mode 100644 index 00000000..b7864cdb --- /dev/null +++ b/proxmox-parallel-handler/debian/debcargo.toml @@ -0,0 +1,7 @@ +overlay = "." +crate_src_path = ".." +maintainer = "Proxmox Support Team " + +[source] +vcs_git = "git://git.proxmox.com/git/proxmox.git" +vcs_browser = "https://git.proxmox.com/?p=proxmox.git" diff --git a/proxmox-parallel-handler/src/lib.rs b/proxmox-parallel-handler/src/lib.rs new file mode 100644 index 00000000..75eab184 --- /dev/null +++ b/proxmox-parallel-handler/src/lib.rs @@ -0,0 +1,160 @@ +//! A thread pool which runs a closure in parallel.
+
+use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
+
+use anyhow::{bail, format_err, Error};
+use crossbeam_channel::{bounded, Sender};
+
+/// A cloneable handle for sending work items to the worker threads.
+///
+/// All clones share the same channel and the same abort state.
+pub struct SendHandle<I> {
+    /// Sending half of the bounded work queue.
+    input: Sender<I>,
+    /// Message of the first handler error; `None` while no handler has failed.
+    abort: Arc<Mutex<Option<String>>>,
+}
+
+/// Returns the first recorded handler error, if any.
+///
+/// # Panics
+///
+/// Panics if the `abort` mutex is poisoned.
+fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
+    let guard = abort.lock().unwrap();
+    if let Some(err_msg) = &*guard {
+        return Err(format_err!("{}", err_msg));
+    }
+    Ok(())
+}
+
+impl<I> SendHandle<I> {
+    /// Send one work item to the worker threads.
+    ///
+    /// # Errors
+    ///
+    /// Fails with the first recorded handler error if a handler has already
+    /// failed, or with "send failed - channel closed" if the channel no
+    /// longer accepts data.
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        check_abort(&self.abort)?;
+        match self.input.send(input) {
+            Ok(()) => Ok(()),
+            Err(_) => bail!("send failed - channel closed"),
+        }
+    }
+}
+
+/// A thread pool which runs the supplied closure
+///
+/// The send command sends data to the worker threads. If one handler
+/// returns an error, we mark the channel as failed and it is no
+/// longer possible to send data.
+///
+/// When done, the 'complete()' method needs to be called to check for
+/// outstanding errors.
+pub struct ParallelHandler<I> {
+    /// Join handles of the workers, stored in spawn order.
+    handles: Vec<JoinHandle<()>>,
+    /// Base name of the pool, used for thread names and panic reports.
+    name: String,
+    /// Our own sender; taken (set to `None`) by `complete()` and `drop()`.
+    input: Option<SendHandle<I>>,
+}
+
+impl<I> Clone for SendHandle<I> {
+    fn clone(&self) -> Self {
+        Self {
+            input: self.input.clone(),
+            abort: Arc::clone(&self.abort),
+        }
+    }
+}
+
+impl<I: Send + 'static> ParallelHandler<I> {
+    /// Create a new thread pool, each thread processing incoming data
+    /// with 'handler_fn'.
+    ///
+    /// `threads` is both the number of workers and the capacity of the
+    /// bounded input queue. Worker `i` is named `"{name} ({i})"` and gets
+    /// its own clone of `handler_fn`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a worker thread cannot be spawned.
+    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    where
+        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
+    {
+        let mut handles = Vec::with_capacity(threads);
+        let (input_tx, input_rx) = bounded::<I>(threads);
+
+        let abort = Arc::new(Mutex::new(None));
+
+        for i in 0..threads {
+            let input_rx = input_rx.clone();
+            let abort = Arc::clone(&abort);
+            let handler_fn = handler_fn.clone();
+
+            handles.push(
+                std::thread::Builder::new()
+                    .name(format!("{name} ({i})"))
+                    .spawn(move || loop {
+                        // `recv()` fails once the channel is closed; that
+                        // is the worker's signal to exit.
+                        let data = match input_rx.recv() {
+                            Ok(data) => data,
+                            Err(_) => return,
+                        };
+                        if let Err(err) = (handler_fn)(data) {
+                            // Only the first error is kept. The worker keeps
+                            // receiving so the queue still gets drained.
+                            let mut guard = abort.lock().unwrap();
+                            if guard.is_none() {
+                                *guard = Some(err.to_string());
+                            }
+                        }
+                    })
+                    .unwrap(),
+            );
+        }
+        Self {
+            handles,
+            name: name.to_string(),
+            input: Some(SendHandle {
+                input: input_tx,
+                abort,
+            }),
+        }
+    }
+
+    /// Returns a cloneable channel to send data to the worker threads
+    ///
+    /// Workers only exit once the channel is closed, so clones handed out
+    /// here should be dropped before `complete()` is expected to return.
+    pub fn channel(&self) -> SendHandle<I> {
+        self.input.as_ref().unwrap().clone()
+    }
+
+    /// Send data to the worker threads
+    ///
+    /// # Errors
+    ///
+    /// Same as [`SendHandle::send`].
+    pub fn send(&self, input: I) -> Result<(), Error> {
+        self.input.as_ref().unwrap().send(input)
+    }
+
+    /// Wait for worker threads to complete and check for errors
+    ///
+    /// # Errors
+    ///
+    /// Returns the first handler error if one was recorded; otherwise, if
+    /// any worker panicked, an error listing one line per panicked thread.
+    pub fn complete(mut self) -> Result<(), Error> {
+        let input = self.input.take().unwrap();
+        let abort = Arc::clone(&input.abort);
+        // On this early return the workers are still joined by `Drop`.
+        check_abort(&abort)?;
+        // Release our sender so the workers can see the channel close.
+        drop(input);
+
+        let msg_list = self.join_threads();
+
+        // an error might be encountered while waiting for the join
+        check_abort(&abort)?;
+
+        if msg_list.is_empty() {
+            return Ok(());
+        }
+        Err(format_err!("{}", msg_list.join("\n")))
+    }
+
+    /// Join all workers and return one message per panicked thread.
+    ///
+    /// Handles are drained in spawn order so that the reported index is the
+    /// same `i` that `new()` put into the thread name.
+    fn join_threads(&mut self) -> Vec<String> {
+        let name = &self.name;
+        self.handles
+            .drain(..)
+            .enumerate()
+            .filter_map(|(i, handle)| {
+                let payload = handle.join().err()?;
+                // Panic payloads are usually a `&str` or a `String`.
+                let detail = payload
+                    .downcast_ref::<&str>()
+                    .copied()
+                    .or_else(|| payload.downcast_ref::<String>().map(String::as_str));
+                Some(match detail {
+                    Some(panic_msg) => format!("thread {name} ({i}) panicked: {panic_msg}"),
+                    None => format!("thread {name} ({i}) panicked"),
+                })
+            })
+            .collect()
+    }
+}
+
+// Note: We make sure that all threads will be joined
+impl<I> Drop for ParallelHandler<I> {
+    fn drop(&mut self) {
+        // Release our sender first; the workers only exit once the channel
+        // is closed, and joining before that would wait on them forever.
+        drop(self.input.take());
+        while let Some(handle) = self.handles.pop() {
+            let _ = handle.join();
+        }
+    }
+}
-- 
2.47.3