From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate
Date: Thu, 12 Mar 2026 14:52:14 +0100 [thread overview]
Message-ID: <20260312135229.420729-14-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>
The proxmox-disks crate, which is also being extracted, requires the
ParallelHandler helper, so we need to extract it as well.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Cargo.toml | 3 +
src/api2/tape/restore.rs | 25 ++-
src/backup/verify.rs | 3 +-
src/server/pull.rs | 5 +-
src/tape/pool_writer/new_chunks_iterator.rs | 3 +-
src/tools/disks/mod.rs | 2 +-
src/tools/mod.rs | 2 -
src/tools/parallel_handler.rs | 160 --------------------
8 files changed, 21 insertions(+), 182 deletions(-)
delete mode 100644 src/tools/parallel_handler.rs
diff --git a/Cargo.toml b/Cargo.toml
index cf993f514..57f6aa88e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -75,6 +75,7 @@ proxmox-network-api = "1"
proxmox-network-types = "1.0.1"
proxmox-notify = "1"
proxmox-openid = "1"
+proxmox-parallel-handler = "1"
proxmox-product-config = "1"
proxmox-rate-limiter = "1.0.0"
proxmox-rest-server = { version = "1.0.5", features = [ "templates" ] }
@@ -235,6 +236,7 @@ proxmox-network-types.workspace = true
proxmox-notify = { workspace = true, features = [ "pbs-context" ] }
proxmox-openid.workspace = true
proxmox-product-config.workspace = true
+proxmox-parallel-handler.workspace = true
proxmox-rate-limiter = { workspace = true, features = [ "shared-rate-limiter" ] }
proxmox-rest-server = { workspace = true, features = [ "rate-limited-stream" ] }
proxmox-router = { workspace = true, features = [ "cli", "server"] }
@@ -300,6 +302,7 @@ proxmox-rrd-api-types.workspace = true
#proxmox-network-types = { path = "../proxmox/proxmox-network-types" }
#proxmox-notify = { path = "../proxmox/proxmox-notify" }
#proxmox-openid = { path = "../proxmox/proxmox-openid" }
+#proxmox-parallel-handler = { path = "../proxmox/proxmox-parallel-handler" }
#proxmox-product-config = { path = "../proxmox/proxmox-product-config" }
#proxmox-rate-limiter = { path = "../proxmox/proxmox-rate-limiter" }
#proxmox-rest-server = { path = "../proxmox/proxmox-rest-server" }
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 4f2ee3db6..011037216 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -10,6 +10,7 @@ use tracing::{info, warn};
use proxmox_human_byte::HumanByte;
use proxmox_io::ReadExt;
+use proxmox_parallel_handler::ParallelHandler;
use proxmox_rest_server::WorkerTask;
use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
use proxmox_schema::{api, ApiType};
@@ -38,21 +39,17 @@ use pbs_tape::{
use crate::backup::check_ns_modification_privs;
use crate::tape::{assert_datastore_type, TapeNotificationMode};
-use crate::{
- tape::{
- drive::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver},
- file_formats::{
- CatalogArchiveHeader, ChunkArchiveDecoder, ChunkArchiveHeader, SnapshotArchiveHeader,
- PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
- PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
- PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0, PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
- PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
- PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_2,
- },
- lock_media_set, Inventory, MediaCatalog, MediaId, MediaSet, MediaSetCatalog,
- TAPE_STATUS_DIR,
+use crate::tape::{
+ drive::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver},
+ file_formats::{
+ CatalogArchiveHeader, ChunkArchiveDecoder, ChunkArchiveHeader, SnapshotArchiveHeader,
+ PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
+ PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
+ PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0, PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
+ PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
+ PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_2,
},
- tools::parallel_handler::ParallelHandler,
+ lock_media_set, Inventory, MediaCatalog, MediaId, MediaSet, MediaSetCatalog, TAPE_STATUS_DIR,
};
struct NamespaceMap {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index f52d77815..9e24d254d 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -8,6 +8,7 @@ use anyhow::{bail, Error};
use http_body_util::BodyExt;
use tracing::{error, info, warn};
+use proxmox_parallel_handler::{ParallelHandler, SendHandle};
use proxmox_worker_task::WorkerTaskContext;
use pbs_api_types::{
@@ -20,8 +21,6 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
-use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
-
use crate::backup::hierarchy::ListAccessibleBackupGroups;
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 57c5ef323..edc5e563d 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -8,9 +8,11 @@ use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use anyhow::{bail, format_err, Context, Error};
-use proxmox_human_byte::HumanByte;
use tracing::{info, warn};
+use proxmox_human_byte::HumanByte;
+use proxmox_parallel_handler::ParallelHandler;
+
use pbs_api_types::{
print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, SnapshotListItem,
@@ -34,7 +36,6 @@ use super::sync::{
SkipReason, SyncSource, SyncSourceReader, SyncStats,
};
use crate::backup::{check_ns_modification_privs, check_ns_privs};
-use crate::tools::parallel_handler::ParallelHandler;
pub(crate) struct PullTarget {
store: Arc<DataStore>,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 0e29516f8..f077d823f 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,10 +3,11 @@ use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};
+use proxmox_parallel_handler::ParallelHandler;
+
use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
use crate::tape::CatalogSet;
-use crate::tools::parallel_handler::ParallelHandler;
/// Chunk iterator which uses separate threads to read chunks
///
diff --git a/src/tools/disks/mod.rs b/src/tools/disks/mod.rs
index a86cbdf79..4197d0b0f 100644
--- a/src/tools/disks/mod.rs
+++ b/src/tools/disks/mod.rs
@@ -21,7 +21,7 @@ use proxmox_sys::linux::procfs::{mountinfo::Device, MountInfo};
use pbs_api_types::{BLOCKDEVICE_DISK_AND_PARTITION_NAME_REGEX, BLOCKDEVICE_NAME_REGEX};
-use crate::tools::parallel_handler::ParallelHandler;
+use proxmox_parallel_handler::ParallelHandler;
mod zfs;
pub use zfs::*;
diff --git a/src/tools/mod.rs b/src/tools/mod.rs
index 6a975bde2..7f5acc0e3 100644
--- a/src/tools/mod.rs
+++ b/src/tools/mod.rs
@@ -20,8 +20,6 @@ pub mod statistics;
pub mod systemd;
pub mod ticket;
-pub mod parallel_handler;
-
pub fn assert_if_modified(digest1: &str, digest2: &str) -> Result<(), Error> {
if digest1 != digest2 {
bail!("detected modified configuration - file changed by other user? Try again.");
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
deleted file mode 100644
index 81e83bb13..000000000
--- a/src/tools/parallel_handler.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! A thread pool which run a closure in parallel.
-
-use std::sync::{Arc, Mutex};
-use std::thread::JoinHandle;
-
-use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
-
-/// A handle to send data to the worker thread (implements clone)
-pub struct SendHandle<I> {
- input: Sender<I>,
- abort: Arc<Mutex<Option<String>>>,
-}
-
-/// Returns the first error happened, if any
-pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
- let guard = abort.lock().unwrap();
- if let Some(err_msg) = &*guard {
- return Err(format_err!("{}", err_msg));
- }
- Ok(())
-}
-
-impl<I: Send> SendHandle<I> {
- /// Send data to the worker threads
- pub fn send(&self, input: I) -> Result<(), Error> {
- check_abort(&self.abort)?;
- match self.input.send(input) {
- Ok(()) => Ok(()),
- Err(_) => bail!("send failed - channel closed"),
- }
- }
-}
-
-/// A thread pool which run the supplied closure
-///
-/// The send command sends data to the worker threads. If one handler
-/// returns an error, we mark the channel as failed and it is no
-/// longer possible to send data.
-///
-/// When done, the 'complete()' method needs to be called to check for
-/// outstanding errors.
-pub struct ParallelHandler<I> {
- handles: Vec<JoinHandle<()>>,
- name: String,
- input: Option<SendHandle<I>>,
-}
-
-impl<I> Clone for SendHandle<I> {
- fn clone(&self) -> Self {
- Self {
- input: self.input.clone(),
- abort: Arc::clone(&self.abort),
- }
- }
-}
-
-impl<I: Send + 'static> ParallelHandler<I> {
- /// Create a new thread pool, each thread processing incoming data
- /// with 'handler_fn'.
- pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
- where
- F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
- {
- let mut handles = Vec::new();
- let (input_tx, input_rx) = bounded::<I>(threads);
-
- let abort = Arc::new(Mutex::new(None));
-
- for i in 0..threads {
- let input_rx = input_rx.clone();
- let abort = Arc::clone(&abort);
- let handler_fn = handler_fn.clone();
-
- handles.push(
- std::thread::Builder::new()
- .name(format!("{name} ({i})"))
- .spawn(move || loop {
- let data = match input_rx.recv() {
- Ok(data) => data,
- Err(_) => return,
- };
- if let Err(err) = (handler_fn)(data) {
- let mut guard = abort.lock().unwrap();
- if guard.is_none() {
- *guard = Some(err.to_string());
- }
- }
- })
- .unwrap(),
- );
- }
- Self {
- handles,
- name: name.to_string(),
- input: Some(SendHandle {
- input: input_tx,
- abort,
- }),
- }
- }
-
- /// Returns a cloneable channel to send data to the worker threads
- pub fn channel(&self) -> SendHandle<I> {
- self.input.as_ref().unwrap().clone()
- }
-
- /// Send data to the worker threads
- pub fn send(&self, input: I) -> Result<(), Error> {
- self.input.as_ref().unwrap().send(input)?;
- Ok(())
- }
-
- /// Wait for worker threads to complete and check for errors
- pub fn complete(mut self) -> Result<(), Error> {
- let input = self.input.take().unwrap();
- let abort = Arc::clone(&input.abort);
- check_abort(&abort)?;
- drop(input);
-
- let msg_list = self.join_threads();
-
- // an error might be encountered while waiting for the join
- check_abort(&abort)?;
-
- if msg_list.is_empty() {
- return Ok(());
- }
- Err(format_err!("{}", msg_list.join("\n")))
- }
-
- fn join_threads(&mut self) -> Vec<String> {
- let mut msg_list = Vec::new();
-
- let mut i = 0;
- while let Some(handle) = self.handles.pop() {
- if let Err(panic) = handle.join() {
- if let Some(panic_msg) = panic.downcast_ref::<&str>() {
- msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
- } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
- msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
- } else {
- msg_list.push(format!("thread {} ({i}) panicked", self.name));
- }
- }
- i += 1;
- }
- msg_list
- }
-}
-
-// Note: We make sure that all threads will be joined
-impl<I> Drop for ParallelHandler<I> {
- fn drop(&mut self) {
- drop(self.input.take());
- while let Some(handle) = self.handles.pop() {
- let _ = handle.join();
- }
- }
-}
--
2.47.3
next prev parent reply other threads:[~2026-03-12 13:53 UTC|newest]
Thread overview: 31+ messages / expand[flat|nested] mbox.gz Atom feed top
2026-03-12 13:52 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 01/26] sys: procfs: don't read from sysfs during unit tests Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 03/26] parallel-handler: introduce custom error type Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 04/26] parallel-handler: add documentation Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 05/26] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 06/26] disks: import from Proxmox Backup Server Lukas Wagner
2026-03-16 13:13 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 07/26] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 08/26] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 09/26] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 10/26] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 11/26] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-16 13:25 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 12/26] time: use u64 parse helper from nom Lukas Wagner
2026-03-12 13:52 ` Lukas Wagner [this message]
2026-03-12 13:52 ` [PATCH proxmox-backup 14/26] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-16 13:27 ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox-backup 15/26] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 16/26] node status panel: add `children` property Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 17/26] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 18/26] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 19/26] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 20/26] metric collection: fix minor typo in error message Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 22/26] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 23/26] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 24/26] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 25/26] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 26/26] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner
2026-03-16 13:42 ` [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Arthur Bied-Charreton
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20260312135229.420729-14-l.wagner@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=pdm-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.