Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in
From: Dominik Csapak
To: pbs-devel@lists.proxmox.com
Date: Mon, 7 Dec 2020 14:24:07 +0100
Message-ID: <6c3c2404-d5a6-c274-48b2-5691e7500348@proxmox.com>
List-Id: Proxmox Backup Server development discussion

hi,

i looked a bit more at this today.

first, it seems that you are not that familiar with rust, especially async
rust (tokio, etc.); please correct me if i am wrong. it is a complicated
topic, so i would suggest you make yourself familiar with the concepts.
rust's 'reference documentation' can be found under

https://doc.rust-lang.org/book/
https://rust-lang.github.io/async-book/

and there are surely many other great resources out there. of course you
can always ask here too, but it may take longer to explain.

On 12/7/20 3:51 AM, Niko Fellner wrote:
> By using mutex (AtomicBool - compare_exchange) I found out that only the
> write_zero_callback and write_data_callback are problematic (no mutex ->
> segfaults). Maybe anyone can find out why?
an AtomicBool/thread::yield is not necessary here (i think it is even a
hindrance, because you now pause tokio threads); all you need is rust's
std::sync::Mutex (a minimal sketch of that is attached after the patch
below).

the problem, afaics, is that qemu cannot have multiple writers on the same
block backend from multiple threads, and it seems that is a general qemu
limitation (you can even only have one iothread per image), so i am not
sure that it is possible to implement what you want at the qemu layer.
of course you can ask the qemu people on their mailing list, maybe i am
simply not seeing how.

so we have to synchronize on the writes, which makes the whole patch less
interesting, since it will not solve your bottleneck, i assume? the only
thing we can parallelize is the download of the chunks, but in my tests
here that did not improve restore speed at all (see the second sketch
after the patch).

i attach my code (which is based on yours); it should be ok i think, but
please do not use it as-is, since i only tested it very briefly...
(it is more intended to be a reference on how one could do such a thing)

i commented the mutex out for now; like this it will crash with segfaults
(like your code does). with the mutex compiled in, it works, but is as
slow as without the patch (maybe playing around with the thread count
could still make a difference)

i hope this helps :)

------------

diff --git a/src/lib.rs b/src/lib.rs
index b755014..20650b7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -810,6 +810,14 @@ pub extern "C" fn proxmox_restore_image(
     verbose: bool,
 ) -> c_int {
 
+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};
+
     let restore_task = restore_handle_to_task(handle);
 
     let result: Result<_, Error> = try_block!({
@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
 
+        let foo = SafePointer { pointer: callback_data.clone() };
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..caa5aeb 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
 use tokio::prelude::*;
 
 use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -66,8 +67,8 @@ impl RestoreTask {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(8);
+            builder.core_threads(6);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -110,8 +111,8 @@ impl RestoreTask {
     pub async fn restore_image(
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,51 +149,92 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
-
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+//        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", 4,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+//                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+//                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+//                        let _guard = mutex.lock().unwrap();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                        next_per, bytes,
-                        zeroes*100/bytes, zeroes,
-                        start_time.elapsed().as_secs());
-                    per = next_per;
-                }
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                            next_per, bytes,
+                            zeroes*100/bytes, zeroes,
+                            start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
+                }
             }
-        }
 
-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-            bytes,
-            elapsed.as_secs_f64(),
-            bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }
 
-        Ok(())
+        output.await?
     }
 
     pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {
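------------

for reference, a minimal sketch (not part of the patch above) of the
'all you need is std::sync::Mutex' point: every call into the C write
callback is funneled through one shared lock, so qemu never sees two
concurrent writers. the SafePointer wrapper mirrors the one from the
patch; the function name make_serialized_writer and the exact callback
signature are just illustrative assumptions, not existing API.

use std::os::raw::c_void;
use std::sync::{Arc, Mutex};

// wrapper that lets the raw callback_data pointer cross threads; this is
// only sound because every use below is serialized by the mutex
#[derive(Clone, Copy)]
struct SafePointer {
    pointer: *mut c_void,
}
unsafe impl Send for SafePointer {}
unsafe impl Sync for SafePointer {}

// same shape as the qemu write callback in lib.rs (illustrative)
type WriteCallback = extern "C" fn(*mut c_void, u64, *const u8, u64) -> i32;

// returns a closure that can be handed to multiple worker threads; the
// shared Mutex guarantees only one thread is inside the callback at a time
fn make_serialized_writer(
    callback: WriteCallback,
    callback_data: SafePointer,
) -> impl Fn(u64, &[u8]) -> i32 + Send + Sync + Clone + 'static {
    let lock = Arc::new(Mutex::new(()));
    move |offset: u64, data: &[u8]| {
        let _guard = lock.lock().unwrap();
        callback(callback_data.pointer, offset, data.as_ptr(), data.len() as u64)
    }
}

with such a wrapper the workers can run in parallel, but the writes
themselves still happen one at a time, so this alone cannot make the
restore faster.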
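------------

and a second sketch for the 'only the chunk download can be parallelized'
part: fetch chunks from several tasks, but push the results through a
channel to a single writer task, so the write callback is never called
concurrently and no mutex is needed. this uses plain tokio 1.x primitives
(not the ParallelHandler from the patch); fetch_chunk is a placeholder
standing in for RemoteChunkReader::read_chunk, and error handling is
simplified.

use std::sync::Arc;

use anyhow::{bail, Error};
use tokio::sync::{mpsc, Semaphore};

// placeholder for the real chunk download
async fn fetch_chunk(_digest: [u8; 32]) -> Result<Vec<u8>, Error> {
    Ok(vec![0u8; 4 * 1024 * 1024])
}

// download chunks concurrently, but write from exactly one place
async fn restore_parallel(
    chunks: Vec<([u8; 32], u64)>, // (digest, offset) pairs from the index
    write_data: impl Fn(u64, &[u8]) -> i32,
    workers: usize, // must be > 0
) -> Result<(), Error> {
    let (tx, mut rx) = mpsc::channel::<(u64, Vec<u8>)>(workers);
    let sem = Arc::new(Semaphore::new(workers));

    // producer side: at most `workers` downloads in flight at any time
    tokio::spawn(async move {
        for (digest, offset) in chunks {
            let permit = sem.clone().acquire_owned().await.unwrap();
            let tx = tx.clone();
            tokio::spawn(async move {
                let data = fetch_chunk(digest).await.expect("chunk fetch failed");
                let _ = tx.send((offset, data)).await;
                drop(permit); // frees a download slot
            });
        }
        // the original tx is dropped here; rx.recv() returns None once all
        // per-task clones are gone as well
    });

    // consumer side: the only caller of the write callback, no locking needed
    while let Some((offset, data)) = rx.recv().await {
        if write_data(offset, &data) < 0 {
            bail!("write_data callback failed");
        }
    }
    Ok(())
}

the writer still serializes the qemu writes, just without a lock, which
matches the limitation described above.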