From: Niko Fellner
To: "pbs-devel@lists.proxmox.com"
Date: Sun, 6 Dec 2020 02:51:55 +0000
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in

Another update:

- It now works when awaiting the spawned futures in pairs of 2 (futs.next().await.unwrap())
- Atomics now make the counting of percentage, bytes and zeroes work
- Sometimes the program runs and finishes OK. First impression: performance looks good (about 230 vs. 380 seconds (sync) for a 32 GiB VM), but I can't really measure performance right now because the server is busy
- But sometimes the program segfaults. Not sure why; does anyone have an idea?
- The more futures I await in parallel (see "if futs.len() >= N ..."), the sooner and more likely the segfaults occur.

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+        
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..f7aa564 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -66,8 +66,10 @@ impl RestoreTask {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            //builder.max_threads(6);
+            //builder.core_threads(4);
+            builder.max_threads(12);
+            builder.core_threads(6);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -106,12 +108,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+    
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,46 +150,108 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+        let per = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let bytes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let zeroes = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        
+        //let mut tokio_handles = vec![];
+        //let mut futs = futures::stream::FuturesUnordered::new();
+        //use futures::stream::{self, StreamExt, TryStreamExt};
+        use futures::stream::{StreamExt};
+        //let futs = tokio::stream::iter;
+        let mut futs = futures::stream::FuturesUnordered::new();
+        
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push and await tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            let per = std::sync::Arc::clone(&per);
+            let bytes = std::sync::Arc::clone(&bytes);
+            let zeroes = std::sync::Arc::clone(&zeroes);
+            futs.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                            zeroes.fetch_add(index_chunk_size, std::sync::atomic::Ordering::SeqCst);
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes.fetch_add(raw_data.len(), std::sync::atomic::Ordering::SeqCst);
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            let currPer = per.load(std::sync::atomic::Ordering::SeqCst);
+                            let currBytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+                            let currZeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
+                            //if per != next_per {
+                            if currPer < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                    next_per, currBytes,
+                                    currZeroes*100/currBytes, currZeroes,
+                                    start_time.elapsed().as_secs());
+                                per.store(next_per, std::sync::atomic::Ordering::SeqCst);
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+            
+            //if futs.len() >= 2 {
+            if futs.len() >= 2 {
+                let response = futs.next().await.unwrap();
+                if let Err(e) = response {
+                    eprintln!("Error during await: {}", e);
                 }
-                bytes += raw_data.len();
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                        next_per, bytes,
-                        zeroes*100/bytes, zeroes,
-                        start_time.elapsed().as_secs());
-                    per = next_per;
-                }
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await remaining");
+        // Wait for the remaining to finish.
+        while let Some(response) = futs.next().await {
+            if let Err(e) = response {
+                eprintln!("Error during await: {}", e);
             }
         }
+        eprintln!("END: await remaining");
+
+        //futs.try_buffer_unordered(20)
+        //.try_for_each(|_res| futures::future::ok(()))
+        //.await?;
+        //if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+        //    eprintln!("Error during await: {}", e);
+        //}
+        let bytes = bytes.load(std::sync::atomic::Ordering::SeqCst);
+        let zeroes = zeroes.load(std::sync::atomic::Ordering::SeqCst);
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+        eprintln!("restore image complete (bytes={}, zeroes={}, duration={:.2}s, speed={:.2}MB/s)",
             bytes,
+            zeroes,
             elapsed.as_secs_f64(),
             bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
         );
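For comparison, here is a minimal std-only sketch of the two techniques the patch combines: atomic counters shared between tasks for progress reporting, and a cap on how many chunks are in flight at once. It is not the patch's code. Instead of tokio::spawn plus FuturesUnordered with "if futs.len() >= N", it uses a fixed pool of scoped threads (std::thread::scope, Rust 1.63 or later) that claim positions from an atomic cursor, so it runs without external crates. The chunk slice, Progress, restore_parallel and the callback shape (offset, Some(data) or None for a zero chunk, length) are hypothetical stand-ins for the index, the chunk reader and the C callback. Two details deliberately differ from the patch: progress is derived from a completed-chunk counter rather than pos+1, since tasks finish out of order, and fetch_max replaces the separate load/store on per, which in the patch can let two tasks print the same percentage or move it backwards. Requiring W: Sync makes the compiler check what "unsafe impl Send for SendRawPointer" asserts by hand, namely that the callback may be used from several threads; whether the C callback actually tolerates concurrent calls is not established here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical shared progress state, analogous to `per`, `bytes` and
/// `zeroes` in the patch, plus `done`, because chunks can finish out of order.
#[derive(Default)]
pub struct Progress {
    pub per: AtomicUsize,
    pub done: AtomicUsize,
    pub bytes: AtomicUsize,
    pub zeroes: AtomicUsize,
}

impl Progress {
    /// Record one finished chunk of `len` bytes. Returns `Some(percent)` only
    /// to the caller that actually advanced the percentage.
    pub fn record(&self, total: usize, len: usize, is_zero: bool) -> Option<usize> {
        // Plain counters: Relaxed suffices, nothing else is synchronized through them.
        self.bytes.fetch_add(len, Ordering::Relaxed);
        if is_zero {
            self.zeroes.fetch_add(len, Ordering::Relaxed);
        }
        let done = self.done.fetch_add(1, Ordering::Relaxed) + 1;
        let next_per = done * 100 / total;
        // fetch_max never lowers `per`, unlike a separate load + store.
        let prev = self.per.fetch_max(next_per, Ordering::Relaxed);
        if prev < next_per { Some(next_per) } else { None }
    }
}

/// Hypothetical restore loop: `chunks[pos]` is `None` for a zero chunk and
/// `Some(data)` otherwise. `write(offset, data, len)` mimics the C callback
/// (data = None means "write `len` zero bytes"). At most `workers` chunks
/// are being written at any time.
pub fn restore_parallel<W>(
    chunks: &[Option<Vec<u8>>],
    chunk_size: usize,
    workers: usize,
    write: W,
) -> Result<Progress, String>
where
    W: Fn(u64, Option<&[u8]>, u64) -> i32 + Sync,
{
    let progress = Progress::default();
    let next = AtomicUsize::new(0); // next position to claim
    let total = chunks.len();

    thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..workers.max(1) {
            handles.push(s.spawn(|| -> Result<(), String> {
                loop {
                    // Each worker claims the next unprocessed position.
                    let pos = next.fetch_add(1, Ordering::Relaxed);
                    if pos >= total {
                        return Ok(());
                    }
                    let offset = (pos * chunk_size) as u64;
                    let (res, len, is_zero) = match &chunks[pos] {
                        None => (write(offset, None, chunk_size as u64), chunk_size, true),
                        Some(data) => {
                            let res = write(offset, Some(data.as_slice()), data.len() as u64);
                            (res, data.len(), false)
                        }
                    };
                    if res < 0 {
                        return Err(format!("write callback failed ({}) at pos {}", res, pos));
                    }
                    if let Some(per) = progress.record(total, len, is_zero) {
                        eprintln!("progress {}%", per);
                    }
                }
            }));
        }
        // Join every worker; the first error (if any) is returned.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .collect::<Result<(), String>>()
    })?;

    Ok(progress)
}
```

With workers = 1 this is the sequential loop again; raising workers corresponds to raising N in "if futs.len() >= N".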