public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup 13/26] tools: move ParallelHandler to new proxmox-parallel-handler crate
Date: Thu, 12 Mar 2026 14:52:14 +0100	[thread overview]
Message-ID: <20260312135229.420729-14-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260312135229.420729-1-l.wagner@proxmox.com>

The proxmox-disks crate, which is also being extracted, requires the
ParallelHandler helper, so we need to extract it as well.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
 Cargo.toml                                  |   3 +
 src/api2/tape/restore.rs                    |  25 ++-
 src/backup/verify.rs                        |   3 +-
 src/server/pull.rs                          |   5 +-
 src/tape/pool_writer/new_chunks_iterator.rs |   3 +-
 src/tools/disks/mod.rs                      |   2 +-
 src/tools/mod.rs                            |   2 -
 src/tools/parallel_handler.rs               | 160 --------------------
 8 files changed, 21 insertions(+), 182 deletions(-)
 delete mode 100644 src/tools/parallel_handler.rs

diff --git a/Cargo.toml b/Cargo.toml
index cf993f514..57f6aa88e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -75,6 +75,7 @@ proxmox-network-api = "1"
 proxmox-network-types = "1.0.1"
 proxmox-notify = "1"
 proxmox-openid = "1"
+proxmox-parallel-handler = "1"
 proxmox-product-config = "1"
 proxmox-rate-limiter = "1.0.0"
 proxmox-rest-server = { version = "1.0.5", features = [ "templates" ] }
@@ -235,6 +236,7 @@ proxmox-network-types.workspace = true
 proxmox-notify = { workspace = true, features = [ "pbs-context" ] }
 proxmox-openid.workspace = true
 proxmox-product-config.workspace = true
+proxmox-parallel-handler.workspace = true
 proxmox-rate-limiter = { workspace = true, features = [ "shared-rate-limiter" ] }
 proxmox-rest-server = { workspace = true, features = [ "rate-limited-stream" ] }
 proxmox-router = { workspace = true, features = [ "cli", "server"] }
@@ -300,6 +302,7 @@ proxmox-rrd-api-types.workspace = true
 #proxmox-network-types = { path = "../proxmox/proxmox-network-types" }
 #proxmox-notify = { path = "../proxmox/proxmox-notify" }
 #proxmox-openid = { path = "../proxmox/proxmox-openid" }
+#proxmox-parallel-handler = { path = "../proxmox/proxmox-parallel-handler" }
 #proxmox-product-config = { path = "../proxmox/proxmox-product-config" }
 #proxmox-rate-limiter = { path = "../proxmox/proxmox-rate-limiter" }
 #proxmox-rest-server = { path = "../proxmox/proxmox-rest-server" }
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 4f2ee3db6..011037216 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -10,6 +10,7 @@ use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
 use proxmox_io::ReadExt;
+use proxmox_parallel_handler::ParallelHandler;
 use proxmox_rest_server::WorkerTask;
 use proxmox_router::{Permission, Router, RpcEnvironment, RpcEnvironmentType};
 use proxmox_schema::{api, ApiType};
