From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
Date: Mon, 7 Dec 2020 14:24:07 +0100
Message-ID: <6c3c2404-d5a6-c274-48b2-5691e7500348@proxmox.com>
In-Reply-To: <AM0PR09MB275424AC3D325ADE377445AFF2CE0@AM0PR09MB2754.eurprd09.prod.outlook.com>

Hi,

I looked a bit more at this today.

First, it seems that you are not that familiar with Rust, and especially
async Rust (tokio, etc.); please correct me if I am wrong. It is a
complicated topic, so I would suggest that you make yourself familiar
with the concepts. Rust's reference documentation can be found under

https://doc.rust-lang.org/book/
https://rust-lang.github.io/async-book/

and there are surely many other great resources out there.
Of course you can always ask here too, but it may take longer to explain.


On 12/7/20 3:51 AM, Niko Fellner wrote:
> By using mutex (AtomicBool - compare_exchange) I found out that only the write_zero_callback and write_data_callback are problematic (no mutex -> segfaults). Maybe anyone can find out why?

An AtomicBool with thread::yield is not necessary here (I think it is
even a hindrance, because you now pause tokio threads); all you need is
Rust's

std::sync::Mutex
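
For illustration, a minimal sketch of that pattern (write_chunk here is
a made-up stand-in for the C callback, not the real API):

use std::sync::{Arc, Mutex};
use std::thread;

// hypothetical stand-in for the write callback into qemu
fn write_chunk(_offset: u64, _data: &[u8]) -> i32 { 0 }

fn main() {
    // one shared lock; whoever holds the guard may call the callback
    let lock = Arc::new(Mutex::new(()));
    let handles: Vec<_> = (0..4u64).map(|i| {
        let lock = Arc::clone(&lock);
        thread::spawn(move || {
            // lock() blocks until no other thread holds the guard,
            // so the callback is never entered concurrently
            let _guard = lock.lock().unwrap();
            write_chunk(i * 4096, &[0u8; 4096]);
        })
    }).collect();
    for h in handles {
        h.join().unwrap();
    }
}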

The problem, AFAICS, is that QEMU cannot have multiple writers on the
same block backend from multiple threads, and it seems that this is a
general QEMU limitation (you can even have only one iothread per image),
so I am not sure it is possible to implement what you want with a QEMU
layer.

Of course you can ask the QEMU people on their mailing list; maybe
I am simply not seeing how.

So we have to synchronize on the writes, which makes the whole patch
less interesting, since it will not solve your bottleneck, I assume.

The only thing we can parallelize is the download of the chunks,
but in my tests here, that did not improve the restore speed at all.
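
To sketch what that structure would look like in isolation (read_chunk
and write_data_callback here are stand-ins, not the real chunk reader or
qemu callback; the patch below uses ParallelHandler and a tokio channel
instead of plain threads):

use std::sync::mpsc;
use std::thread;

// hypothetical stand-ins for the chunk download and the write callback
fn read_chunk(_digest: u64) -> Vec<u8> { vec![0u8; 4096] }
fn write_data_callback(_offset: u64, _data: &[u8]) -> i32 { 0 }

fn main() {
    let (sender, receiver) = mpsc::channel();
    let chunks: Vec<(u64, u64)> = (0..16u64).map(|pos| (pos, pos * 4096)).collect();

    // a few reader threads: only the downloads run in parallel
    let mut workers = Vec::new();
    for slice in chunks.chunks(4) {
        let sender = sender.clone();
        let slice = slice.to_vec();
        workers.push(thread::spawn(move || {
            for (digest, offset) in slice {
                let data = read_chunk(digest);
                sender.send((offset, data)).unwrap();
            }
        }));
    }
    drop(sender); // close the channel once all workers are done

    // a single consumer keeps all writes on one thread
    for (offset, data) in receiver {
        let res = write_data_callback(offset, &data);
        assert!(res >= 0, "write_data_callback failed ({})", res);
    }

    for w in workers {
        w.join().unwrap();
    }
}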

I attach my code (which is based on yours), which should be OK, I think,
but please do not use it, as I only tested it very briefly. (It is more
intended as a reference for how one could do such a thing.)

I commented the mutex out for now; like this it will crash
with segfaults (like your code does).

With the mutex compiled in, it works, but it is as slow as without the
patch (maybe playing around with the thread count could still make a
difference).
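
For reference, putting the mutex back in just means removing the "//"
from the commented lines in the restore.rs hunk below (plus a
"use std::sync::Mutex;" if it is not already imported there), i.e.:

        let mutex = Arc::new(Mutex::new(()));
        // ... and inside the per-chunk closure:
                let mutex = mutex.clone();
                // ... wrapped around each callback invocation:
                        let _guard = mutex.lock().unwrap();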

I hope this helps :)

------------

diff --git a/src/lib.rs b/src/lib.rs
index b755014..20650b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -810,6 +810,14 @@ pub extern "C" fn proxmox_restore_image(
      verbose: bool,
  ) -> c_int {

+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
+
      let restore_task = restore_handle_to_task(handle);

      let result: Result<_, Error> = try_block!({
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
          let archive_name = tools::utf8_c_string(archive_name)?
              .ok_or_else(|| format_err!("archive_name must not be NULL"))?;

+        let foo = SafePointer { pointer: callback_data.clone() };
          let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
          };

          let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
          };

          proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..caa5aeb 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
  use tokio::prelude::*;

  use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
  use proxmox_backup::backup::*;
  use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};

@@ -66,8 +67,8 @@ impl RestoreTask {
              let mut builder = tokio::runtime::Builder::new();
              builder.threaded_scheduler();
              builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(8);
+            builder.core_threads(6);
              builder.thread_name("proxmox-restore-worker");
              builder
          });
@@ -110,8 +111,8 @@ impl RestoreTask {
      pub async fn restore_image(
          &self,
          archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
          verbose: bool,
      ) -> Result<(), Error> {

@@ -148,51 +149,92 @@ impl RestoreTask {
              most_used,
          );

-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
-
          let start_time = std::time::Instant::now();

-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+//        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", 4,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+//                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+//                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
              }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = 
{}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes 
= {}% ({} bytes), duration {} sec)",
+                        next_per, bytes,
+                        zeroes*100/bytes, zeroes,
+                        start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                  }
              }
-        }

-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, 
speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, 
duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }

-        Ok(())
+        output.await?
      }

      pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {



