public inbox for pbs-devel@lists.proxmox.com
* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
@ 2020-12-07 22:59 Niko Fellner
  0 siblings, 0 replies; 5+ messages in thread
From: Niko Fellner @ 2020-12-07 22:59 UTC (permalink / raw)
  To: pbs-devel

Hi Dominik!

Thanks for your quick feedback.

Indeed, I have never done Rust before and only have a limited background in parallel programming. Thanks a lot for your help!

Anyhow, yes, your code as is performs even worse than the original, no matter how many threads you throw at it.

- Your code, with activated mutex: restore image complete (bytes=34359738368, duration=235.59s, speed=139.09MB/s)
- Original sync code:  restore image complete (bytes=34359738368, duration=224.78s, speed=145.78MB/s)

But there is an easy fix for it! 

+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;

Put your mutex lock right below the read_chunk line, and you no longer synchronize on the reads.
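
To illustrate the idea outside of the pbs code, here is a minimal stand-alone sketch (read_chunk and the Vec behind the mutex are made-up stand-ins for ReadChunk::read_chunk and the single qemu writer, nothing from the real patch): the chunk reads run fully in parallel, only the write is serialized.

use std::sync::{Arc, Mutex};
use std::thread;

// stand-in for ReadChunk::read_chunk (fetch + decrypt + decompress, CPU heavy)
fn read_chunk(pos: usize) -> Vec<u8> {
    vec![pos as u8; 4]
}

fn main() {
    // stand-in for the single writer (qemu block backend); only this needs the lock
    let writer = Arc::new(Mutex::new(Vec::<(usize, Vec<u8>)>::new()));
    let mut handles = Vec::new();

    for pos in 0..8 {
        let writer = Arc::clone(&writer);
        handles.push(thread::spawn(move || {
            let data = read_chunk(pos);             // done in parallel, no lock held
            let mut guard = writer.lock().unwrap(); // serialize only the write
            guard.push((pos, data));
        }));
    }

    for h in handles {
        h.join().unwrap();
    }
    println!("wrote {} chunks", writer.lock().unwrap().len());
}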

Please run your restore performance test again and check whether it is faster for you now, too!

In my benchmark (32 GB VM), your solution with this small fix ran faster than my code (the best I had was "duration=153.03s, speed=214.13MB/s"):
- 4 threads: restore image complete (bytes=34359738368, duration=154.67s, speed=211.86MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=144.58s, speed=226.65MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=143.41s, speed=228.49MB/s) (just another run, to verify it) 

So the single-threaded CPU bottleneck is already removed to a large degree, even without the parallel writes. On my machine it ran about 57% faster than the original sync code, as you can see.

Yes, I think we should ask the qemu guys about it. Maybe they can even provide a fix - who knows.

I'll also benchmark an Azure VM, to check out the speedup with a big number of CPU threads and NVMe disks. 

By the way: I also benchmarked a mix of your lib.rs with my restore.rs "pub async fn restore_image":
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=155.07s, speed=211.31MB/s)
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=156.37s, speed=209.56MB/s) (just another run, to verify it)

Therefore your code really is superior (maybe it was the std::sync::Mutex instead of my thread::yield spin lock, or something else), and it would be great if you could try it out again.

Here I automatically set the number of CPUs to use:


diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..4b42e02 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,10 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
+num_cpus = "1.13.0"
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..dee952e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -809,6 +809,14 @@ pub extern "C" fn proxmox_restore_image(
     error: * mut * mut c_char,
     verbose: bool,
 ) -> c_int {
+    
+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
 
     let restore_task = restore_handle_to_task(handle);
 
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
 
+        let foo = SafePointer { pointer: callback_data.clone() };
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..168853b 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
 use tokio::prelude::*;
 
 use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -62,12 +63,14 @@ impl RestoreTask {
     }
 
     pub fn new(setup: BackupSetup) -> Result<Self, Error> {
+        let vcpus = num_cpus::get();
+        eprintln!("{} vCPUs detected", vcpus);
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(2 * vcpus);
+            builder.core_threads(vcpus);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -110,8 +113,8 @@ impl RestoreTask {
     pub async fn restore_image(
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -147,52 +150,96 @@ impl RestoreTask {
             file_info.chunk_crypt_mode(),
             most_used,
         );
-
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        
+        let vcpus = num_cpus::get();
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", vcpus,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+                        //let _guard = mutex.lock().unwrap(); // we don't want to sync too early here
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        next_per, bytes,
+                        zeroes*100/bytes, zeroes,
+                        start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                 }
             }
-        }
 
-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }
 
-        Ok(())
+        output.await?
     }
 
     pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {




* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
@ 2020-12-09  2:07 Niko Fellner
  0 siblings, 0 replies; 5+ messages in thread
From: Niko Fellner @ 2020-12-09  2:07 UTC (permalink / raw)
  To: pbs-devel

I did some benchmarks now on an Azure L32s_v2 (32 vCPUs, 256 GB memory, 4x 2 TB NVMe); on the ZFS tests I reduced zfs-arc-max to 8 GiB, because I didn't want to benchmark only the memory.

Overall the speedup of the parallel pbs-restore was at least 2.1x here, often about 2.6x, and in some tests even up to 6.8x; on the big 750 GiB VM the speedup was 3.6x.

The CPU usage of the busiest pbs-restore process during the 750 GiB restore was often about 300-450% (not great for 32 threads, but we only do parallel reads, so...); the original sync pbs-restore only showed about 98% CPU usage in htop.

First, some tests with my 32 GiB VM (zeroes = 64%):

> Results (relative to 32 GiB):
> Host;          Method;              source;                 target;                 Start;    End;      seconds(End - Start);  speed (End-Start);   speedup;  seconds(PVE log); speed(PVE log); speedup
> Azure L32s_v2; orig. pbs-restore;   nvme disk 1 ZFS pool;   nvme disk 2 ZFS dir;    21:10:38; 21:13:14;  156;                    210.05 MiB/s;      1.0;        58.69;            558.31 MiB/s;  1.0
> Azure L32s_v2; paral.pbs-restore;   nvme disk 1 ZFS pool;   nvme disk 2 ZFS dir;    21:34:25; 21:34:49;   24;                   1365.33 MiB/s;      6.5;        22.85;           1433.99 MiB/s;  2.6

> Azure L32s_v2; orig. pbs-restore;   nvme disk 1 ZFS pool;   nvme disk 2 ZFS dir;    21:19:51; 21:22:27;  156;                    210.05 MiB/s;      1.0;        54.46;            601.66 MiB/s;  1.0
> Azure L32s_v2; paral.pbs-restore;   nvme disk 1 ZFS pool;   nvme disk 2 ZFS dir;    21:41:53; 21:42:16;   23;                   1424.70 MiB/s;      6.8;        22.63;           1447.70 MiB/s;  2.4

> Azure L32s_v2; orig. pbs-restore;   nvme disk 1 ext4 dir;   nvme disk 2 ext4 dir;   02:10:04; 02:10:49;   45;                    728.18 MiB/s;      1.0;        43.97;            745.16 MiB/s;  1.0
> Azure L32s_v2; paral.pbs-restore;   nvme disk 1 ext4 dir;   nvme disk 2 ext4 dir;   02:12:35; 02:12:56;   21;                   1560.38 MiB/s;      2.1;        21.06;           1556.12 MiB/s;  2.1

> Azure L32s_v2; orig. pbs-restore;   2x nvme RAID0 ext4 dir; 2x nvme RAID0 ext4 dir; 01:56:35; 01:57:24;   49;                     668.73 MiB/s;     1.0;        48.94;            669.52 MiB/s;  1.0
> Azure L32s_v2; paral.pbs-restore;   2x nvme RAID0 ext4 dir; 2x nvme RAID0 ext4 dir; 22:24:45; 22:25:04;   19;                    1724.63 MiB/s;     2.6;        18.67;           1754.79 MiB/s;  2.6

Now some tests with my 750 GiB VM (zeroes = 5%):

> Results (relative to 750 GiB):
> Host;          Method;              source;                 target;                 Start;    End;      seconds(End - Start);  speed (End-Start);   speedup;  seconds(PVE log); speed(PVE log); speedup

> Azure L32s_v2; orig. pbs-restore;   2x nvme RAID0 ext4 dir; 2x nvme RAID0 ext4 dir; 00:45:19; 01:47:38; 3739;                     205.40 MiB/s;     1.0;      3736.73;          205.53 MiB/s;   1.0
> Azure L32s_v2; paral.pbs-restore;   2x nvme RAID0 ext4 dir; 2x nvme RAID0 ext4 dir; 00:23:44; 00:41:00; 1036;                     741.31 MiB/s;     3.6;      1034.96;          742.06 MiB/s;   3.6


PVE Logs:

> orig. pbs-restore: restore image complete (bytes=34359738368, duration=58.69s, speed=558.31MB/s)
> paral.pbs-restore: restore image complete (bytes=34359738368, duration=22.85s, speed=1433.99MB/s)

> orig. pbs-restore: restore image complete (bytes=34359738368, duration=54.46s, speed=601.66MB/s)
> paral.pbs-restore: restore image complete (bytes=34359738368, duration=22.63s, speed=1447.70MB/s)

> orig. pbs-restore: restore image complete (bytes=34359738368, duration=43.97s, speed=745.16MB/s)
> paral.pbs-restore: restore image complete (bytes=34359738368, duration=21.06s, speed=1556.12MB/s)

> orig. pbs-restore: restore image complete (bytes=34359738368, duration=48.94s, speed=669.52MB/s)
> paral.pbs-restore: restore image complete (bytes=34359738368, duration=18.67s, speed=1754.79MB/s)

> orig. pbs-restore: restore image complete (bytes=805306368000, duration=3736.73s, speed=205.53MB/s)
> paral.pbs-restore: restore image complete (bytes=805306368000, duration=1034.96s, speed=742.06MB/s)

Maybe someone is interested in "proxmox-backup-client benchmark --repository chunks2tbnvme":

Host          | Datastore (tls bench only) | aes256_gcm MB/s | compress MB/s | decompress MB/s | sha256 MB/s | tls MB/s | verify MB/s
--            | --                         | --:             | --:           | --:             | --:         | --:      | --:
Azure L32s_v2 | chunks2tbnvme              | 2552.57         | 549.95        | 856.72          | 1479.59     | 588.78   | 545.02





* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
  2020-12-07  2:51 Niko Fellner
@ 2020-12-07 13:24 ` Dominik Csapak
  0 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-12-07 13:24 UTC (permalink / raw)
  To: pbs-devel

hi,

i looked a bit more at this today

first, it seems that you are not that familiar with rust and
especially async rust (tokio, etc.); please correct me
if i am wrong. it is a complicated topic, so i would suggest that you
make yourself familiar with the concepts; rust's reference
documentation can be found under

https://doc.rust-lang.org/book/
https://rust-lang.github.io/async-book/

and there are sure many other great resources out there.
ofc you can always ask here too, but it may take longer to explain.


On 12/7/20 3:51 AM, Niko Fellner wrote:
> By using a mutex (AtomicBool - compare_exchange) I found out that only the write_zero_callback and write_data_callback are problematic (no mutex -> segfaults). Maybe anyone can find out why?

here an atomicbool/thread::yield is not necessary (i think it is
even a hindrance, because you now pause tokio threads); all you need is
rust's

std::sync::Mutex
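
a tiny stand-alone sketch of the difference (not pbs code, just to
illustrate; both functions serialize a "write", but the mutex lets the
waiting thread sleep instead of spinning/yielding):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// spin lock via compare_exchange: the waiting thread keeps yielding,
// which inside a tokio worker just burns/pauses that thread
fn with_spin(lock: &AtomicBool, f: impl FnOnce()) {
    while lock
        .compare_exchange(false, true, Ordering::Acquire, Ordering::Acquire)
        .is_err()
    {
        std::thread::yield_now();
    }
    f();
    lock.store(false, Ordering::Release);
}

// std::sync::Mutex: the waiting thread is blocked by the os until the lock is free
fn with_mutex(lock: &Mutex<()>, f: impl FnOnce()) {
    let _guard = lock.lock().unwrap();
    f();
}

fn main() {
    let spin = AtomicBool::new(false);
    let mutex = Mutex::new(());
    with_spin(&spin, || println!("write via spin lock"));
    with_mutex(&mutex, || println!("write via std::sync::Mutex"));
}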

the problem, afaics, is that qemu cannot have multiple writers on the
same block backend from multiple threads, and it seems
that is a general qemu limitation (you can even have only
one iothread per image), so i am not sure that it is possible
to implement what you want within the qemu layer

ofc you can ask the qemu people on their mailing list, maybe
i am simply not seeing how

so we have to synchronize on the writes, which makes the whole
patch less interesting since it will not solve your bottleneck,
i assume?

the only thing we can parallelize is the download of the chunks,
but in my tests here, that did not improve restore speed at all

i attach my code (which is based on yours), which should be
ok i think, but please do not use it as-is, since i only
tested it very briefly... (it is more intended to be
a reference on how one could do such a thing)

i commented the mutex out for now; like this
it will crash with segfaults (like your code does)

with the mutex compiled in, it works but is
as slow as without the patch
(maybe playing around with the thread count could still make a difference)

i hope this helps :)

------------

diff --git a/src/lib.rs b/src/lib.rs
index b755014..20650b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -810,6 +810,14 @@ pub extern "C" fn proxmox_restore_image(
      verbose: bool,
  ) -> c_int {

+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
+
      let restore_task = restore_handle_to_task(handle);

      let result: Result<_, Error> = try_block!({
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
          let archive_name = tools::utf8_c_string(archive_name)?
              .ok_or_else(|| format_err!("archive_name must not be NULL"))?;

+        let foo = SafePointer { pointer: callback_data.clone() };
          let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
          };

          let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
          };

          proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..caa5aeb 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
  use tokio::prelude::*;

  use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
  use proxmox_backup::backup::*;
  use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};

@@ -66,8 +67,8 @@ impl RestoreTask {
              let mut builder = tokio::runtime::Builder::new();
              builder.threaded_scheduler();
              builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(8);
+            builder.core_threads(6);
              builder.thread_name("proxmox-restore-worker");
              builder
          });
@@ -110,8 +111,8 @@ impl RestoreTask {
      pub async fn restore_image(
          &self,
          archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
          verbose: bool,
      ) -> Result<(), Error> {

@@ -148,51 +149,92 @@ impl RestoreTask {
              most_used,
          );

-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
-
          let start_time = std::time::Instant::now();

-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+//        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", 4,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+//                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+//                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
              }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        next_per, bytes,
+                        zeroes*100/bytes, zeroes,
+                        start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                  }
              }
-        }

-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-                  bytes,
-                  elapsed.as_secs_f64(),
-                  bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }

-        Ok(())
+        output.await?
      }

      pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {





* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
@ 2020-12-07  2:51 Niko Fellner
  2020-12-07 13:24 ` Dominik Csapak
  0 siblings, 1 reply; 5+ messages in thread
From: Niko Fellner @ 2020-12-07  2:51 UTC (permalink / raw)
  To: pbs-devel

By using a mutex (AtomicBool - compare_exchange) I found out that only the write_zero_callback and write_data_callback are problematic (no mutex -> segfaults). Maybe anyone can find out why?

Furthermore I am using "ReadChunk::read_chunk(&chunk_reader_clone, &digest)?" again instead of "AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?"; the latter one sometimes does not return, especially when accessed by more than 1 thread simultaneously.
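
(Just as a side note, not something my patch below does: if one wanted to keep the blocking ReadChunk call off the async worker threads entirely, tokio's blocking pool could be used for it. A minimal sketch, assuming tokio 0.2 with the "blocking", "macros" and "rt-threaded" features as in the Cargo.toml; fetch_chunk_blocking is just a made-up stand-in for the blocking read:)

use tokio::task::spawn_blocking;

// made-up stand-in for the blocking ReadChunk::read_chunk call
fn fetch_chunk_blocking(pos: usize) -> Vec<u8> {
    vec![pos as u8; 4]
}

#[tokio::main]
async fn main() {
    let mut handles = Vec::new();
    for pos in 0..8 {
        // the blocking read runs on tokio's blocking thread pool,
        // so it does not stall the async worker threads
        handles.push(spawn_blocking(move || fetch_chunk_blocking(pos)));
    }
    for h in handles {
        let data = h.await.unwrap();
        println!("got chunk of {} bytes", data.len());
    }
}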

Some performance tests of a 32 GiB VM (64% zeroes; 10 GB pure random data):
- Original 1 thread sync (bytes=34359738368, duration=224.78s, speed=145.78MB/s)
- When putting a mutex around the read_chunk part too, I had no performance improvement at all, no matter how many futures and threads I used. So with a mutex around read_chunk, 1 future was the fastest (bytes=34359738368, zeroes=22322085888, duration=224.75s, speed=145.80MB/s)
- Using 2 futures, 4 core_threads, 6 max_threads: (bytes=34359738368, zeroes=22322085888, duration=188.53s, speed=173.81MB/s)
- Using 3 futures, 4 core_threads, 6 max_threads: (bytes=34359738368, zeroes=22322085888, duration=176.08s, speed=186.10MB/s)
- Using 6 futures, 6 core_threads, 12 max_threads: (bytes=34359738368, zeroes=22322085888, duration=160.12s, speed=204.65MB/s)
- Using 10 futures, 12 core_threads, 24 max_threads: (bytes=34359738368, zeroes=22322085888, duration=154.33s, speed=212.32MB/s)
- Using 12 futures, 12 core_threads, 24 max_threads: (bytes=34359738368, zeroes=22322085888, duration=153.04s, speed=214.11MB/s)

Host:
- my dev+test PVE/PBS runs as a VM with 12 vCPUs within a productive PVE host (which is running on a dedicated host at Hetzner)
- CPU: Intel(R) Xeon(R) CPU E5-1650 v3 (6 core, 12 threads)
- Memory: 256 GB
- Source AND target were on the same SSD "SAMSUNG MZ7LM480HCHP-00003" (for performance reasons I usually choose a different source and target .. but here it's just a test VM)


- Maybe next week I'll test on a bigger host again (Azure, 2x 2 TB NVMe with lots of vCPUs...).
- But I still can't write with more than 1 thread simultaneously... maybe anyone can help out here? (Maybe Dietmar's pull.rs mechanism could help?)


diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+            
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..b141827 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -66,8 +66,10 @@ impl RestoreTask {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            //builder.max_threads(6);
+            //builder.core_threads(4);
+            builder.max_threads(24);
+            builder.core_threads(12);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -106,12 +108,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,46 +150,135 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        let per = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let bytes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let zeroes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push and await tasks");
+        
+        //let mut tokio_handles = vec![]; // Vec::with_capacity(index_count);
+       
+        use futures::stream::{StreamExt};
+        let mut futs = futures::stream::FuturesUnordered::new();
+        //let mut futs = futures::stream::FuturesOrdered::new(); // not helpful
 
         let start_time = std::time::Instant::now();
-
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
+        
+        // I tried out where we need a lock. Actually only needed for reading.
+        let locked = Arc::new(std::sync::atomic::AtomicBool::new(false)); // false = unlocked = allowed to read
+        let locked_progress = Arc::new(std::sync::atomic::AtomicBool::new(false)); // false = unlocked = allowed to read
+
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            let per = std::sync::Arc::clone(&per);
+            let bytes = std::sync::Arc::clone(&bytes);
+            let zeroes = std::sync::Arc::clone(&zeroes);
+            let locked_clone = Arc::clone(&locked);
+            let locked_progress_clone = Arc::clone(&locked_progress);
+            //tokio_handles.push(
+            futs.push(
+                tokio::spawn(
+                //tokio::task::spawn_blocking(
+                    async move {
+                    //move || {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("{} BEGIN offset {}", pos, offset);
+                        //eprintln!("{} BEGIN", pos);
+                        if digest == &zero_chunk_digest {
+                            while locked_clone.compare_exchange(false, true, std::sync::atomic::Ordering::Acquire, std::sync::atomic::Ordering::Acquire).is_err() {std::thread::yield_now()} // if it's unlocked (false): set to locked (true), else waits
+                            //eprintln!("{} write_zero_callback ENTERED", pos);
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("{} write_zero_callback LEFT with res: {}, offset: {}", pos, res, offset);
+                            locked_clone.store(false, std::sync::atomic::Ordering::Release); // unlock it
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                            zeroes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                        } else {
+                            //eprintln!("{} BEFORE read_chunk: offset: {}", pos, offset);
+                            //while locked_clone.compare_exchange(false, true, std::sync::atomic::Ordering::Acquire, std::sync::atomic::Ordering::Acquire).is_err() {std::thread::yield_now()} // if it's unlocked (false): set to locked (true), else waits
+                            //eprintln!("{} read_chunk ENTERED", pos);
+                            let raw_data = ReadChunk::read_chunk(&chunk_reader_clone, &digest)?; // works fine when we have a mutex around the following writing part
+                            //let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?; // sometimes does not return, especially when accessed by more than 1 thread, no matter if mutex or not
+                            //eprintln!("{} AFTER read_chunk: offset: {}", pos, offset);
+                            //eprintln!("{} read_chunk LEFT", pos);
+                            //locked_clone.store(false, std::sync::atomic::Ordering::Release); // unlock it
+                            
+                            while locked_clone.compare_exchange(false, true, std::sync::atomic::Ordering::Acquire, std::sync::atomic::Ordering::Acquire).is_err() {std::thread::yield_now()} // if it's unlocked (false): set to locked (true), else waits
+                            //eprintln!("{} write_data_callback ENTERED", pos);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("{} AFTER write_data_callback with res: {}, offset: {}", pos, res, offset);
+                            //eprintln!("{} write_data_callback LEFT with res: {}", pos, res);
+                            locked_clone.store(false, std::sync::atomic::Ordering::Release); // unlock it
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(raw_data.len(), std::sync::atomic::Ordering::SeqCst);
+                        }
+                        if verbose {
+                            // mutex only helps for avoiding to show the same line twice by different threads.
+                            // without mutex we have no crashes either.
+                            // the mutex here doesn't cost any measurable performance
+                            while locked_progress_clone.compare_exchange(false, true, std::sync::atomic::Ordering::Acquire, std::sync::atomic::Ordering::Acquire).is_err() {std::thread::yield_now()} // if it's unlocked (false): set to locked (true), else waits
+                            //eprintln!("{} progress ENTERED", pos);
+                            let next_per = ((pos+1)*100)/index_count;
+                            let currPer = per.load(std::sync::atomic::Ordering::SeqCst);
+                            let currBytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+                            let currZeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
+                            //if per != next_per {
+                            if currPer < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                      next_per, currBytes,
+                                      currZeroes*100/currBytes, currZeroes,
+                                      start_time.elapsed().as_secs());
+                                per.store(next_per, std::sync::atomic::Ordering::SeqCst);
+                            }
+                            //eprintln!("{} progress LEFT", pos);
+                            locked_progress_clone.store(false, std::sync::atomic::Ordering::Release); // unlock it
+                        }
+                        //eprintln!("{} END", pos);
+                        Ok(())
+                    }
+                )
+            );
+            
+            //if futs.len() >= 2 { 
+            if futs.len() >= 12 { // no performance advantage to use values higher than 1 here, if we don't read with >= 2 threads
+                while let Some(response) = futs.next().await {
+                    //eprintln!("Emptying queue. Current length: {}", futs.len());
+                    if let Err(e) = response {
+                        eprintln!("Error during await: {}", e);
+                    }
                 }
-                bytes += raw_data.len();
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await remaining");
+        // Wait for the remaining to finish.
+        while let Some(response) = futs.next().await {
+            eprintln!("Emptying queue. Current length: {}", futs.len());
+            if let Err(e) = response {
+                eprintln!("Error during await: {}", e);
             }
         }
-
+        //if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+        //    eprintln!("Error during await: {}", e);
+        //}
+        eprintln!("END: await remaining");
+        
+        let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+        let zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+        eprintln!("restore image complete (bytes={}, zeroes={}, duration={:.2}s, speed={:.2}MB/s)",
                   bytes,
+                  zeroes,
                   elapsed.as_secs_f64(),
                   bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
         );




Full Log of original restore:
new volume ID is 'local-zfs:vm-100-disk-0'
restore proxmox backup image: /usr/bin/pbs-restore --repository root@pam@localhost:datastore vm/100/2020-12-01T12:34:43Z drive-scsi0.img.fidx /dev/zvol/rpool/data/vm-100-disk-0 --verbose --format raw --skip-zero
connecting to repository 'root@pam@localhost:datastore'
open block backend for target '/dev/zvol/rpool/data/vm-100-disk-0'
starting to restore snapshot 'vm/100/2020-12-01T12:34:43Z'
download and verify backup index
progress 1% (read 343932928 bytes, zeroes = 35% (121634816 bytes), duration 2 sec)
progress 2% (read 687865856 bytes, zeroes = 17% (121634816 bytes), duration 5 sec)
progress 3% (read 1031798784 bytes, zeroes = 12% (125829120 bytes), duration 11 sec)
progress 4% (read 1375731712 bytes, zeroes = 9% (130023424 bytes), duration 18 sec)
progress 5% (read 1719664640 bytes, zeroes = 7% (130023424 bytes), duration 24 sec)
progress 6% (read 2063597568 bytes, zeroes = 6% (130023424 bytes), duration 31 sec)
progress 7% (read 2407530496 bytes, zeroes = 10% (255852544 bytes), duration 36 sec)
progress 8% (read 2751463424 bytes, zeroes = 9% (255852544 bytes), duration 42 sec)
progress 9% (read 3095396352 bytes, zeroes = 8% (255852544 bytes), duration 49 sec)
progress 10% (read 3439329280 bytes, zeroes = 7% (260046848 bytes), duration 55 sec)
progress 11% (read 3783262208 bytes, zeroes = 6% (264241152 bytes), duration 62 sec)
progress 12% (read 4127195136 bytes, zeroes = 6% (264241152 bytes), duration 69 sec)
progress 13% (read 4466933760 bytes, zeroes = 8% (390070272 bytes), duration 73 sec)
progress 14% (read 4810866688 bytes, zeroes = 8% (390070272 bytes), duration 80 sec)
progress 15% (read 5154799616 bytes, zeroes = 7% (390070272 bytes), duration 86 sec)
progress 16% (read 5498732544 bytes, zeroes = 7% (390070272 bytes), duration 93 sec)
progress 17% (read 5842665472 bytes, zeroes = 6% (390070272 bytes), duration 99 sec)
progress 18% (read 6186598400 bytes, zeroes = 6% (390070272 bytes), duration 106 sec)
progress 19% (read 6530531328 bytes, zeroes = 7% (469762048 bytes), duration 112 sec)
progress 20% (read 6874464256 bytes, zeroes = 7% (515899392 bytes), duration 117 sec)
progress 21% (read 7218397184 bytes, zeroes = 7% (515899392 bytes), duration 124 sec)
progress 22% (read 7562330112 bytes, zeroes = 6% (515899392 bytes), duration 130 sec)
progress 23% (read 7906263040 bytes, zeroes = 6% (515899392 bytes), duration 137 sec)
progress 24% (read 8250195968 bytes, zeroes = 6% (515899392 bytes), duration 144 sec)
progress 25% (read 8589934592 bytes, zeroes = 6% (515899392 bytes), duration 150 sec)
progress 26% (read 8933867520 bytes, zeroes = 7% (641728512 bytes), duration 155 sec)
progress 27% (read 9277800448 bytes, zeroes = 6% (641728512 bytes), duration 161 sec)
progress 28% (read 9621733376 bytes, zeroes = 6% (641728512 bytes), duration 168 sec)
progress 29% (read 9965666304 bytes, zeroes = 6% (641728512 bytes), duration 175 sec)
progress 30% (read 10309599232 bytes, zeroes = 6% (641728512 bytes), duration 182 sec)
progress 31% (read 10653532160 bytes, zeroes = 6% (641728512 bytes), duration 189 sec)
progress 32% (read 10997465088 bytes, zeroes = 6% (767557632 bytes), duration 192 sec)
progress 33% (read 11341398016 bytes, zeroes = 6% (767557632 bytes), duration 200 sec)
progress 34% (read 11685330944 bytes, zeroes = 6% (767557632 bytes), duration 206 sec)
progress 35% (read 12029263872 bytes, zeroes = 6% (767557632 bytes), duration 213 sec)
progress 36% (read 12373196800 bytes, zeroes = 6% (767557632 bytes), duration 220 sec)
progress 37% (read 12717129728 bytes, zeroes = 8% (1098907648 bytes), duration 220 sec)
progress 38% (read 13056868352 bytes, zeroes = 10% (1426063360 bytes), duration 220 sec)
progress 39% (read 13400801280 bytes, zeroes = 13% (1769996288 bytes), duration 220 sec)
progress 40% (read 13744734208 bytes, zeroes = 15% (2113929216 bytes), duration 220 sec)
progress 41% (read 14088667136 bytes, zeroes = 17% (2457862144 bytes), duration 220 sec)
progress 42% (read 14432600064 bytes, zeroes = 19% (2801795072 bytes), duration 220 sec)
progress 43% (read 14776532992 bytes, zeroes = 21% (3145728000 bytes), duration 220 sec)
progress 44% (read 15120465920 bytes, zeroes = 23% (3485466624 bytes), duration 220 sec)
progress 45% (read 15464398848 bytes, zeroes = 23% (3690987520 bytes), duration 221 sec)
progress 46% (read 15808331776 bytes, zeroes = 25% (4034920448 bytes), duration 221 sec)
progress 47% (read 16152264704 bytes, zeroes = 27% (4378853376 bytes), duration 221 sec)
progress 48% (read 16496197632 bytes, zeroes = 28% (4722786304 bytes), duration 221 sec)
progress 49% (read 16840130560 bytes, zeroes = 30% (5062524928 bytes), duration 221 sec)
progress 50% (read 17179869184 bytes, zeroes = 31% (5402263552 bytes), duration 221 sec)
progress 51% (read 17523802112 bytes, zeroes = 32% (5725224960 bytes), duration 221 sec)
progress 52% (read 17867735040 bytes, zeroes = 33% (6069157888 bytes), duration 221 sec)
progress 53% (read 18211667968 bytes, zeroes = 35% (6413090816 bytes), duration 221 sec)
progress 54% (read 18555600896 bytes, zeroes = 36% (6757023744 bytes), duration 221 sec)
progress 55% (read 18899533824 bytes, zeroes = 37% (7100956672 bytes), duration 221 sec)
progress 56% (read 19243466752 bytes, zeroes = 38% (7444889600 bytes), duration 221 sec)
progress 57% (read 19587399680 bytes, zeroes = 39% (7776239616 bytes), duration 222 sec)
progress 58% (read 19931332608 bytes, zeroes = 40% (8120172544 bytes), duration 222 sec)
progress 59% (read 20275265536 bytes, zeroes = 41% (8464105472 bytes), duration 222 sec)
progress 60% (read 20619198464 bytes, zeroes = 42% (8808038400 bytes), duration 222 sec)
progress 61% (read 20963131392 bytes, zeroes = 43% (9151971328 bytes), duration 222 sec)
progress 62% (read 21307064320 bytes, zeroes = 44% (9495904256 bytes), duration 222 sec)
progress 63% (read 21646802944 bytes, zeroes = 45% (9789505536 bytes), duration 223 sec)
progress 64% (read 21990735872 bytes, zeroes = 45% (10003415040 bytes), duration 224 sec)
progress 65% (read 22334668800 bytes, zeroes = 46% (10347347968 bytes), duration 224 sec)
progress 66% (read 22678601728 bytes, zeroes = 47% (10691280896 bytes), duration 224 sec)
progress 67% (read 23022534656 bytes, zeroes = 47% (11035213824 bytes), duration 224 sec)
progress 68% (read 23366467584 bytes, zeroes = 48% (11379146752 bytes), duration 224 sec)
progress 69% (read 23710400512 bytes, zeroes = 49% (11714691072 bytes), duration 224 sec)
progress 70% (read 24054333440 bytes, zeroes = 50% (12050235392 bytes), duration 224 sec)
progress 71% (read 24398266368 bytes, zeroes = 50% (12394168320 bytes), duration 224 sec)
progress 72% (read 24742199296 bytes, zeroes = 51% (12738101248 bytes), duration 224 sec)
progress 73% (read 25086132224 bytes, zeroes = 52% (13082034176 bytes), duration 224 sec)
progress 74% (read 25430065152 bytes, zeroes = 52% (13425967104 bytes), duration 224 sec)
progress 75% (read 25769803776 bytes, zeroes = 53% (13765705728 bytes), duration 224 sec)
progress 76% (read 26113736704 bytes, zeroes = 53% (14101250048 bytes), duration 224 sec)
progress 77% (read 26457669632 bytes, zeroes = 54% (14445182976 bytes), duration 224 sec)
progress 78% (read 26801602560 bytes, zeroes = 55% (14789115904 bytes), duration 224 sec)
progress 79% (read 27145535488 bytes, zeroes = 55% (15133048832 bytes), duration 224 sec)
progress 80% (read 27489468416 bytes, zeroes = 56% (15476981760 bytes), duration 224 sec)
progress 81% (read 27833401344 bytes, zeroes = 56% (15820914688 bytes), duration 224 sec)
progress 82% (read 28177334272 bytes, zeroes = 57% (16156459008 bytes), duration 224 sec)
progress 83% (read 28521267200 bytes, zeroes = 57% (16500391936 bytes), duration 224 sec)
progress 84% (read 28865200128 bytes, zeroes = 58% (16844324864 bytes), duration 224 sec)
progress 85% (read 29209133056 bytes, zeroes = 58% (17188257792 bytes), duration 224 sec)
progress 86% (read 29553065984 bytes, zeroes = 59% (17532190720 bytes), duration 224 sec)
progress 87% (read 29896998912 bytes, zeroes = 59% (17876123648 bytes), duration 224 sec)
progress 88% (read 30236737536 bytes, zeroes = 60% (18203279360 bytes), duration 224 sec)
progress 89% (read 30580670464 bytes, zeroes = 60% (18547212288 bytes), duration 224 sec)
progress 90% (read 30924603392 bytes, zeroes = 61% (18891145216 bytes), duration 224 sec)
progress 91% (read 31268536320 bytes, zeroes = 61% (19235078144 bytes), duration 224 sec)
progress 92% (read 31612469248 bytes, zeroes = 61% (19579011072 bytes), duration 224 sec)
progress 93% (read 31956402176 bytes, zeroes = 62% (19922944000 bytes), duration 224 sec)
progress 94% (read 32300335104 bytes, zeroes = 62% (20262682624 bytes), duration 224 sec)
progress 95% (read 32644268032 bytes, zeroes = 63% (20606615552 bytes), duration 224 sec)
progress 96% (read 32988200960 bytes, zeroes = 63% (20950548480 bytes), duration 224 sec)
progress 97% (read 33332133888 bytes, zeroes = 63% (21294481408 bytes), duration 224 sec)
progress 98% (read 33676066816 bytes, zeroes = 64% (21638414336 bytes), duration 224 sec)
progress 99% (read 34019999744 bytes, zeroes = 64% (21982347264 bytes), duration 224 sec)
progress 100% (read 34359738368 bytes, zeroes = 64% (22322085888 bytes), duration 224 sec)
restore image complete (bytes=34359738368, duration=224.78s, speed=145.78MB/s)
rescan volumes...
TASK OK


Full log of restore Using 12 futures, 12 core_threads, 24 max_threads:
new volume ID is 'local-zfs:vm-100-disk-0'
restore proxmox backup image: /usr/bin/pbs-restore --repository root@pam@localhost:datastore vm/100/2020-12-01T12:34:43Z drive-scsi0.img.fidx /dev/zvol/rpool/data/vm-100-disk-0 --verbose --format raw --skip-zero
connecting to repository 'root@pam@localhost:datastore'
open block backend for target '/dev/zvol/rpool/data/vm-100-disk-0'
starting to restore snapshot 'vm/100/2020-12-01T12:34:43Z'
download and verify backup index
index_count = 8192, index_chunk_size: 4194304
BEGIN: push and await tasks
progress 1% (read 327155712 bytes, zeroes = 37% (121634816 bytes), duration 0 sec)
progress 2% (read 675282944 bytes, zeroes = 18% (121634816 bytes), duration 2 sec)
progress 3% (read 1010827264 bytes, zeroes = 12% (125829120 bytes), duration 6 sec)
progress 4% (read 1367343104 bytes, zeroes = 9% (130023424 bytes), duration 11 sec)
progress 5% (read 1715470336 bytes, zeroes = 7% (130023424 bytes), duration 15 sec)
progress 6% (read 2038431744 bytes, zeroes = 6% (130023424 bytes), duration 20 sec)
progress 7% (read 2382364672 bytes, zeroes = 10% (255852544 bytes), duration 23 sec)
progress 8% (read 2726297600 bytes, zeroes = 9% (255852544 bytes), duration 28 sec)
progress 9% (read 3074424832 bytes, zeroes = 8% (255852544 bytes), duration 31 sec)
progress 10% (read 3430940672 bytes, zeroes = 7% (260046848 bytes), duration 36 sec)
progress 11% (read 3779067904 bytes, zeroes = 6% (264241152 bytes), duration 41 sec)
progress 12% (read 4097835008 bytes, zeroes = 6% (264241152 bytes), duration 45 sec)
progress 13% (read 4462739456 bytes, zeroes = 8% (390070272 bytes), duration 47 sec)
progress 14% (read 4785700864 bytes, zeroes = 8% (390070272 bytes), duration 52 sec)
progress 15% (read 5154799616 bytes, zeroes = 7% (390070272 bytes), duration 56 sec)
progress 16% (read 5494538240 bytes, zeroes = 7% (390070272 bytes), duration 61 sec)
progress 17% (read 5842665472 bytes, zeroes = 6% (390070272 bytes), duration 65 sec)
progress 18% (read 6161432576 bytes, zeroes = 6% (390070272 bytes), duration 69 sec)
progress 19% (read 6530531328 bytes, zeroes = 7% (469762048 bytes), duration 73 sec)
progress 20% (read 6853492736 bytes, zeroes = 7% (515899392 bytes), duration 76 sec)
progress 21% (read 7205814272 bytes, zeroes = 7% (515899392 bytes), duration 81 sec)
progress 22% (read 7558135808 bytes, zeroes = 6% (515899392 bytes), duration 86 sec)
progress 23% (read 7906263040 bytes, zeroes = 6% (515899392 bytes), duration 90 sec)
progress 24% (read 8237613056 bytes, zeroes = 6% (515899392 bytes), duration 95 sec)
progress 25% (read 8560574464 bytes, zeroes = 6% (520093696 bytes), duration 99 sec)
progress 26% (read 8917090304 bytes, zeroes = 7% (641728512 bytes), duration 101 sec)
progress 27% (read 9265217536 bytes, zeroes = 6% (641728512 bytes), duration 106 sec)
progress 28% (read 9617539072 bytes, zeroes = 6% (641728512 bytes), duration 110 sec)
progress 29% (read 9940500480 bytes, zeroes = 6% (641728512 bytes), duration 115 sec)
progress 30% (read 10280239104 bytes, zeroes = 6% (641728512 bytes), duration 119 sec)
progress 31% (read 10624172032 bytes, zeroes = 6% (641728512 bytes), duration 124 sec)
progress 32% (read 10976493568 bytes, zeroes = 6% (767557632 bytes), duration 127 sec)
progress 33% (read 11333009408 bytes, zeroes = 6% (767557632 bytes), duration 132 sec)
progress 34% (read 11681136640 bytes, zeroes = 6% (767557632 bytes), duration 136 sec)
progress 35% (read 12012486656 bytes, zeroes = 6% (767557632 bytes), duration 141 sec)
progress 36% (read 12356419584 bytes, zeroes = 6% (767557632 bytes), duration 145 sec)
progress 37% (read 12717129728 bytes, zeroes = 8% (1098907648 bytes), duration 145 sec)
progress 38% (read 13056868352 bytes, zeroes = 10% (1426063360 bytes), duration 145 sec)
progress 39% (read 13400801280 bytes, zeroes = 13% (1769996288 bytes), duration 145 sec)
progress 40% (read 13744734208 bytes, zeroes = 15% (2113929216 bytes), duration 145 sec)
progress 41% (read 14088667136 bytes, zeroes = 17% (2457862144 bytes), duration 145 sec)
progress 42% (read 14432600064 bytes, zeroes = 19% (2801795072 bytes), duration 145 sec)
progress 43% (read 14772338688 bytes, zeroes = 21% (3141533696 bytes), duration 145 sec)
progress 44% (read 15120465920 bytes, zeroes = 23% (3485466624 bytes), duration 145 sec)
progress 45% (read 15464398848 bytes, zeroes = 23% (3690987520 bytes), duration 146 sec)
progress 46% (read 15808331776 bytes, zeroes = 25% (4034920448 bytes), duration 146 sec)
progress 47% (read 16148070400 bytes, zeroes = 27% (4374659072 bytes), duration 146 sec)
progress 48% (read 16496197632 bytes, zeroes = 28% (4722786304 bytes), duration 146 sec)
progress 49% (read 16835936256 bytes, zeroes = 30% (5058330624 bytes), duration 146 sec)
progress 50% (read 17175674880 bytes, zeroes = 31% (5398069248 bytes), duration 146 sec)
progress 51% (read 17523802112 bytes, zeroes = 32% (5725224960 bytes), duration 146 sec)
progress 52% (read 17867735040 bytes, zeroes = 33% (6069157888 bytes), duration 146 sec)
progress 53% (read 18186502144 bytes, zeroes = 35% (6387924992 bytes), duration 147 sec)
progress 54% (read 18555600896 bytes, zeroes = 36% (6757023744 bytes), duration 147 sec)
progress 55% (read 18899533824 bytes, zeroes = 37% (7100956672 bytes), duration 147 sec)
progress 56% (read 19243466752 bytes, zeroes = 38% (7444889600 bytes), duration 147 sec)
progress 57% (read 19587399680 bytes, zeroes = 39% (7776239616 bytes), duration 147 sec)
progress 58% (read 19931332608 bytes, zeroes = 40% (8120172544 bytes), duration 147 sec)
progress 59% (read 20275265536 bytes, zeroes = 41% (8464105472 bytes), duration 147 sec)
progress 60% (read 20619198464 bytes, zeroes = 42% (8808038400 bytes), duration 147 sec)
progress 61% (read 20963131392 bytes, zeroes = 43% (9151971328 bytes), duration 147 sec)
progress 62% (read 21307064320 bytes, zeroes = 44% (9495904256 bytes), duration 147 sec)
progress 63% (read 21646802944 bytes, zeroes = 45% (9789505536 bytes), duration 147 sec)
progress 64% (read 21990735872 bytes, zeroes = 45% (10003415040 bytes), duration 149 sec)
progress 65% (read 22334668800 bytes, zeroes = 46% (10347347968 bytes), duration 149 sec)
progress 66% (read 22678601728 bytes, zeroes = 47% (10691280896 bytes), duration 149 sec)
progress 67% (read 23022534656 bytes, zeroes = 47% (11035213824 bytes), duration 149 sec)
progress 68% (read 23366467584 bytes, zeroes = 48% (11379146752 bytes), duration 149 sec)
progress 69% (read 23710400512 bytes, zeroes = 49% (11714691072 bytes), duration 149 sec)
progress 70% (read 24050139136 bytes, zeroes = 50% (12046041088 bytes), duration 149 sec)
progress 71% (read 24398266368 bytes, zeroes = 50% (12394168320 bytes), duration 149 sec)
progress 72% (read 24742199296 bytes, zeroes = 51% (12738101248 bytes), duration 149 sec)
progress 73% (read 25086132224 bytes, zeroes = 52% (13082034176 bytes), duration 149 sec)
progress 74% (read 25430065152 bytes, zeroes = 52% (13425967104 bytes), duration 149 sec)
progress 75% (read 25769803776 bytes, zeroes = 53% (13765705728 bytes), duration 149 sec)
progress 76% (read 26113736704 bytes, zeroes = 53% (14101250048 bytes), duration 149 sec)
progress 77% (read 26457669632 bytes, zeroes = 54% (14445182976 bytes), duration 149 sec)
progress 78% (read 26797408256 bytes, zeroes = 55% (14784921600 bytes), duration 149 sec)
progress 79% (read 27145535488 bytes, zeroes = 55% (15133048832 bytes), duration 149 sec)
progress 80% (read 27489468416 bytes, zeroes = 56% (15476981760 bytes), duration 150 sec)
progress 81% (read 27804041216 bytes, zeroes = 56% (15791554560 bytes), duration 150 sec)
progress 82% (read 28177334272 bytes, zeroes = 57% (16156459008 bytes), duration 150 sec)
progress 83% (read 28504489984 bytes, zeroes = 57% (16483614720 bytes), duration 150 sec)
progress 84% (read 28865200128 bytes, zeroes = 58% (16844324864 bytes), duration 150 sec)
progress 85% (read 29209133056 bytes, zeroes = 58% (17188257792 bytes), duration 150 sec)
progress 86% (read 29553065984 bytes, zeroes = 59% (17532190720 bytes), duration 150 sec)
progress 87% (read 29876027392 bytes, zeroes = 59% (17855152128 bytes), duration 151 sec)
progress 88% (read 30232543232 bytes, zeroes = 60% (18203279360 bytes), duration 151 sec)
progress 89% (read 30576476160 bytes, zeroes = 60% (18543017984 bytes), duration 151 sec)
progress 90% (read 30924603392 bytes, zeroes = 61% (18891145216 bytes), duration 151 sec)
progress 91% (read 31264342016 bytes, zeroes = 61% (19230883840 bytes), duration 152 sec)
progress 92% (read 31612469248 bytes, zeroes = 61% (19579011072 bytes), duration 152 sec)
progress 93% (read 31956402176 bytes, zeroes = 62% (19922944000 bytes), duration 152 sec)
progress 94% (read 32296140800 bytes, zeroes = 62% (20258488320 bytes), duration 152 sec)
progress 95% (read 32644268032 bytes, zeroes = 63% (20606615552 bytes), duration 152 sec)
progress 96% (read 32988200960 bytes, zeroes = 63% (20950548480 bytes), duration 152 sec)
progress 97% (read 33332133888 bytes, zeroes = 63% (21294481408 bytes), duration 152 sec)
progress 98% (read 33676066816 bytes, zeroes = 64% (21638414336 bytes), duration 152 sec)
progress 99% (read 34003222528 bytes, zeroes = 64% (21965570048 bytes), duration 152 sec)
END: push tasks
BEGIN: await remaining
Emptying queue. Current length: 7
Emptying queue. Current length: 6
Emptying queue. Current length: 5
Emptying queue. Current length: 4
Emptying queue. Current length: 3
Emptying queue. Current length: 2
progress 100% (read 34359738368 bytes, zeroes = 64% (22322085888 bytes), duration 153 sec)
Emptying queue. Current length: 1
Emptying queue. Current length: 0
END: await remaining
restore image complete (bytes=34359738368, zeroes=22322085888, duration=153.04s, speed=214.11MB/s)
rescan volumes...
TASK OK




* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
@ 2020-12-06  2:51 Niko Fellner
  0 siblings, 0 replies; 5+ messages in thread
From: Niko Fellner @ 2020-12-06  2:51 UTC (permalink / raw)
  To: pbs-devel

Another update: 
- Now awaiting (and unwrapping) the spawned futures in pairs of two (see the minimal sketch below this list)
- Using atomics so that counting the percentage, bytes and zeroes works correctly across tasks
- Sometimes the program runs and finishes OK. First impression: performance looks good (about 230 vs. 380 seconds (sync) for a 32 GiB VM), but I can't really measure performance right now because the server is busy.
- But sometimes the program segfaults, and I'm not sure why. Does anyone have an idea?
- The more futures I await/unwrap in parallel (see "if futs.len() >= N ..."), the sooner and more often the segfaults occur.
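
Here is a minimal, self-contained sketch of the pattern I mean (bounded awaiting of spawned futures plus an atomic byte counter). It assumes tokio 0.2 with the "macros"/"rt-threaded" features and the futures crate; N, the chunk count and the per-task body are placeholders, not the real restore code:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use futures::stream::{FuturesUnordered, StreamExt};

const N: usize = 2; // how many spawned tasks may be in flight at once

#[tokio::main]
async fn main() {
    let bytes = Arc::new(AtomicUsize::new(0)); // shared counter, as in the patch below
    let mut futs = FuturesUnordered::new();

    for pos in 0..16usize {
        let bytes = Arc::clone(&bytes);
        // Spawn the per-chunk work onto the runtime's thread pool.
        futs.push(tokio::spawn(async move {
            // ... read chunk `pos` and write it out here ...
            let chunk_len = 4 * 1024 * 1024; // pretend we restored a 4 MiB chunk
            bytes.fetch_add(chunk_len, Ordering::SeqCst);
            pos
        }));

        // Keep at most N tasks in flight: once the queue is full,
        // await (and unwrap) one finished task before pushing the next.
        if futs.len() >= N {
            if let Err(e) = futs.next().await.unwrap() {
                eprintln!("Error during await: {}", e);
            }
        }
    }

    // Drain whatever is still pending.
    while let Some(res) = futs.next().await {
        if let Err(e) = res {
            eprintln!("Error during await: {}", e);
        }
    }

    eprintln!("restored {} bytes", bytes.load(Ordering::SeqCst));
}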

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+            
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..f7aa564 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -66,8 +66,10 @@ impl RestoreTask {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            //builder.max_threads(6);
+            //builder.core_threads(4);
+            builder.max_threads(12);
+            builder.core_threads(6);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -106,12 +108,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,46 +150,108 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        let per = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let bytes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let zeroes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        
+        //let mut tokio_handles = vec![];
+        //let mut futs = futures::stream::FuturesUnordered::new();
+        //use futures::stream::{self, StreamExt, TryStreamExt};
+        use futures::stream::{StreamExt};
+        //let futs = tokio::stream::iter;
+        let mut futs = futures::stream::FuturesUnordered::new();
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push and await tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            let per = std::sync::Arc::clone(&per);
+            let bytes = std::sync::Arc::clone(&bytes);
+            let zeroes = std::sync::Arc::clone(&zeroes);
+            futs.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                            zeroes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(raw_data.len(), std::sync::atomic::Ordering::SeqCst);
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            let currPer = per.load(std::sync::atomic::Ordering::SeqCst);
+                            let currBytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+                            let currZeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
+                            //if per != next_per {
+                            if currPer < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                      next_per, currBytes,
+                                      currZeroes*100/currBytes, currZeroes,
+                                      start_time.elapsed().as_secs());
+                                per.store(next_per, std::sync::atomic::Ordering::SeqCst);
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+            
+            //if futs.len() >= 2 {
+            if futs.len() >= 2 {
+                let response = futs.next().await.unwrap();
+                if let Err(e) = response {
+                    eprintln!("Error during await: {}", e);
                 }
-                bytes += raw_data.len();
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await remaining");
+        // Wait for the remaining to finish.
+        while let Some(response) = futs.next().await {
+            if let Err(e) = response {
+                eprintln!("Error during await: {}", e);
             }
         }
+        eprintln!("END: await remaining");
+
+        //futs.try_buffer_unordered(20)
+        //.try_for_each(|_res| futures::future::ok(()))
+        //.await?;
+        //if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+        //    eprintln!("Error during await: {}", e);
+        //}
+        let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+        let zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+        eprintln!("restore image complete (bytes={}, zeroes={}, duration={:.2}s, speed={:.2}MB/s)",
                   bytes,
+                  zeroes,
                   elapsed.as_secs_f64(),
                   bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
         );




