public inbox for pbs-devel@lists.proxmox.com
From: Niko Fellner <n.fellner@logics.de>
To: "pbs-devel@lists.proxmox.com" <pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
Date: Mon, 7 Dec 2020 22:59:15 +0000	[thread overview]
Message-ID: <AM0PR09MB27542561B5486B017A319261F2CE0@AM0PR09MB2754.eurprd09.prod.outlook.com> (raw)

Hi Dominik!

Thanks for your quick feedback.

Indeed, I have never written Rust before and only have a limited background in parallel programming. Thanks a lot for your help!

Anyhow, yes, your code as-is performs slightly worse than the original, no matter how many threads you throw at it.

- Your code, with the mutex activated: restore image complete (bytes=34359738368, duration=235.59s, speed=139.09MB/s)
- Original sync code: restore image complete (bytes=34359738368, duration=224.78s, speed=145.78MB/s)

But there is an easy fix for it! 

+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;

Put your mutex lock just below the read_chunk line, and you won't serialize the reads anymore - only the writes.
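
In other words, the expensive part (fetching, decrypting and decompressing the chunk) can still run on all worker threads in parallel, and only the write into the target image gets serialized. A toy sketch of that locking pattern with plain std threads (illustration only - not the patch itself, and the names are made up):

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // stands in for the single consumer behind write_data_callback
    let writer = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();
    for chunk_no in 0..4u64 {
        let writer = Arc::clone(&writer);
        handles.push(thread::spawn(move || {
            // "read_chunk": the expensive work, done in parallel, no lock held
            let raw_data = vec![chunk_no as u8; 8];
            // lock only around the write, as with the moved mutex.lock() above
            let mut out = writer.lock().unwrap();
            out.push((chunk_no, raw_data));
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    println!("wrote {} chunks", writer.lock().unwrap().len());
}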

Please run your restore performance test again and check whether it is faster for you as well!

In my benchmark (32 GB VM), your solution with this small fix ran faster than my code (the best I had was "duration=153.03s, speed=214.13MB/s"):
- 4 threads: restore image complete (bytes=34359738368, duration=154.67s, speed=211.86MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=144.58s, speed=226.65MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=143.41s, speed=228.49MB/s) (just another run, to verify it) 

So the single-threaded CPU bottleneck is already largely removed, even without the parallel writes. On my machine it ran about 57% faster than the original sync code (228.49 MB/s vs. 145.78 MB/s), as you can see.

Yes, I think we should ask the QEMU folks about it. Maybe they can even provide a fix - who knows.

I'll also benchmark on an Azure VM to check the speedup with a larger number of CPU threads and NVMe disks.

By the way: I also benchmarked a mix of your lib.rs with my restore.rs "pub async fn restore_image":
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=155.07s, speed=211.31MB/s)
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=156.37s, speed=209.56MB/s) (just another run, to verify it)

So your code really is superior (maybe it is the std::sync::Mutex instead of my thread::yield, or something else), and it would be great if you could try it out again.

Here is the full patch, in which I also set the number of worker threads automatically from the number of detected CPUs:


diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..4b42e02 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,10 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
+num_cpus = "1.13.0"
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..dee952e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -809,6 +809,14 @@ pub extern "C" fn proxmox_restore_image(
     error: * mut * mut c_char,
     verbose: bool,
 ) -> c_int {
+    
+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
 
     let restore_task = restore_handle_to_task(handle);
 
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
 
+        let foo = SafePointer { pointer: callback_data.clone() };
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..168853b 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
 use tokio::prelude::*;
 
 use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -62,12 +63,14 @@ impl RestoreTask {
     }
 
     pub fn new(setup: BackupSetup) -> Result<Self, Error> {
+        let vcpus = num_cpus::get();
+        eprintln!("{} vCPUs detected", vcpus);
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(2 * vcpus);
+            builder.core_threads(vcpus);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -110,8 +113,8 @@ impl RestoreTask {
     pub async fn restore_image(
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -147,52 +150,96 @@ impl RestoreTask {
             file_info.chunk_crypt_mode(),
             most_used,
         );
-
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        
+        let vcpus = num_cpus::get();
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", vcpus,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+                        //let _guard = mutex.lock().unwrap(); // we don't want to sync too early here
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        next_per, bytes,
+                        zeroes*100/bytes, zeroes,
+                        start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                 }
             }
-        }
 
-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }
 
-        Ok(())
+        output.await?
     }
 
     pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {


