From: Niko Fellner <n.fellner@logics.de>
To: "pbs-devel@lists.proxmox.com" <pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move
Date: Sat, 5 Dec 2020 23:39:51 +0000
Message-ID: <AM0PR09MB27544DAF8C82BC68C5AEB5FCF2F00@AM0PR09MB2754.eurprd09.prod.outlook.com>

Update: I was able to implement an unsafe SendRawPointer, but still have problems with the parallelization. 

I have two different versions:

Version ASYNC_CRASH:
- push all tasks into a Vec tokio_handles and call try_join_all(tokio_handles).await later on
- It runs in parallel, but quickly leads to an out-of-memory error, because my implementation starts all tasks at once instead of limiting them to about (number of CPU cores) tasks at a time...
- Syslog even showed a segfault:
> Dec  5 22:50:49 pve kernel: [13437.190348] proxmox-restore[26081]: segfault at 8 ip 000055643d435b37 sp 00007f94d0f78c20 error 4 in pbs-restore[55643d34c000+104000]
> Dec  5 22:50:49 pve kernel: [13437.190357] Code: 48 85 ff 75 b2 48 8b 4c 24 28 64 48 33 0c 25 28 00 00 00 44 89 e8 75 43 48 83 c4 38 5b 5d 41 5c 41 5d c3 48 8b 85 b0 00 00 00 <48> 8b 50 08 48 89 95 b0 00 00 00 48 85 d2 74 11 48 c7 40 08 00 00
- Is there a way to only run a group of maybe 5 or 10 tokio handles at a time (something like the sketch below)? Or what am I doing wrong here in version ASYNC_CRASH?
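
Something along these lines is what I have in mind (just a rough sketch with made-up names process_chunk and MAX_IN_FLIGHT, not my actual code); each worker returns the number of bytes it restored, so the totals can be summed up afterwards instead of sharing mutable counters between tasks:

use anyhow::Error;
use futures::stream::{self, StreamExt, TryStreamExt};

// Placeholder for the real per-chunk work (read the chunk, call the write
// callback); returns the number of bytes it restored.
async fn process_chunk(_pos: usize) -> Result<usize, Error> {
    Ok(0)
}

// Keep at most MAX_IN_FLIGHT chunk futures running at the same time.
const MAX_IN_FLIGHT: usize = 8;

async fn restore_bounded(index_count: usize) -> Result<usize, Error> {
    stream::iter(0..index_count)
        .map(process_chunk)
        .buffer_unordered(MAX_IN_FLIGHT)
        .try_fold(0usize, |total, chunk_bytes| async move {
            Ok::<usize, Error>(total + chunk_bytes)
        })
        .await
}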

Version ASYNC_SINGLE: 
- use the async functions, but await each tokio task directly after spawning it
- Restore works, it looks good
- But no parallelization (see the sketch below for why)
- Will do some performance tests tomorrow of ASYNC_SINGLE vs. the original version.
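
In other words, ASYNC_SINGLE degenerates into the following pattern (again with a hypothetical process_chunk): the next chunk only starts once the previous one has finished, so it is effectively the original sequential loop plus task overhead:

use anyhow::Error;

// Hypothetical stand-in for the per-chunk work.
async fn process_chunk(_pos: usize) -> Result<(), Error> { Ok(()) }

async fn restore_sequential(index_count: usize) -> Result<(), Error> {
    for pos in 0..index_count {
        // Awaiting right after spawning serializes the work again.
        tokio::spawn(process_chunk(pos)).await??;
    }
    Ok(())
}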

Original version:
commit 447552da4af1c7f0553873e4fd21335dab8fe029 (HEAD -> master, origin/master, origin/HEAD)
Author: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Date:   Mon Nov 30 13:41:45 2020 +0100


Maybe ASYNC_CRASH crashes because of how I use the chunk reader?
   - I couldn't use the synchronous "ReadChunk::read_chunk(&chunk_reader_clone, &digest)?", because it never finished reading inside my async block...?!
   - So I tried "AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?" instead, which works fine in version ASYNC_SINGLE, but within ASYNC_CRASH it leads to chaos.
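
My guess (and it is only a guess) is that the synchronous read blocks an executor thread that is also needed to drive the connection. If that is the case, pushing the blocking call onto tokio's blocking thread pool would look roughly like this; BlockingReader is a made-up placeholder, not the real chunk reader API:

use std::sync::Arc;
use anyhow::Error;

// Made-up placeholder for a blocking chunk source.
struct BlockingReader;

impl BlockingReader {
    fn read_chunk_blocking(&self, _digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
        Ok(Vec::new()) // stand-in for the real blocking read
    }
}

// Run the blocking call on tokio's blocking pool so it cannot stall the
// executor threads that also have to drive the network connection.
async fn read_one(reader: Arc<BlockingReader>, digest: [u8; 32]) -> Result<Vec<u8>, Error> {
    tokio::task::spawn_blocking(move || reader.read_chunk_blocking(&digest)).await?
}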


@Dietmar: unfortunately I still couldn't use much of pull.rs - I still don't understand it well enough.


VERSION ASYNC_CRASH:

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+            
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..c0f0bf8 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -151,38 +151,66 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+        
+        let mut tokio_handles = vec![];
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            tokio_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index_chunk_size;
+                            zeroes += index_chunk_size;
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            //if per != next_per {
+                            if per < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                      next_per, bytes,
+                                      zeroes*100/bytes, zeroes,
+                                      start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await");
+        if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+            eprintln!("Error during await: {}", e);
         }
+        eprintln!("END: await");
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);





VERSION ASYNC_SINGLE:
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..9d4fb4d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,39 +148,45 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
         let mut bytes = 0;
-        let mut zeroes = 0;
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            if let Err(e) = tokio::spawn(
+                async move {
+                    let digest = &index_digest;
+                    let offset = (pos*index_chunk_size) as u64;
+                    //eprintln!("pos: {}, offset: {}", pos, offset);
+                    if digest == &zero_chunk_digest {
+                        let res = write_zero_callback(offset, index_chunk_size as u64);
+                        //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                        bytes += index_chunk_size;
+                    } else {
+                        //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                        //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                        let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                        //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                        let res = write_data_callback(offset, &raw_data);
+                        //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        bytes += raw_data.len();
+                    }
+                    Ok(())
                 }
+            ).await {
+                eprintln!("Error during await: {}", e);
             }
         }