@@ -38,21 +39,17 @@ use pbs_tape::{
 
 use crate::backup::check_ns_modification_privs;
 use crate::tape::{assert_datastore_type, TapeNotificationMode};
-use crate::{
-    tape::{
-        drive::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver},
-        file_formats::{
-            CatalogArchiveHeader, ChunkArchiveDecoder, ChunkArchiveHeader, SnapshotArchiveHeader,
-            PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
-            PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
-            PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0, PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
-            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
-            PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_2,
-        },
-        lock_media_set, Inventory, MediaCatalog, MediaId, MediaSet, MediaSetCatalog,
-        TAPE_STATUS_DIR,
+use crate::tape::{
+    drive::{lock_tape_device, request_and_load_media, set_tape_device_state, TapeDriver},
+    file_formats::{
+        CatalogArchiveHeader, ChunkArchiveDecoder, ChunkArchiveHeader, SnapshotArchiveHeader,
+        PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CATALOG_ARCHIVE_MAGIC_1_1,
+        PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_CHUNK_ARCHIVE_MAGIC_1_1,
+        PROXMOX_BACKUP_MEDIA_LABEL_MAGIC_1_0, PROXMOX_BACKUP_MEDIA_SET_LABEL_MAGIC_1_0,
+        PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_0, PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_1,
+        PROXMOX_BACKUP_SNAPSHOT_ARCHIVE_MAGIC_1_2,
     },
-    tools::parallel_handler::ParallelHandler,
+    lock_media_set, Inventory, MediaCatalog, MediaId, MediaSet, MediaSetCatalog, TAPE_STATUS_DIR,
 };
 
 struct NamespaceMap {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index f52d77815..9e24d254d 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -8,6 +8,7 @@ use anyhow::{bail, Error};
 use http_body_util::BodyExt;
 use tracing::{error, info, warn};
 
+use proxmox_parallel_handler::{ParallelHandler, SendHandle};
 use proxmox_worker_task::WorkerTaskContext;
 
 use pbs_api_types::{
@@ -20,8 +21,6 @@ use pbs_datastore::index::{ChunkReadInfo, IndexFile};
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, DatastoreBackend, StoreProgress};
 
-use crate::tools::parallel_handler::{ParallelHandler, SendHandle};
-
 use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 57c5ef323..edc5e563d 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -8,9 +8,11 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Context, Error};
-use proxmox_human_byte::HumanByte;
 use tracing::{info, warn};
 
+use proxmox_human_byte::HumanByte;
+use proxmox_parallel_handler::ParallelHandler;
+
 use pbs_api_types::{
     print_store_and_ns, ArchiveType, Authid, BackupArchiveName, BackupDir, BackupGroup,
     BackupNamespace, GroupFilter, Operation, RateLimitConfig, Remote, SnapshotListItem,
@@ -34,7 +36,6 @@ use super::sync::{
     SkipReason, SyncSource, SyncSourceReader, SyncStats,
 };
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
-use crate::tools::parallel_handler::ParallelHandler;
 
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 0e29516f8..f077d823f 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,10 +3,11 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{format_err, Error};
 
+use proxmox_parallel_handler::ParallelHandler;
+
 use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
 
 use crate::tape::CatalogSet;
-use crate::tools::parallel_handler::ParallelHandler;
 
 /// Chunk iterator which uses separate threads to read chunks
 ///
diff --git a/src/tools/disks/mod.rs b/src/tools/disks/mod.rs
index a86cbdf79..4197d0b0f 100644
--- a/src/tools/disks/mod.rs
+++ b/src/tools/disks/mod.rs
@@ -21,7 +21,7 @@ use proxmox_sys::linux::procfs::{mountinfo::Device, MountInfo};
 
 use pbs_api_types::{BLOCKDEVICE_DISK_AND_PARTITION_NAME_REGEX, BLOCKDEVICE_NAME_REGEX};
 
-use crate::tools::parallel_handler::ParallelHandler;
+use proxmox_parallel_handler::ParallelHandler;
 
 mod zfs;
 pub use zfs::*;
diff --git a/src/tools/mod.rs b/src/tools/mod.rs
index 6a975bde2..7f5acc0e3 100644
--- a/src/tools/mod.rs
+++ b/src/tools/mod.rs
@@ -20,8 +20,6 @@ pub mod statistics;
 pub mod systemd;
 pub mod ticket;
 
-pub mod parallel_handler;
-
 pub fn assert_if_modified(digest1: &str, digest2: &str) -> Result<(), Error> {
     if digest1 != digest2 {
         bail!("detected modified configuration - file changed by other user? Try again.");
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
deleted file mode 100644
index 81e83bb13..000000000
--- a/src/tools/parallel_handler.rs
+++ /dev/null
@@ -1,160 +0,0 @@
-//! A thread pool which run a closure in parallel.
-
-use std::sync::{Arc, Mutex};
-use std::thread::JoinHandle;
-
-use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
-
-/// A handle to send data to the worker thread (implements clone)
-pub struct SendHandle<I> {
-    input: Sender<I>,
-    abort: Arc<Mutex<Option<String>>>,
-}
-
-/// Returns the first error happened, if any
-pub fn check_abort(abort: &Mutex<Option<String>>) -> Result<(), Error> {
-    let guard = abort.lock().unwrap();
-    if let Some(err_msg) = &*guard {
-        return Err(format_err!("{}", err_msg));
-    }
-    Ok(())
-}
-
-impl<I: Send> SendHandle<I> {
-    /// Send data to the worker threads
-    pub fn send(&self, input: I) -> Result<(), Error> {
-        check_abort(&self.abort)?;
-        match self.input.send(input) {
-            Ok(()) => Ok(()),
-            Err(_) => bail!("send failed - channel closed"),
-        }
-    }
-}
-
-/// A thread pool which run the supplied closure
-///
-/// The send command sends data to the worker threads. If one handler
-/// returns an error, we mark the channel as failed and it is no
-/// longer possible to send data.
-///
-/// When done, the 'complete()' method needs to be called to check for
-/// outstanding errors.
-pub struct ParallelHandler<I> {
-    handles: Vec<JoinHandle<()>>,
-    name: String,
-    input: Option<SendHandle<I>>,
-}
-
-impl<I> Clone for SendHandle<I> {
-    fn clone(&self) -> Self {
-        Self {
-            input: self.input.clone(),
-            abort: Arc::clone(&self.abort),
-        }
-    }
-}
-
-impl<I: Send + 'static> ParallelHandler<I> {
-    /// Create a new thread pool, each thread processing incoming data
-    /// with 'handler_fn'.
-    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
-    where
-        F: Fn(I) -> Result<(), Error> + Send + Clone + 'static,
-    {
-        let mut handles = Vec::new();
-        let (input_tx, input_rx) = bounded::<I>(threads);
-
-        let abort = Arc::new(Mutex::new(None));
-
-        for i in 0..threads {
-            let input_rx = input_rx.clone();
-            let abort = Arc::clone(&abort);
-            let handler_fn = handler_fn.clone();
-
-            handles.push(
-                std::thread::Builder::new()
-                    .name(format!("{name} ({i})"))
-                    .spawn(move || loop {
-                        let data = match input_rx.recv() {
-                            Ok(data) => data,
-                            Err(_) => return,
-                        };
-                        if let Err(err) = (handler_fn)(data) {
-                            let mut guard = abort.lock().unwrap();
-                            if guard.is_none() {
-                                *guard = Some(err.to_string());
-                            }
-                        }
-                    })
-                    .unwrap(),
-            );
-        }
-        Self {
-            handles,
-            name: name.to_string(),
-            input: Some(SendHandle {
-                input: input_tx,
-                abort,
-            }),
-        }
-    }
-
-    /// Returns a cloneable channel to send data to the worker threads
-    pub fn channel(&self) -> SendHandle<I> {
-        self.input.as_ref().unwrap().clone()
-    }
-
-    /// Send data to the worker threads
-    pub fn send(&self, input: I) -> Result<(), Error> {
-        self.input.as_ref().unwrap().send(input)?;
-        Ok(())
-    }
-
-    /// Wait for worker threads to complete and check for errors
-    pub fn complete(mut self) -> Result<(), Error> {
-        let input = self.input.take().unwrap();
-        let abort = Arc::clone(&input.abort);
-        check_abort(&abort)?;
-        drop(input);
-
-        let msg_list = self.join_threads();
-
-        // an error might be encountered while waiting for the join
-        check_abort(&abort)?;
-
-        if msg_list.is_empty() {
-            return Ok(());
-        }
-        Err(format_err!("{}", msg_list.join("\n")))
-    }
-
-    fn join_threads(&mut self) -> Vec<String> {
-        let mut msg_list = Vec::new();
-
-        let mut i = 0;
-        while let Some(handle) = self.handles.pop() {
-            if let Err(panic) = handle.join() {
-                if let Some(panic_msg) = panic.downcast_ref::<&str>() {
-                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
-                } else if let Some(panic_msg) = panic.downcast_ref::<String>() {
-                    msg_list.push(format!("thread {} ({i}) panicked: {panic_msg}", self.name));
-                } else {
-                    msg_list.push(format!("thread {} ({i}) panicked", self.name));
-                }
-            }
-            i += 1;
-        }
-        msg_list
-    }
-}
-
-// Note: We make sure that all threads will be joined
-impl<I> Drop for ParallelHandler<I> {
-    fn drop(&mut self) {
-        drop(self.input.take());
-        while let Some(handle) = self.handles.pop() {
-            let _ = handle.join();
-        }
-    }
-}
-- 
2.47.3





  parent reply	other threads:[~2026-03-12 13:53 UTC|newest]

Thread overview: 31+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-03-12 13:52 [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 01/26] sys: procfs: don't read from sysfs during unit tests Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 02/26] parallel-handler: import code from Proxmox Backup Server Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 03/26] parallel-handler: introduce custom error type Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 04/26] parallel-handler: add documentation Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 05/26] parallel-handler: add simple unit-test suite Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 06/26] disks: import from Proxmox Backup Server Lukas Wagner
2026-03-16 13:13   ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 07/26] disks: fix typo in `initialize_gpt_disk` Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 08/26] disks: add parts of gather_disk_stats from PBS Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 09/26] disks: gate api macro behind 'api-types' feature Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 10/26] disks: clippy: collapse if-let chains where possible Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox 11/26] procfs: add helpers for querying pressure stall information Lukas Wagner
2026-03-16 13:25   ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox 12/26] time: use u64 parse helper from nom Lukas Wagner
2026-03-12 13:52 ` Lukas Wagner [this message]
2026-03-12 13:52 ` [PATCH proxmox-backup 14/26] tools: replace disks module with proxmox-disks Lukas Wagner
2026-03-16 13:27   ` Arthur Bied-Charreton
2026-03-12 13:52 ` [PATCH proxmox-backup 15/26] metric collection: use blockdev_stat_for_path from proxmox_disks Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 16/26] node status panel: add `children` property Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 17/26] RRDGrid: fix size observer by attaching node reference to rendered container Lukas Wagner
2026-03-12 13:52 ` [PATCH proxmox-yew-comp 18/26] RRDGrid: add padding and increase gap between elements Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 19/26] metric collection: clarify naming for remote metric collection Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 20/26] metric collection: fix minor typo in error message Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 21/26] metric collection: collect PDM host metrics in a new collection task Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 22/26] api: fix /nodes/localhost/rrddata endpoint Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 23/26] pdm: node rrd data: rename 'total-time' to 'metric-collection-total-time' Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 24/26] pdm-api-types: add PDM host metric fields Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 25/26] ui: node status: add RRD graphs for PDM host metrics Lukas Wagner
2026-03-12 13:52 ` [PATCH datacenter-manager 26/26] ui: lxc/qemu/node: use RRD value render helpers Lukas Wagner
2026-03-16 13:42 ` [PATCH datacenter-manager/proxmox{,-backup,-yew-comp} 00/26] metric collection for the PDM host Arthur Bied-Charreton

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260312135229.420729-14-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal