all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Niko Fellner <n.fellner@logics.de>
To: "pbs-devel@lists.proxmox.com" <pbs-devel@lists.proxmox.com>
Subject: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
Date: Fri, 4 Dec 2020 14:14:33 +0000	[thread overview]
Message-ID: <AM0PR09MB2754279488FDC8B0B4D7025FF2F10@AM0PR09MB2754.eurprd09.prod.outlook.com> (raw)

In order to parallelize restore_image within restore.rs (#3163), I tried to make use of tokio:

let mut my_handles = vec![];
for pos in 0..100 {
    my_handles.push(
        tokio::spawn(
            async move {
                println!("Task: {}", pos)
            }
        )
    );
}
futures::future::join_all(my_handles).await;

This simple code works and prints all tasks (in some random order), but when I replace the body of the "async move" block with the original loop body from restore_image, I get the following build errors:

cargo build --release
   Compiling proxmox-backup-qemu v1.0.2 (/root/proxmox-backup-qemu)
error[E0308]: mismatched types
   --> src/restore.rs:181:48
    |
181 |   ...                   if per != next_per {
    |  __________________________________________^
182 | | ...                       eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ...                                 next_per, bytes,
184 | | ...                                 zeroes*100/bytes, zeroes,
185 | | ...                                 start_time.elapsed().as_secs());
186 | | ...                       per = next_per;
187 | | ...                   }
    | |_______________________^ expected enum `std::result::Result`, found `()`
    |
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error[E0308]: mismatched types
   --> src/restore.rs:181:29
    |
181 | / ...                   if per != next_per {
182 | | ...                       eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
183 | | ...                                 next_per, bytes,
184 | | ...                                 zeroes*100/bytes, zeroes,
185 | | ...                                 start_time.elapsed().as_secs());
186 | | ...                       per = next_per;
187 | | ...                   }
    | |_______________________^ expected enum `std::result::Result`, found `()`
    |
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error[E0308]: mismatched types
   --> src/restore.rs:179:25
    |
179 | /                         if verbose {
180 | |                             let next_per = ((pos+1)*100)/index.index_count();
181 | |                             if per != next_per {
182 | |                                 eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
...   |
187 | |                             }
188 | |                         }
    | |_________________________^ expected enum `std::result::Result`, found `()`
    |
    = note:   expected enum `std::result::Result<_, anyhow::Error>`
            found unit type `()`

error: aborting due to 3 previous errors

For more information about this error, try `rustc --explain E0308`.
error: could not compile `proxmox-backup-qemu`.

To learn more, run the command again with --verbose.
make: *** [Makefile:22: all] Fehler 101



Do you have any clue how to fix this?
I guess I am making rookie mistakes.

Another question: Do you think it would be easier to use rayon instead of tokio here to find out whether parallelization is worth it or not?
The rayon::iter::ParallelIterator looks promising, but I realized that the rayon 1.5 lib is currently not included in http://download.proxmox.com/debian/devel/.


diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..4d1df9c 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -151,38 +151,46 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+        let mut my_handles = vec![];
 
         let start_time = std::time::Instant::now();
 
         for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+            my_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = index.index_digest(pos).unwrap();
+                        let offset = (pos*index.chunk_size) as u64;
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index.chunk_size as u64);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index.chunk_size;
+                            zeroes += index.chunk_size;
+                        } else {
+                            let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                            let res = write_data_callback(offset, &raw_data);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index.index_count();
+                            if per != next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                          next_per, bytes,
+                                          zeroes*100/bytes, zeroes,
+                                          start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                    }
+                )
+            );
         }
+        futures::future::join_all(my_handles).await;
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);



             reply	other threads:[~2020-12-04 14:15 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-12-04 14:14 Niko Fellner [this message]
2020-12-04 14:59 ` Dominik Csapak
2020-12-05  1:11   ` Niko Fellner
2020-12-04 15:05 ` Dietmar Maurer
2020-12-05 23:39 Niko Fellner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=AM0PR09MB2754279488FDC8B0B4D7025FF2F10@AM0PR09MB2754.eurprd09.prod.outlook.com \
    --to=n.fellner@logics.de \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal