From: Niko Fellner <n.fellner@logics.de>
To: "pbs-devel@lists.proxmox.com" <pbs-devel@lists.proxmox.com>
Subject: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
Date: Fri, 4 Dec 2020 14:14:33 +0000 [thread overview]
Message-ID: <AM0PR09MB2754279488FDC8B0B4D7025FF2F10@AM0PR09MB2754.eurprd09.prod.outlook.com> (raw)
In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:
let mut my_handles = vec![];
for pos in 0..100 {
my_handles.push(
tokio::spawn(
async move {
println!("Task: {}", pos)
}
)
);
}
futures::future::join_all(my_handles).await;
This simple code works and prints all tasks (in some random order), but when I change the body of the "async move" to the original loop body in restore_image, I get some build errors:
cargo build --release
Compiling proxmox-backup-qemu v1.0.2 (/root/proxmox-backup-qemu)
error[E0308]: mismatched types
--> src/restore.rs:181:48
|
181 | ... if per != next_per {
| __________________________________________^
182 | | ... eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ... next_per, bytes,
184 | | ... zeroes*100/bytes, zeroes,
185 | | ... start_time.elapsed().as_secs());
186 | | ... per = next_per;
187 | | ... }
| |_______________________^ expected enum `std::result::Result`, found `()`
|
= note: expected enum `std::result::Result<_, anyhow::Error>`
found unit type `()`
error[E0308]: mismatched types
--> src/restore.rs:181:29
|
181 | / ... if per != next_per {
182 | | ... eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ... next_per, bytes,
184 | | ... zeroes*100/bytes, zeroes,
185 | | ... start_time.elapsed().as_secs());
186 | | ... per = next_per;
187 | | ... }
| |_______________________^ expected enum `std::result::Result`, found `()`
|
= note: expected enum `std::result::Result<_, anyhow::Error>`
found unit type `()`
error[E0308]: mismatched types
--> src/restore.rs:179:25
|
179 | / if verbose {
180 | | let next_per = ((pos+1)*100)/index.index_count();
181 | | if per != next_per {
182 | | eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
... |
187 | | }
188 | | }
| |_________________________^ expected enum `std::result::Result`, found `()`
|
= note: expected enum `std::result::Result<_, anyhow::Error>`
found unit type `()`
error: aborting due to 3 previous errors
For more information about this error, try `rustc --explain E0308`.
error: could not compile `proxmox-backup-qemu`.
To learn more, run the command again with --verbose.
make: *** [Makefile:22: all] Fehler 101
Do you have any clue how to fix this?
I guess I am doing rookie mistakes.
Another question: Do you think it's easier to use rayon here instead of tokio to find out whether parallelization is worth it here or not?
The rayon::iter::ParallelIterator looks promising, but I realized the rayon 1.5 lib is not included in http://download.proxmox.com/debian/devel/ currently...
diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
libc = "0.2"
once_cell = "1.3.1"
openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
serde_json = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
bincode = "1.0"
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..4d1df9c 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -151,38 +151,46 @@ impl RestoreTask {
let mut per = 0;
let mut bytes = 0;
let mut zeroes = 0;
+ let mut my_handles = vec![];
let start_time = std::time::Instant::now();
for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
- }
- bytes += raw_data.len();
- }
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
- }
- }
+ my_handles.push(
+ tokio::spawn(
+ async move {
+ let digest = index.index_digest(pos).unwrap();
+ let offset = (pos*index.chunk_size) as u64;
+ if digest == &zero_chunk_digest {
+ let res = write_zero_callback(offset, index.chunk_size as u64);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes += index.chunk_size;
+ zeroes += index.chunk_size;
+ } else {
+ let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+ let res = write_data_callback(offset, &raw_data);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes += raw_data.len();
+ }
+ if verbose {
+ let next_per = ((pos+1)*100)/index.index_count();
+ if per != next_per {
+ eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+ next_per, bytes,
+ zeroes*100/bytes, zeroes,
+ start_time.elapsed().as_secs());
+ per = next_per;
+ }
+ }
+ }
+ )
+ );
}
+ futures::future::join_all(my_handles).await;
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
next reply other threads:[~2020-12-04 14:15 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2020-12-04 14:14 Niko Fellner [this message]
2020-12-04 14:59 ` Dominik Csapak
2020-12-05 1:11 ` Niko Fellner
2020-12-04 15:05 ` Dietmar Maurer
2020-12-05 23:39 Niko Fellner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=AM0PR09MB2754279488FDC8B0B4D7025FF2F10@AM0PR09MB2754.eurprd09.prod.outlook.com \
--to=n.fellner@logics.de \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox