public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox/proxmox-backup v3] tape: implement multithreaded read
@ 2025-02-21 15:06 Dominik Csapak
  2025-02-21 15:06 ` [pbs-devel] [PATCH proxmox v3 1/1] pbs api types: tape backup job: add worker threads option Dominik Csapak
  2025-02-21 15:06 ` [pbs-devel] [PATCH proxmox-backup v3 1/1] tape: introduce a tape backup job worker thread option Dominik Csapak
  0 siblings, 2 replies; 3+ messages in thread
From: Dominik Csapak @ 2025-02-21 15:06 UTC (permalink / raw)
  To: pbs-devel

This is a replacement series for my previous patches [0].

I dropped the verification patches again and moved the thread config to
the tape jobs/tape backup API. That way it is much closer to the actual
issue, and we don't run into problems with resource constraints, since
a single job cannot run multiple times (if a user configures many jobs
to run in parallel, that has to be a deliberate choice).

changes from v2:
* slight rewording of the commit message
* rebase on master (include pbs-api-types repo change)
* keep the default at 1
* keep the minimum channel size of 3

changes from v1:
* reordered the code from verify,tape to tape,verify
* marked the verify patches as RFC
* used a struct for saving the values in the datastore (as Thomas
  suggested)
* introduced another verify patch to merge the chunk loading into the
  worker threads

0: https://lore.proxmox.com/pbs-devel/20240507072955.364206-1-d.csapak@proxmox.com/

proxmox:

Dominik Csapak (1):
  pbs api types: tape backup job: add worker threads option

 pbs-api-types/src/jobs.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

proxmox-backup:

Dominik Csapak (1):
  tape: introduce a tape backup job worker thread option

 src/api2/config/tape_backup_job.rs          |  8 ++++
 src/api2/tape/backup.rs                     |  4 ++
 src/tape/pool_writer/mod.rs                 | 14 ++++++-
 src/tape/pool_writer/new_chunks_iterator.rs | 44 +++++++++++++--------
 www/tape/window/TapeBackup.js               | 12 ++++++
 www/tape/window/TapeBackupJob.js            | 14 +++++++
 6 files changed, 79 insertions(+), 17 deletions(-)

-- 
2.39.5




* [pbs-devel] [PATCH proxmox v3 1/1] pbs api types: tape backup job: add worker threads option
  2025-02-21 15:06 [pbs-devel] [PATCH proxmox/proxmox-backup v3] tape: implement multithreaded read Dominik Csapak
@ 2025-02-21 15:06 ` Dominik Csapak
  2025-02-21 15:06 ` [pbs-devel] [PATCH proxmox-backup v3 1/1] tape: introduce a tape backup job worker thread option Dominik Csapak
  1 sibling, 0 replies; 3+ messages in thread
From: Dominik Csapak @ 2025-02-21 15:06 UTC (permalink / raw)
  To: pbs-devel

Sometimes it's useful to give the user control over how parallelized
the job is, so introduce a worker thread option.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 pbs-api-types/src/jobs.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 04631d92..d0b94a24 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -306,6 +306,13 @@ pub struct VerificationJobStatus {
             schema: crate::NS_MAX_DEPTH_SCHEMA,
             optional: true,
         },
+        "worker-threads": {
+            type: Integer,
+            optional: true,
+            minimum: 1,
+            maximum: 32,
+            default: 1,
+        },
     }
 )]
 #[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -332,6 +339,9 @@ pub struct TapeBackupJobSetup {
     pub ns: Option<BackupNamespace>,
     #[serde(skip_serializing_if = "Option::is_none", default)]
     pub max_depth: Option<usize>,
+    /// Set the number of worker threads to use for the job
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub worker_threads: Option<u64>,
 }
 
 #[api(
-- 
2.39.5




* [pbs-devel] [PATCH proxmox-backup v3 1/1] tape: introduce a tape backup job worker thread option
  2025-02-21 15:06 [pbs-devel] [PATCH proxmox/proxmox-backup v3] tape: implement multithreaded read Dominik Csapak
  2025-02-21 15:06 ` [pbs-devel] [PATCH proxmox v3 1/1] pbs api types: tape backup job: add worker threads option Dominik Csapak
@ 2025-02-21 15:06 ` Dominik Csapak
  1 sibling, 0 replies; 3+ messages in thread
From: Dominik Csapak @ 2025-02-21 15:06 UTC (permalink / raw)
  To: pbs-devel

Using a single thread for reading is not optimal in some cases, e.g.
when the underlying storage can handle reads from multiple threads in
parallel.

We use the ParallelHandler to handle the actual reads. Make the
sync_channel buffer size depend on the number of threads so we have
space for two chunks per thread (but keep the minimum of 3 as
before).
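
To make the sizing concrete: with max(read_threads * 2, 3), 1 thread
still gets a channel of 3 slots, 2 threads get 4, 4 threads get 8, and
so on. The following stand-alone sketch (illustration only, not part of
the patch; it uses plain std threads instead of the ParallelHandler and
dummy numbers instead of chunks) shows the general shape of the
pattern: a small pool of reader threads feeding a bounded channel that
a single consumer drains:

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // stand-in for the new worker-threads option
    let read_threads: usize = 4;
    // two in-flight items per reader thread, but never fewer than 3 slots
    let (tx, rx) = sync_channel::<u32>((read_threads * 2).max(3));

    let mut readers = Vec::new();
    for t in 0..read_threads {
        let tx = tx.clone();
        readers.push(thread::spawn(move || {
            for i in 0..10u32 {
                // stand-in for loading a chunk from the datastore
                tx.send(t as u32 * 100 + i).unwrap();
            }
        }));
    }
    // drop the original sender so the channel closes once all readers finish
    drop(tx);

    // single consumer, i.e. the part that writes to tape
    for item in rx {
        println!("writing chunk {item}");
    }
    for r in readers {
        r.join().unwrap();
    }
}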

How this impacts the backup speed largely depends on the underlying
storage and how the backup is laid out on it.

I benchmarked the following setups:

* Setup A: relatively spread out backup on a virtualized pbs on single HDDs
* Setup B: mostly sequential chunks on a virtualized pbs on single HDDs
* Setup C: backup on virtualized pbs on a fast NVME
* Setup D: backup on bare metal pbs with ZFS in a RAID10 with 6 HDDs
  and 2 fast special devices in a mirror

(values are reported in MB/s as seen in the task log, caches were
cleared between runs, backups were bigger than the memory available)

setup  1 thread  2 threads  4 threads  8 threads
A      55        70         80         95
B      110       89         100        108
C      294       294        294        294
D      118       180        300        300

So there are cases where multiple read threads speed up the tape backup
(dramatically). On the other hand there are situations where reading
from a single thread is actually faster, probably because we can read
from the HDD sequentially.

I kept the default value of '1' so the default behavior does not change.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
changes from v2:
* move the UI config to the tape backup job and tape backup windows
* use the pool writer to store the thread count, not the datastore
* keep the minimum channel size of 3
* keep the default of one thread

I left the benchmark data intact, since the actual code that does
the multithreading is the same as before, and I could not find a
virtual setup that replicates the performance of an HDD very well
(limiting the virtual disk's IOPS does not really do the trick due to
disk caching, etc.).

If wanted, I can of course set up a physical testbed with multiple HDDs
again.

 src/api2/config/tape_backup_job.rs          |  8 ++++
 src/api2/tape/backup.rs                     |  4 ++
 src/tape/pool_writer/mod.rs                 | 14 ++++++-
 src/tape/pool_writer/new_chunks_iterator.rs | 44 +++++++++++++--------
 www/tape/window/TapeBackup.js               | 12 ++++++
 www/tape/window/TapeBackupJob.js            | 14 +++++++
 6 files changed, 79 insertions(+), 17 deletions(-)

diff --git a/src/api2/config/tape_backup_job.rs b/src/api2/config/tape_backup_job.rs
index b6db92998..f2abf652b 100644
--- a/src/api2/config/tape_backup_job.rs
+++ b/src/api2/config/tape_backup_job.rs
@@ -140,6 +140,8 @@ pub enum DeletableProperty {
     MaxDepth,
     /// Delete the 'ns' property
     Ns,
+    /// Delete the 'worker-threads' property
+    WorkerThreads,
 }
 
 #[api(
@@ -222,6 +224,9 @@ pub fn update_tape_backup_job(
                 DeletableProperty::Ns => {
                     data.setup.ns = None;
                 }
+                DeletableProperty::WorkerThreads => {
+                    data.setup.worker_threads = None;
+                }
             }
         }
     }
@@ -260,6 +265,9 @@ pub fn update_tape_backup_job(
     if update.setup.max_depth.is_some() {
         data.setup.max_depth = update.setup.max_depth;
     }
+    if update.setup.worker_threads.is_some() {
+        data.setup.worker_threads = update.setup.worker_threads;
+    }
 
     let schedule_changed = data.schedule != update.schedule;
     if update.schedule.is_some() {
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index cf5a01897..31293a9a9 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -387,6 +387,10 @@ fn backup_worker(
         ns_magic,
     )?;
 
+    if let Some(threads) = setup.worker_threads {
+        pool_writer.set_read_thread_count(threads as usize);
+    }
+
     let mut group_list = Vec::new();
     let namespaces = datastore.recursive_iter_backup_ns_ok(root_namespace, setup.max_depth)?;
     for ns in namespaces {
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 3114ec061..540844212 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -56,6 +56,7 @@ pub struct PoolWriter {
     notification_mode: TapeNotificationMode,
     ns_magic: bool,
     used_tapes: HashSet<Uuid>,
+    read_threads: usize,
 }
 
 impl PoolWriter {
@@ -93,9 +94,15 @@ impl PoolWriter {
             notification_mode,
             ns_magic,
             used_tapes: HashSet::new(),
+            read_threads: 1,
         })
     }
 
+    /// Set the read threads to use when writing a backup to tape
+    pub fn set_read_thread_count(&mut self, read_threads: usize) {
+        self.read_threads = read_threads;
+    }
+
     pub fn pool(&mut self) -> &mut MediaPool {
         &mut self.pool
     }
@@ -541,7 +548,12 @@ impl PoolWriter {
         datastore: Arc<DataStore>,
         snapshot_reader: Arc<Mutex<SnapshotReader>>,
     ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
-        NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
+        NewChunksIterator::spawn(
+            datastore,
+            snapshot_reader,
+            Arc::clone(&self.catalog_set),
+            self.read_threads,
+        )
     }
 
     pub(crate) fn catalog_version(&self) -> [u8; 8] {
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 1454b33d2..de847b3c9 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -6,8 +6,9 @@ use anyhow::{format_err, Error};
 use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
 
 use crate::tape::CatalogSet;
+use crate::tools::parallel_handler::ParallelHandler;
 
-/// Chunk iterator which use a separate thread to read chunks
+/// Chunk iterator which uses separate threads to read chunks
 ///
 /// The iterator skips duplicate chunks and chunks already in the
 /// catalog.
@@ -24,8 +25,11 @@ impl NewChunksIterator {
         datastore: Arc<DataStore>,
         snapshot_reader: Arc<Mutex<SnapshotReader>>,
         catalog_set: Arc<Mutex<CatalogSet>>,
+        read_threads: usize,
     ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
-        let (tx, rx) = std::sync::mpsc::sync_channel(3);
+        // use twice the threadcount for the channel, so the read thread can already send another
+        // one when the previous one was not consumed yet, but keep the minimum at 3
+        let (tx, rx) = std::sync::mpsc::sync_channel((read_threads * 2).max(3));
 
         let reader_thread = std::thread::spawn(move || {
             let snapshot_reader = snapshot_reader.lock().unwrap();
@@ -35,36 +39,44 @@ impl NewChunksIterator {
             let datastore_name = snapshot_reader.datastore_name().to_string();
 
             let result: Result<(), Error> = proxmox_lang::try_block!({
-                let mut chunk_iter = snapshot_reader.chunk_iterator(move |digest| {
+                let chunk_iter = snapshot_reader.chunk_iterator(move |digest| {
                     catalog_set
                         .lock()
                         .unwrap()
                         .contains_chunk(&datastore_name, digest)
                 })?;
 
-                loop {
-                    let digest = match chunk_iter.next() {
-                        None => {
-                            let _ = tx.send(Ok(None)); // ignore send error
-                            break;
+                let reader_pool =
+                    ParallelHandler::new("tape backup chunk reader pool", read_threads, {
+                        let tx = tx.clone();
+                        move |digest| {
+                            let blob = datastore.load_chunk(&digest)?;
+                            //println!("LOAD CHUNK {}", hex::encode(&digest));
+
+                            tx.send(Ok(Some((digest, blob)))).map_err(|err| {
+                                format_err!("error sending result from reader thread: {err}")
+                            })?;
+
+                            Ok(())
                         }
-                        Some(digest) => digest?,
-                    };
+                    });
+
+                for digest in chunk_iter {
+                    let digest = digest?;
 
                     if chunk_index.contains(&digest) {
                         continue;
                     }
 
-                    let blob = datastore.load_chunk(&digest)?;
-                    //println!("LOAD CHUNK {}", hex::encode(&digest));
-                    if let Err(err) = tx.send(Ok(Some((digest, blob)))) {
-                        eprintln!("could not send chunk to reader thread: {err}");
-                        break;
-                    }
+                    reader_pool.send(digest)?;
 
                     chunk_index.insert(digest);
                 }
 
+                reader_pool.complete()?;
+
+                let _ = tx.send(Ok(None)); // ignore send error
+
                 Ok(())
             });
             if let Err(err) = result {
diff --git a/www/tape/window/TapeBackup.js b/www/tape/window/TapeBackup.js
index 7a45e388c..fc8be80e3 100644
--- a/www/tape/window/TapeBackup.js
+++ b/www/tape/window/TapeBackup.js
@@ -119,6 +119,18 @@ Ext.define('PBS.TapeManagement.TapeBackupWindow', {
 		    renderer: Ext.String.htmlEncode,
 		},
 	    ],
+
+	    advancedColumn1: [
+		{
+		    xtype: 'proxmoxintegerfield',
+		    name: 'worker-threads',
+		    fieldLabel: gettext('# of Worker Threads'),
+		    emptyText: '1',
+		    skipEmptyText: true,
+		    minValue: 1,
+		    maxValue: 32,
+		},
+	    ],
 	},
     ],
 
diff --git a/www/tape/window/TapeBackupJob.js b/www/tape/window/TapeBackupJob.js
index 12623712a..24afb9fb6 100644
--- a/www/tape/window/TapeBackupJob.js
+++ b/www/tape/window/TapeBackupJob.js
@@ -216,6 +216,20 @@ Ext.define('PBS.TapeManagement.BackupJobEdit', {
 			},
 		    },
 		],
+
+		advancedColumn1: [
+		    {
+			xtype: 'proxmoxintegerfield',
+			name: 'worker-threads',
+			fieldLabel: gettext('# of Worker Threads'),
+			emptyText: '1',
+			minValue: 1,
+			maxValue: 32,
+			cbind: {
+			    deleteEmpty: '{!isCreate}',
+			},
+		    },
+		],
 	    },
 	    {
 		xtype: 'inputpanel',
-- 
2.39.5



