public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
@ 2020-12-04 14:14 Niko Fellner
  2020-12-04 14:59 ` Dominik Csapak
  2020-12-04 15:05 ` Dietmar Maurer
  0 siblings, 2 replies; 5+ messages in thread
From: Niko Fellner @ 2020-12-04 14:14 UTC (permalink / raw)
  To: pbs-devel

In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:

let mut my_handles = vec![];
for pos in 0..100 {
    my_handles.push(
        tokio::spawn(
            async move {
                println!("Task: {}", pos)
            }
        )
    );
}
futures::future::join_all(my_handles).await;

This simple code works and prints all tasks (in some random order), but when I change the body of the "async move" to the original loop body in restore_image, I get some build errors:

cargo build --release
   Compiling proxmox-backup-qemu v1.0.2 (/root/proxmox-backup-qemu)
error[E0308]: mismatched types
   --> src/restore.rs:181:48
    |
181 |   ...                   if per != next_per {
    |  __________________________________________^
182 | | ...                       eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ...                                 next_per, bytes,
184 | | ...                                 zeroes*100/bytes, zeroes,
185 | | ...                                 start_time.elapsed().as_secs());
186 | | ...                       per = next_per;
187 | | ...                   }
    | |_______________________^ expected enum `std::result::Result`, found `()`
    |
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error[E0308]: mismatched types
   --> src/restore.rs:181:29
    |
181 | / ...                   if per != next_per {
182 | | ...                       eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ...                                 next_per, bytes,
184 | | ...                                 zeroes*100/bytes, zeroes,
185 | | ...                                 start_time.elapsed().as_secs());
186 | | ...                       per = next_per;
187 | | ...                   }
    | |_______________________^ expected enum `std::result::Result`, found `()`
    |
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error[E0308]: mismatched types
   --> src/restore.rs:179:25
    |
179 | /                         if verbose {
180 | |                             let next_per = ((pos+1)*100)/index.index_count();
181 | |                             if per != next_per {
182 | |                                 eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
...   |
187 | |                             }
188 | |                         }
    | |_________________________^ expected enum `std::result::Result`, found `()`
    |
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error: aborting due to 3 previous errors

For more information about this error, try `rustc --explain E0308`.
error: could not compile `proxmox-backup-qemu`.

To learn more, run the command again with --verbose.
make: *** [Makefile:22: all] Error 101



Do you have any clue how to fix this?
I guess I am making rookie mistakes.

Another question: Do you think it would be easier to use rayon instead of tokio to find out whether parallelization is worth it here or not?
The rayon::iter::ParallelIterator looks promising, but I realized the rayon 1.5 lib is not included in http://download.proxmox.com/debian/devel/ currently...
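
Just to illustrate what I have in mind, here is a rough, untested sketch of the rayon variant (restore_chunk_at is only a placeholder for the current loop body, not an existing function):

use anyhow::Error;
use rayon::prelude::*;

// placeholder for the current loop body: read chunk `pos`, write it via the
// callback, update the counters
fn restore_chunk_at(pos: usize) -> Result<(), Error> {
    let _ = pos;
    Ok(())
}

fn restore_image_rayon(index_count: usize) -> Result<(), Error> {
    // try_for_each stops at the first error; the closure must be Send + Sync
    (0..index_count)
        .into_par_iter()
        .try_for_each(restore_chunk_at)
}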


diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..4d1df9c 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -151,38 +151,46 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+        let mut my_handles = vec![];
 
         let start_time = std::time::Instant::now();
 
         for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+            my_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = index.index_digest(pos).unwrap();
+                        let offset = (pos*index.chunk_size) as u64;
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index.chunk_size as u64);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index.chunk_size;
+                            zeroes += index.chunk_size;
+                        } else {
+                            let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                            let res = write_data_callback(offset, &raw_data);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index.index_count();
+                            if per != next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                          next_per, bytes,
+                                          zeroes*100/bytes, zeroes,
+                                          start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                    }
+                )
+            );
         }
+        futures::future::join_all(my_handles).await;
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);




* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
  2020-12-04 14:14 [pbs-devel] parallelize restore.rs fn restore_image: problems in async move Niko Fellner
@ 2020-12-04 14:59 ` Dominik Csapak
  2020-12-05  1:11   ` Niko Fellner
  2020-12-04 15:05 ` Dietmar Maurer
  1 sibling, 1 reply; 5+ messages in thread
From: Dominik Csapak @ 2020-12-04 14:59 UTC (permalink / raw)
  To: pbs-devel

hi,

i did not look very deeply into it but i saw a few things, comments inline

On 12/4/20 3:14 PM, Niko Fellner wrote:
> In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:
> 
> let mut my_handles = vec![];
> for pos in 0..100 {
>      my_handles.push(
>          tokio::spawn(
>              async move {
>                  println!("Task: {}", pos)
>              }
>          )
>      );
> }
> futures::future::join_all(my_handles).await;
> 
> This simple code works and prints all tasks (in some random order), but when I change the body of the "async move" to the original loop body in restore_image, I get some build errors:
> 
> [... cargo build error output snipped; quoted in full above ...]
> 
> Do you have any clue how to fix this?
> I guess I am doing rookie mistakes.

first thing: the error you get is simply a return type mismatch;
the 'async move' block is expected to return a Result<(), Error> because
it contains a bail!() and multiple '?' calls.

the simplest way to fix this is to write

Ok(())

at the end of the async move block

the next problem you will run into is that
there are multiple uses of variables that get
moved into the block on every loop iteration (which does not work in rust),
so you have to pull some things out beforehand, and clone some others
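
roughly like this (only a sketch, not compile-tested against the real code):

for pos in 0..index.index_count() {
    // compute/clone the per-iteration values up front, so only these small
    // values are moved into the task instead of `index` or the reader itself
    let digest = index.index_digest(pos).unwrap().clone();
    let offset = (pos * index.chunk_size) as u64;
    let chunk_reader = chunk_reader.clone();

    my_handles.push(tokio::spawn(async move {
        // ... restore this one chunk using digest/offset/chunk_reader ...
        Ok::<(), anyhow::Error>(()) // gives the block its Result return type
    }));
}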

a much bigger problem is the following:

the callbacks that get called (write_data_callback/write_zero_callback)
are not 'Send', meaning they cannot be sent between threads safely
(which tokio::spawn requires),
because they are both closures that capture a '*mut T'

in that code this would not actually be a problem, since the restored
blocks never overlap, but rust does not know that

without digging further, i think this is not easily solvable in rust
without either writing unsafe code or changing our c <-> rust api
(idk if that is possible)
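
if you do want to go the unsafe route, the usual pattern is a tiny wrapper
that manually asserts Send (sketch only; it is only defensible here because
the restored blocks never overlap):

use std::os::raw::c_void;

// wrapping the raw pointer lets closures that capture it become Send;
// `unsafe impl Send` is a promise to the compiler that cross-thread use
// is fine, which it cannot verify itself
#[derive(Copy, Clone)]
struct SendPtr(*mut c_void);
unsafe impl Send for SendPtr {}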

> 
> Another question: Do you think it's easier to use rayon here instead of tokio to find out whether parallelization is worth it here or not?
> The rayon::iter::ParallelIterator looks promising, but I realized the rayon 1.5 lib is not included in http://download.proxmox.com/debian/devel/ currently...
> 
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 7f29d0a..c87bf5a 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -27,9 +27,9 @@ lazy_static = "1.4"
>   libc = "0.2"
>   once_cell = "1.3.1"
>   openssl = "0.10"
> -proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
> -proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
> -#proxmox-backup = { path = "../proxmox-backup" }
> +proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
> +#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
> +proxmox-backup = { path = "../proxmox-backup" }
>   serde_json = "1.0"
>   tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
>   bincode = "1.0"
> diff --git a/src/restore.rs b/src/restore.rs
> index 24983dd..4d1df9c 100644
> --- a/src/restore.rs
> +++ b/src/restore.rs
> @@ -151,38 +151,46 @@ impl RestoreTask {
>           let mut per = 0;
>           let mut bytes = 0;
>           let mut zeroes = 0;
> +        let mut my_handles = vec![];
>   
>           let start_time = std::time::Instant::now();
>   
>           for pos in 0..index.index_count() {
> -            let digest = index.index_digest(pos).unwrap();
> -            let offset = (pos*index.chunk_size) as u64;
> -            if digest == &zero_chunk_digest {
> -                let res = write_zero_callback(offset, index.chunk_size as u64);
> -                if res < 0 {
> -                    bail!("write_zero_callback failed ({})", res);
> -                }
> -                bytes += index.chunk_size;
> -                zeroes += index.chunk_size;
> -            } else {
> -                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
> -                let res = write_data_callback(offset, &raw_data);
> -                if res < 0 {
> -                    bail!("write_data_callback failed ({})", res);
> -                }
> -                bytes += raw_data.len();
> -            }
> -            if verbose {
> -                let next_per = ((pos+1)*100)/index.index_count();
> -                if per != next_per {
> -                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
> -                              next_per, bytes,
> -                              zeroes*100/bytes, zeroes,
> -                              start_time.elapsed().as_secs());
> -                    per = next_per;
> -                }
> -            }
> +            my_handles.push(
> +                tokio::spawn(
> +                    async move {
> +                        let digest = index.index_digest(pos).unwrap();

this moves 'index' into the async move block, but since
the block is created on every loop iteration, 'index' would have to be
moved more than once, which does not work

best would be to calculate these things outside the async move block
before the 'my_handles.push' and just use the result

> +                        let offset = (pos*index.chunk_size) as u64;

same here

> +                        if digest == &zero_chunk_digest {
> +                            let res = write_zero_callback(offset, index.chunk_size as u64);

same here with the chunk size

also here the problematic callback gets called...

> +                            if res < 0 {
> +                                bail!("write_zero_callback failed ({})", res);
> +                            }
> +                            bytes += index.chunk_size;
> +                            zeroes += index.chunk_size;

same here
> +                        } else {
> +                            let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;

this chunk reader will also be moved, but this can be cloned beforehand

> +                            let res = write_data_callback(offset, &raw_data);

also a problematic callback...

> +                            if res < 0 {
> +                                bail!("write_data_callback failed ({})", res);
> +                            }
> +                            bytes += raw_data.len();
> +                        }
> +                        if verbose {
> +                            let next_per = ((pos+1)*100)/index.index_count();

same move problem as before

> +                            if per != next_per {
> +                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
> +                                          next_per, bytes,
> +                                          zeroes*100/bytes, zeroes,
> +                                          start_time.elapsed().as_secs());
> +                                per = next_per;
> +                            }
> +                        }

this is where the

Ok(())

would belong



> +                    }
> +                )
> +            );
>           }
> +        futures::future::join_all(my_handles).await;

at last, here i would use 'try_join_all' so that
it stops as soon as the first future is aborted with an error
instead of waiting for all of them to finish
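
something along these lines (sketch; note that with tokio::spawn every handle
resolves to a nested Result, the outer error being the JoinError):

// fail fast if any task panicked or was cancelled (JoinError),
// then check each task's own Result<(), Error>
let results = futures::future::try_join_all(my_handles).await?;
for res in results {
    res?;
}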

>   
>           let end_time = std::time::Instant::now();
>           let elapsed = end_time.duration_since(start_time);
> 






* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
  2020-12-04 14:14 [pbs-devel] parallelize restore.rs fn restore_image: problems in async move Niko Fellner
  2020-12-04 14:59 ` Dominik Csapak
@ 2020-12-04 15:05 ` Dietmar Maurer
  1 sibling, 0 replies; 5+ messages in thread
From: Dietmar Maurer @ 2020-12-04 15:05 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Niko Fellner

as pointed out in the bug report, we already have such code in pull.rs

Why not use that instead of something new?

> On 12/04/2020 3:14 PM Niko Fellner <n.fellner@logics.de> wrote:
> 
>  
> In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:





* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
  2020-12-04 14:59 ` Dominik Csapak
@ 2020-12-05  1:11   ` Niko Fellner
  0 siblings, 0 replies; 5+ messages in thread
From: Niko Fellner @ 2020-12-05  1:11 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

> first thing: the error you get is simply a return type mismatch, the 'async move' block expects to return a Result<(), Error> because there is a bail!() and multiple '?' calls.
> the simplest way to fix this is to write
> Ok(())
> at the end of the async move block
> the next problem you will run into is that there are multiple uses of variables that get moved in a loop (which does not work in rust) so you have to pull out some things, and clone some others

@Dominik: Thanks a lot for your help! Yes, and after following your instructions I've discovered more and more similarities with "pull.rs"... 

> a much bigger problem is the following:
> the callbacks that get called (write_data_callback/write_zero_callback)
> are not 'Send' meaning they cannot send between threads safely (needed by tokio::spawn) because they both are closures that use a '*mut T'
> in that code this would not actually be a problem, since the restored blocks never overlap, but rust does not know that
> without digging further, i think this is not easily solvable in rust without either writing unsafe code or changing our c <-> rust api (idk if that is possible)

So that's not only a restriction of tokio::spawn, but also applies to anything Rayon provides (including its ParallelBridge?), and to the code in pull.rs (ParallelHandler, futures::stream, ...), too?

> as pointed out in the bug report, we already have such code in pull.rs
> Why not use that instead of something new?

@Dietmar: You're right, that looks like a gold mine - but unfortunately one that I cannot mine due to my limited knowledge of Rust... Maybe you can help apply its magic to restore.rs? Most importantly, will it work with write_data_callback and write_zero_callback, which are not Send?

> this is not easily solvable in rust without either writing unsafe code or changing our c <-> rust api (idk if that is possible)

"Unsafe" seems to be the right keyword here. In proxmox-backup-qemu the CallbackPointers (capi_types.rs) do this: "unsafe impl std::marker::Send". Those CallbackPointers are used by lots of "_async" functions within the c <-> rust api. So I guess a hypothetic proxmox_restore_image_async needs to use CallbackPointers, too?




* Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
@ 2020-12-05 23:39 Niko Fellner
  0 siblings, 0 replies; 5+ messages in thread
From: Niko Fellner @ 2020-12-05 23:39 UTC (permalink / raw)
  To: pbs-devel

Update: I was able to implement an unsafe SendRawPointer, but still have problems with the parallelization. 

I have two different versions:

Version ASYNC_CRASH:
- push all tasks into Vector tokio_handles and call try_join_all(tokio_handles).await later on
- It runs in parallel, but quickly leads to an out of memory error, because my implementation starts all tasks at once, instead of limiting them to (number of CPU cores) tasks at a time...
- Syslog even showed a segfault:
> Dec  5 22:50:49 pve kernel: [13437.190348] proxmox-restore[26081]: segfault at 8 ip 000055643d435b37 sp 00007f94d0f78c20 error 4 in pbs-restore[55643d34c000+104000]
> Dec  5 22:50:49 pve kernel: [13437.190357] Code: 48 85 ff 75 b2 48 8b 4c 24 28 64 48 33 0c 25 28 00 00 00 44 89 e8 75 43 48 83 c4 38 5b 5d 41 5c 41 5d c3 48 8b 85 b0 00 00 00 <48> 8b 50 08 48 89 95 b0 00 00 00 48 85 d2 74 11 48 c7 40 08 00 00
- Is there a way to run only a group of maybe 5 or 10 tokio tasks at a time? Or what am I doing wrong here in Version ASYNC_CRASH? (A bounded-concurrency sketch follows below.)
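
Something like this is what I would like to achieve (untested sketch; restore_one_chunk is just a placeholder for the body of my async block, and the limit of 10 is arbitrary):

use anyhow::Error;
use futures::stream::{self, StreamExt, TryStreamExt};

// placeholder for the per-chunk work done in the async block above
async fn restore_one_chunk(pos: usize) -> Result<(), Error> {
    let _ = pos;
    Ok(())
}

async fn restore_all(index_count: usize) -> Result<(), Error> {
    stream::iter(0..index_count)
        .map(restore_one_chunk)                     // futures are created lazily
        .buffer_unordered(10)                       // at most 10 chunks in flight
        .try_for_each(|_| futures::future::ok(()))  // stop at the first error
        .await
}

If I understand it correctly, buffer_unordered by itself only bounds how many futures run concurrently on the current task; for real multi-threading each chunk would still have to go through tokio::spawn.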

Version ASYNC_SINGLE: 
- use the async functions, but directly await after spawning the tokio task. 
- Restore works, it looks good
- But no parallelization
- Will do some performance tests tomorrow of ASYNC_SINGLE vs. the original version.

Original version:
commit 447552da4af1c7f0553873e4fd21335dab8fe029 (HEAD -> master, origin/master, origin/HEAD)
Author: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Date:   Mon Nov 30 13:41:45 2020 +0100


Maybe ASYNC_CRASH crashes because of how I use the chunk reader?
   - I couldn't use "ReadChunk::read_chunk(&chunk_reader_clone, &digest)?", because that one never finished reading in my async block...?!
   - So I tried with "AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?" which works fine in Version ASYNC_SINGLE, but within ASYNC_CRASH it leads to chaos.


@Dietmar: unfortunately I still couldn't use much of pull.rs - I still don't understand it well enough.


VERSION ASYNC_CRASH:

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+            
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..c0f0bf8 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -151,38 +151,66 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+        
+        let mut tokio_handles = vec![];
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            tokio_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index_chunk_size;
+                            zeroes += index_chunk_size;
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            //if per != next_per {
+                            if per < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                      next_per, bytes,
+                                      zeroes*100/bytes, zeroes,
+                                      start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await");
+        if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+            eprintln!("Error during await: {}", e);
         }
+        eprintln!("END: await");
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);





VERSION ASYNC_SINGLE:
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..9d4fb4d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,39 +148,45 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
         let mut bytes = 0;
-        let mut zeroes = 0;
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            if let Err(e) = tokio::spawn(
+                async move {
+                    let digest = &index_digest;
+                    let offset = (pos*index_chunk_size) as u64;
+                    //eprintln!("pos: {}, offset: {}", pos, offset);
+                    if digest == &zero_chunk_digest {
+                        let res = write_zero_callback(offset, index_chunk_size as u64);
+                        //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                        bytes += index_chunk_size;
+                    } else {
+                        //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                        //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                        let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                        //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                        let res = write_data_callback(offset, &raw_data);
+                        //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        bytes += raw_data.len();
+                    }
+                    Ok(())
                 }
+            ).await {
+                eprintln!("Error during await: {}", e);
             }
         }




