From: Niko Fellner <n.fellner@logics.de>
To: "pbs-devel@lists.proxmox.com" <pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
Date: Sun, 6 Dec 2020 02:51:55 +0000
Message-ID: <AM0PR09MB275437F55CA23F91425E1DC8F2CF0@AM0PR09MB2754.eurprd09.prod.outlook.com>
Another update:
- Now awaiting/unwrapping the futures in pairs of 2 (see the sketch below this list).
- Using atomics so the percentage/bytes/zeroes counters work across tasks.
- Sometimes the program runs and finishes OK. First impression: performance looks good (about 230 vs. 380 seconds (sync) for a 32 GiB VM), but I can't really measure it right now, the server is busy.
- But sometimes the program segfaults, and I am not sure why. Maybe anyone has an idea?
- The more futs I await/unwrap in parallel (see "if futs.len() >= N ..."), the sooner and more often the segfaults occur.
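
For reference, the awaiting pattern boils down to this minimal standalone sketch (tokio 0.2 / futures 0.3, as in Cargo.toml; process_all is just a name for the sketch and the per-chunk work is elided):

use anyhow::Error;
use futures::stream::{FuturesUnordered, StreamExt};

async fn process_all(index_count: usize) -> Result<(), Error> {
    let mut futs = FuturesUnordered::new();
    for pos in 0..index_count {
        futs.push(tokio::spawn(async move {
            let _ = pos; // ... restore one chunk here ...
            Ok::<(), Error>(())
        }));
        // Cap in-flight tasks: once N are queued, await the next one
        // to finish (unwrap the Option; print any JoinError).
        if futs.len() >= 2 {
            if let Err(e) = futs.next().await.unwrap() {
                eprintln!("Error during await: {}", e);
            }
        }
    }
    // Drain whatever is still running.
    while let Some(response) = futs.next().await {
        if let Err(e) = response {
            eprintln!("Error during await: {}", e);
        }
    }
    Ok(())
}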
diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
libc = "0.2"
once_cell = "1.3.1"
openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
serde_json = "1.0"
tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
use std::ffi::CString;
pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
/// Opaque handle for backups jobs
#[repr(C)]
pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+ pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+ pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+ pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+ return (self.callback)(self.callback_data, offset, data, len);
+ }
+}
\ No newline at end of file
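
Why the unsafe impl Send: raw pointers are !Send, so a closure capturing callback_data directly can never be moved into tokio::spawn. Wrapping both pointers in a Copy struct and asserting Send by hand sidesteps the check, but the compiler can then no longer verify anything; this is only sound if the C callback really tolerates calls from other threads (and, with this patch, from several at once). Reduced illustration (hypothetical SendPtr, not part of the patch):

use std::os::raw::c_void;

#[derive(Copy, Clone)]
struct SendPtr(*mut c_void);

// SAFETY: our promise, not the compiler's. Only sound if the pointee
// may really be touched from arbitrary threads.
unsafe impl Send for SendPtr {}

fn spawn_with(p: SendPtr) {
    // Capturing a bare `*mut c_void` instead of `p` here would fail
    // with error[E0277]: `*mut c_void` cannot be sent between threads.
    tokio::spawn(async move {
        let _raw: *mut c_void = p.0; // hand it back to FFI here
    });
}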
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
let archive_name = tools::utf8_c_string(archive_name)?
.ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+
+ let send_raw_pointer = SendRawPointer { callback, callback_data };
let write_data_callback = move |offset: u64, data: &[u8]| {
- callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+ return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
};
let write_zero_callback = move |offset: u64, len: u64| {
- callback(callback_data, offset, std::ptr::null(), len)
+ return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
};
proxmox_backup::tools::runtime::block_on(
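
The closures then only capture send_raw_pointer (Copy + Send), which is what lets them satisfy the new bounds on restore_image below. A quick way to convince yourself (sketch; assert_send_copy is just a hypothetical helper for the check):

fn assert_send_copy<T: Send + Copy + 'static>(_t: T) {}

fn check(send_raw_pointer: SendRawPointer) {
    let write_zero_callback = move |offset: u64, len: u64| {
        send_raw_pointer.call_itself(offset, std::ptr::null(), len)
    };
    assert_send_copy(write_zero_callback); // compiles: all captures are Copy + Send
}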
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..f7aa564 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -66,8 +66,10 @@ impl RestoreTask {
let mut builder = tokio::runtime::Builder::new();
builder.threaded_scheduler();
builder.enable_all();
- builder.max_threads(6);
- builder.core_threads(4);
+ //builder.max_threads(6);
+ //builder.core_threads(4);
+ builder.max_threads(12);
+ builder.core_threads(6);
builder.thread_name("proxmox-restore-worker");
builder
});
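
For anyone tuning this, my understanding of the tokio 0.2 builder semantics (a sketch with the numbers from the patch; build_restore_runtime is a made-up name):

use tokio::runtime::{Builder, Runtime};

fn build_restore_runtime() -> std::io::Result<Runtime> {
    let mut builder = Builder::new();
    builder.threaded_scheduler(); // multi-threaded scheduler (tokio 0.2 API)
    builder.enable_all();
    // core_threads: the workers that actually poll the spawned futures.
    builder.core_threads(6);
    // max_threads: cap on core workers PLUS blocking-pool threads,
    // so it must be >= core_threads.
    builder.max_threads(12);
    builder.thread_name("proxmox-restore-worker");
    builder.build()
}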
@@ -106,12 +108,12 @@ impl RestoreTask {
pub fn runtime(&self) -> tokio::runtime::Handle {
self.runtime.handle().clone()
}
-
- pub async fn restore_image(
+
+ pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
&self,
archive_name: String,
- write_data_callback: impl Fn(u64, &[u8]) -> i32,
- write_zero_callback: impl Fn(u64, u64) -> i32,
+ write_data_callback: A,
+ write_zero_callback: B,
verbose: bool,
) -> Result<(), Error> {
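
The reason plain "impl Fn" was not enough: tokio::spawn requires the future, and therefore everything it captures, to be Send + 'static, and because the same callback is moved into a task on every iteration it also has to be Copy. Reduced to the essentials (sketch; restore_like is hypothetical):

fn restore_like<A>(write_data_callback: A)
where
    A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32,
{
    for pos in 0..4u64 {
        // Each task moves in its own copy of the callback; without
        // Copy the first iteration would consume it.
        tokio::spawn(async move {
            write_data_callback(pos * 4096, &[]);
        });
    }
}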
@@ -148,46 +150,108 @@ impl RestoreTask {
most_used,
);
- let mut per = 0;
- let mut bytes = 0;
- let mut zeroes = 0;
+ let per = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let bytes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let zeroes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+ //let mut tokio_handles = vec![];
+ //let mut futs = futures::stream::FuturesUnordered::new();
+ //use futures::stream::{self, StreamExt, TryStreamExt};
+ use futures::stream::StreamExt;
+ //let futs = tokio::stream::iter;
+ let mut futs = futures::stream::FuturesUnordered::new();
+
+ let index_chunk_size = index.chunk_size;
+ let index_count = index.index_count();
+ eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+ eprintln!("BEGIN: push and await tasks");
let start_time = std::time::Instant::now();
- for pos in 0..index.index_count() {
- let digest = index.index_digest(pos).unwrap();
- let offset = (pos*index.chunk_size) as u64;
- if digest == &zero_chunk_digest {
- let res = write_zero_callback(offset, index.chunk_size as u64);
- if res < 0 {
- bail!("write_zero_callback failed ({})", res);
- }
- bytes += index.chunk_size;
- zeroes += index.chunk_size;
- } else {
- let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
- let res = write_data_callback(offset, &raw_data);
- if res < 0 {
- bail!("write_data_callback failed ({})", res);
+ for pos in 0..index_count {
+ let chunk_reader_clone = chunk_reader.clone();
+ let index_digest = index.index_digest(pos).unwrap().clone();
+ let per = std::sync::Arc::clone(&per);
+ let bytes = std::sync::Arc::clone(&bytes);
+ let zeroes = std::sync::Arc::clone(&zeroes);
+ futs.push(
+ tokio::spawn(
+ async move {
+ let digest = &index_digest;
+ let offset = (pos*index_chunk_size) as u64;
+ //eprintln!("pos: {}, offset: {}", pos, offset);
+ if digest == &zero_chunk_digest {
+ let res = write_zero_callback(offset, index_chunk_size as u64);
+ //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_zero_callback failed ({})", res);
+ }
+ bytes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+ zeroes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+ } else {
+ //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+ //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+ let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+ //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+ let res = write_data_callback(offset, &raw_data);
+ //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+ if res < 0 {
+ bail!("write_data_callback failed ({})", res);
+ }
+ bytes.fetch_add(raw_data.len(), std::sync::atomic::Ordering::SeqCst);
+ }
+ if verbose {
+ let next_per = ((pos+1)*100)/index_count;
+ let curr_per = per.load(std::sync::atomic::Ordering::SeqCst);
+ let curr_bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+ let curr_zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
+ //if per != next_per {
+ if curr_per < next_per {
+ eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+ next_per, curr_bytes,
+ curr_zeroes*100/curr_bytes, curr_zeroes,
+ start_time.elapsed().as_secs());
+ per.store(next_per, std::sync::atomic::Ordering::SeqCst);
+ }
+ }
+ Ok(())
+ }
+ )
+ );
+
+ if futs.len() >= 2 {
+ let response = futs.next().await.unwrap();
+ if let Err(e) = response {
+ eprintln!("Error during await: {}", e);
}
- bytes += raw_data.len();
}
- if verbose {
- let next_per = ((pos+1)*100)/index.index_count();
- if per != next_per {
- eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
- next_per, bytes,
- zeroes*100/bytes, zeroes,
- start_time.elapsed().as_secs());
- per = next_per;
- }
+ }
+ eprintln!("END: push tasks");
+ eprintln!("BEGIN: await remaining");
+ // Wait for the remaining to finish.
+ while let Some(response) = futs.next().await {
+ if let Err(e) = response {
+ eprintln!("Error during await: {}", e);
}
}
+ eprintln!("END: await remaining");
+
+ //futs.try_buffer_unordered(20)
+ //.try_for_each(|_res| futures::future::ok(()))
+ //.await?;
+ //if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+ // eprintln!("Error during await: {}", e);
+ //}
+ let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+ let zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
let end_time = std::time::Instant::now();
let elapsed = end_time.duration_since(start_time);
- eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+ eprintln!("restore image complete (bytes={}, zeroes={}, duration={:.2}s, speed={:.2}MB/s)",
bytes,
+ zeroes,
elapsed.as_secs_f64(),
bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
);
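
One caveat I noticed while rewriting: the bail!()s now run inside the spawned tasks, and both awaiting loops only eprintln! the error, so a failing write no longer aborts the whole restore the way it did in the synchronous version. If we want the old behaviour back, something like this should propagate both layers (a sketch, assuming the same FuturesUnordered setup inside restore_image):

while let Some(joined) = futs.next().await {
    // First `?`: a panicked/cancelled task (JoinError).
    // Second `?`: the task's own bail!() Error.
    joined??;
}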