From mboxrd@z Thu Jan  1 00:00:00 1970
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate
Date: Thu, 12 Mar 2026 14:52:14 +0100
Message-ID: <20260312135229.420729-14-l.wagner@proxmox.com>
X-Mailer: git-send-email 2.47.3
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>
References: <20260312135229.420729-1-l.wagner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
List-Id: Proxmox Datacenter Manager development discussion

The proxmox-disks crate, which is also being extracted, requires the
ParallelHandler helper, so ParallelHandler has to be moved into its
own crate as well.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
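Not part of the patch itself, just for reviewers' orientation: consumers
only need their import switched over to the new crate, the
ParallelHandler API itself is unchanged. A minimal usage sketch (the
pool name and item type below are made up for illustration):

    use anyhow::Error;
    use proxmox_parallel_handler::ParallelHandler;

    fn example() -> Result<(), Error> {
        // Four worker threads, each running a clone of the closure on
        // items received over the bounded channel.
        let pool = ParallelHandler::new("example workers", 4, |chunk: Vec<u8>| {
            println!("got {} bytes", chunk.len());
            Ok(()) // returning Err marks the channel as failed
        });

        for _ in 0..16 {
            // send() bails out once any handler invocation has errored
            pool.send(vec![0u8; 4096])?;
        }

        // Join all workers, surfacing handler errors and thread panics.
        pool.complete()
    }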

 Cargo.toml                                  |   3 +
 src/api2/tape/restore.rs                    |  25 ++-
 src/backup/verify.rs                        |   3 +-
 src/server/pull.rs                          |   5 +-
 src/tape/pool_writer/new_chunks_iterator.rs |   3 +-
 src/tools/disks/mod.rs                      |   2 +-
 src/tools/mod.rs                            |   2 -
 src/tools/parallel_handler.rs               | 160 --------------------
 8 files changed, 21 insertions(+), 182 deletions(-)
 delete mode 100644 src/tools/parallel_handler.rs

diff --git a/Cargo.toml b/Cargo.toml
index cf993f514..57f6aa88e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -75,6 +75,7 @@ proxmox-network-api = "1"
 proxmox-network-types = "1.0.1"
 proxmox-notify = "1"
 proxmox-openid = "1"
+proxmox-parallel-handler = "1"
 proxmox-product-config = "1"
 proxmox-rate-limiter = "1.0.0"
 proxmox-rest-server = { version = "1.0.5", features = [ "templates" ] }
@@ -235,6 +236,7 @@ proxmox-network-types.workspace = true
 proxmox-notify = { workspace = true, features = [ "pbs-context" ] }
 proxmox-openid.workspace = true
 proxmox-product-config.workspace = true
+proxmox-parallel-handler.workspace = true
 proxmox-rate-limiter = { workspace = true, features = [ "shared-rate-limiter" ] }
 proxmox-rest-server = { workspace = true, features = [ "rate-limited-stream" ] }
 proxmox-router = { workspace = true, features = [ "cli", "server"] }
@@ -300,6 +302,7 @@ proxmox-rrd-api-types.workspace = true
 #proxmox-network-types = { path = "../proxmox/proxmox-network-types" }
 #proxmox-notify = { path = "../proxmox/proxmox-notify" }
 #proxmox-openid = { path = "../proxmox/proxmox-openid" }
+#proxmox-parallel-handler = { path = "../proxmox/proxmox-parallel-handler" }
 #proxmox-product-config = { path = "../proxmox/proxmox-product-config" }
 #proxmox-rate-limiter = { path = "../proxmox/proxmox-rate-limiter" }
 #proxmox-rest-server = { path = "../proxmox/proxmox-rest-server" }
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 4f2ee3db6..011037216 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -10,6 +10,7 @@ use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
 use proxmox_io::ReadExt;
+use proxmox_parallel_handler::ParallelHandler;
 use proxmox_rest_server::WorkerTask;
 use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::{api, ApiType};
@@ -38,21 +39,17 @@ use pbs_tape::{
 
 use crate::backup::check_ns_modification_privs;
 use crate::tape::{assert_datastore_type, TapeNotificationMode};
-use crate::{
-    tape::{
-        drive::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver},
-        file_formats::{
-            CatalogArchiveHeader, ChunkArchiveDecoder, ChunkArchiveHeader, SnapshotArchiveHeader,
-            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
-            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
-            PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0, PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
-            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
-            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_2,
-        },
-        lock_media_set, Inventory, MediaCatalog, MediaId, MediaSet, MediaSetCatalog,
-        TAPE_STATUS_DIR,
+use crate::tape::{
+    drive::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver},
+    file_formats::{
+        CatalogArchiveHeader, ChunkArchiveDecoder, ChunkArchiveHeader, SnapshotArchiveHeader,
+        PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
+        PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
+        PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0, PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
+        PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
+        PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_2,
     },
-    tools::parallel_handler::ParallelHandler,
+    lock_media_set, Inventory, MediaCatalog, MediaId, MediaSet, MediaSetCatalog, TAPE_STATUS_DIR,
 };
 
 struct NamespaceMap {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index f52d77815..9e24d254d 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -8,6 +8,7 @@ use anyhow::{bail, Error};
 use http_body_util::BodyExt;
 use tracing::{error, info, warn};
 
+use proxmox_parallel_handler::{ParallelHandler, SendHandle};
 use proxmox_worker_task::WorkerTaskContext;
 
 use pbs_api_types::{
@@ -20,8 +21,6 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
 
-use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
-
 use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 57c5ef323..edc5e563d 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -8,9 +8,11 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Context, Error};
-use proxmox_human_byte::HumanByte;
 use tracing::{info, warn};
 
+use proxmox_human_byte::HumanByte;
+use proxmox_parallel_handler::ParallelHandler;
+
 use pbs_api_types::{
     print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
     BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, SnapshotListItem,
@@ -34,7 +36,6 @@ use super::sync::{
     SkipReason, SyncSource, SyncSourceReader, SyncStats,
 };
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
-use crate::tools::parallel_handler::ParallelHandler;
 
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 0e29516f8..f077d823f 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,10 +3,11 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{format_err, Error};
 
+use proxmox_parallel_handler::ParallelHandler;
+
 use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
 
 use crate::tape::CatalogSet;
-use crate::tools::parallel_handler::ParallelHandler;
 
 /// Chunk iterator which uses separate threads to read chunks
 ///
diff --git a/src/tools/disks/mod.rs b/src/tools/disks/mod.rs
index a86cbdf79..4197d0b0f 100644
--- a/src/tools/disks/mod.rs
+++ b/src/tools/disks/mod.rs
@@ -21,7 +21,7 @@ use proxmox_sys::linux::procfs::{mountinfo::Device, MountInfo};
 
 use pbs_api_types::{BLOCKDEVICE_DISK_AND_PARTITION_NAME_REGEX, BLOCKDEVICE_NAME_REGEX};
 
-use crate::tools::parallel_handler::ParallelHandler;
+use proxmox_parallel_handler::ParallelHandler;
 
 mod zfs;
 pub use zfs::*;
diff --git a/src/tools/mod.rs b/src/tools/mod.rs
index 6a975bde2..7f5acc0e3 100644
--- a/src/tools/mod.rs
+++ b/src/tools/mod.rs
@@ -20,8 +20,6 @@ pub mod statistics;
 pub mod systemd;
 pub mod ticket;
 
-pub mod parallel_handler;
-
 pub fn assert_if_modified(digest1: &str, digest2: &str) -> Result<(), Error> {
     if digest1 != digest2 {
         bail!("detected modified configuration - file changed by other user? Try again.");
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
deleted file mode 100644
index 81e83bb13..000000000
--- a/src/tools/parallel_handler.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! A thread pool which run a closure in parallel.
-
-use std::sync::{Arc, Mutex};
-use std::thread::JoinHandle;
-
-use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
-
-/// A handle to send data to the worker thread (implements clone)
-pub struct SendHandle<I> {
-    input: Sender<I>,
-    abort: Arc<Mutex<Option<String>>>,
-}
-
-/// Returns the first error happened, if any
-pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
-    let guard = abort.lock().unwrap();
-    if let Some(err_msg) = &*guard {
-        return Err(format_err!("{}", err_msg));
-    }
-    Ok(())
-}
-
-impl<I: Send> SendHandle<I> {
-    /// Send data to the worker threads
-    pub fn send(&self, input: I) -> Result<(), Error> {
-        check_abort(&self.abort)?;
-        match self.input.send(input) {
-            Ok(()) => Ok(()),
-            Err(_) => bail!("send failed - channel closed"),
-        }
-    }
-}
-
-/// A thread pool which run the supplied closure
-///
-/// The send command sends data to the worker threads. If one handler
-/// returns an error, we mark the channel as failed and it is no
-/// longer possible to send data.
-///
-/// When done, the 'complete()' method needs to be called to check for
-/// outstanding errors.
-pub struct ParallelHandler<I> {
-    handles: Vec<JoinHandle<()>>,
-    name: String,
-    input: Option<SendHandle<I>>,
-}
-
-impl<I> Clone for SendHandle<I> {
-    fn clone(&self) -> Self {
-        Self {
-            input: self.input.clone(),
-            abort: Arc::clone(&self.abort),
-        }
-    }
-}
-
-impl<I: Send + 'static> ParallelHandler<I> {
-    /// Create a new thread pool, each thread processing incoming data
-    /// with 'handler_fn'.
-    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
-    where
-        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
-    {
-        let mut handles = Vec::new();
-        let (input_tx, input_rx) = bounded::<I>(threads);
-
-        let abort = Arc::new(Mutex::new(None));
-
-        for i in 0..threads {
-            let input_rx = input_rx.clone();
-            let abort = Arc::clone(&abort);
-            let handler_fn = handler_fn.clone();
-
-            handles.push(
-                std::thread::Builder::new()
-                    .name(format!("{name} ({i})"))
-                    .spawn(move || loop {
-                        let data = match input_rx.recv() {
-                            Ok(data) => data,
-                            Err(_) => return,
-                        };
-                        if let Err(err) = (handler_fn)(data) {
-                            let mut guard = abort.lock().unwrap();
-                            if guard.is_none() {
-                                *guard = Some(err.to_string());
-                            }
-                        }
-                    })
-                    .unwrap(),
-            );
-        }
-        Self {
-            handles,
-            name: name.to_string(),
-            input: Some(SendHandle {
-                input: input_tx,
-                abort,
-            }),
-        }
-    }
-
-    /// Returns a cloneable channel to send data to the worker threads
-    pub fn channel(&self) -> SendHandle<I> {
-        self.input.as_ref().unwrap().clone()
-    }
-
-    /// Send data to the worker threads
-    pub fn send(&self, input: I) -> Result<(), Error> {
-        self.input.as_ref().unwrap().send(input)?;
-        Ok(())
-    }
-
-    /// Wait for worker threads to complete and check for errors
-    pub fn complete(mut self) -> Result<(), Error> {
-        let input = self.input.take().unwrap();
-        let abort = Arc::clone(&input.abort);
-        check_abort(&abort)?;
-        drop(input);
-
-        let msg_list = self.join_threads();
-
-        // an error might be encountered while waiting for the join
-        check_abort(&abort)?;
-
-        if msg_list.is_empty() {
-            return Ok(());
-        }
-        Err(format_err!("{}", msg_list.join("\n")))
-    }
-
-    fn join_threads(&mut self) -> Vec<String> {
-        let mut msg_list = Vec::new();
-
-        let mut i = 0;
-        while let Some(handle) = self.handles.pop() {
-            if let Err(panic) = handle.join() {
-                if let Some(panic_msg) = panic.downcast_ref::<&str>() {
-                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
-                } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
-                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
-                } else {
-                    msg_list.push(format!("thread {} ({i}) panicked", self.name));
-                }
-            }
-            i += 1;
-        }
-        msg_list
-    }
-}
-
-// Note: We make sure that all threads will be joined
-impl<I> Drop for ParallelHandler<I> {
-    fn drop(&mut self) {
-        drop(self.input.take());
-        while let Some(handle) = self.handles.pop() {
-            let _ = handle.join();
-        }
-    }
-}
-- 
2.47.3