From: Niko Fellner
To: pbs-devel@lists.proxmox.com
Date: Mon, 7 Dec 2020 22:59:15 +0000
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in

Hi Dominik!

Thanks for your quick feedback. Indeed, I have never done Rust before and only have a limited background in parallel programming, so your help is much appreciated.

You are right: your code as posted performs even worse than the original, no matter how many threads you throw at it.

- Your code, with the mutex active around the whole chunk handling: restore image complete (bytes=34359738368, duration=235.59s, speed=139.09MB/s)
- Original synchronous code: restore image complete (bytes=34359738368, duration=224.78s, speed=145.78MB/s)

But there is an easy fix for it:

+// let _guard = mutex.lock().unwrap();
+ let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;

Move your mutex lock to just below the read_chunk line, so the reads are no longer synchronized. Please rerun your restore performance test and check whether it is faster for you as well.

In my benchmark (32 GB VM), your solution with this small fix ran faster than my code (the best I had was "duration=153.03s, speed=214.13MB/s"):

- 4 threads: restore image complete (bytes=34359738368, duration=154.67s, speed=211.86MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=144.58s, speed=226.65MB/s)
- 12 threads: restore image complete (bytes=34359738368, duration=143.41s, speed=228.49MB/s) (another run, to verify it)

So the single-threaded CPU bottleneck is already largely removed, even without parallel writes: on my machine this ran about 57% faster than the original synchronous code.

Yes, I think we should ask the QEMU developers about it. Maybe they can even provide a fix - who knows.

I will also benchmark an Azure VM to check the speedup with a large number of CPU threads and NVMe disks.

By the way, I also benchmarked a mix of your lib.rs with my restore.rs "pub async fn restore_image":

- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=155.07s, speed=211.31MB/s)
- 12 threads: restore image complete (bytes=34359738368, zeroes=22322085888, duration=156.37s, speed=209.56MB/s) (another run, to verify it)

So your code really is superior (maybe because of the std::sync::Mutex instead of my thread::yield, or something else), and it would be great if you could try it out again.
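Just to make the intended locking pattern explicit, here is a minimal, self-contained sketch in plain std Rust (read_chunk_expensive and write_serialized are made-up stand-ins, not the real PBS calls): the expensive chunk read runs outside the critical section, and only the write callback is serialized.

use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-ins for the real chunk read and the QEMU write callback.
fn read_chunk_expensive(pos: u64) -> Vec<u8> {
    vec![pos as u8; 1024] // pretend this is the slow fetch + decrypt + decompress
}

fn write_serialized(offset: u64, data: &[u8]) {
    let _ = (offset, data.len()); // pretend this writes into the target image
}

fn main() {
    let write_lock = Arc::new(Mutex::new(()));

    let handles: Vec<_> = (0..4u64)
        .map(|pos| {
            let write_lock = Arc::clone(&write_lock);
            thread::spawn(move || {
                // The expensive read runs in parallel: NOT under the mutex.
                let raw_data = read_chunk_expensive(pos);

                // Only the write is serialized, as in the fix above.
                let _guard = write_lock.lock().unwrap();
                write_serialized(pos * 1024, &raw_data);
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}

That is the whole trick: the mutex only protects the (fast) write into the image, while the (slow) fetch and decode happens concurrently.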
Here I automatically set the number of CPUs to use:

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..4b42e02 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,10 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
+num_cpus = "1.13.0"
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..dee952e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -809,6 +809,14 @@ pub extern "C" fn proxmox_restore_image(
     error: * mut * mut c_char,
     verbose: bool,
 ) -> c_int {
+
+    #[derive(Clone,Copy)]
+    struct SafePointer {
+        pointer: *mut c_void,
+    }
+
+    unsafe impl Send for SafePointer {};
+    unsafe impl Sync for SafePointer {};

     let restore_task = restore_handle_to_task(handle);

@@ -817,12 +825,13 @@ pub extern "C" fn proxmox_restore_image(
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;

+        let foo = SafePointer { pointer: callback_data.clone() };
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            callback(foo.pointer, offset, data.as_ptr(), data.len() as u64)
         };

         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            callback(foo.pointer, offset, std::ptr::null(), len)
         };

         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..168853b 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -8,6 +8,7 @@ use tokio::runtime::Runtime;
 use tokio::prelude::*;

 use proxmox_backup::tools::runtime::get_runtime_with_builder;
+use proxmox_backup::tools::ParallelHandler;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};

@@ -62,12 +63,14 @@ impl RestoreTask {
     }

     pub fn new(setup: BackupSetup) -> Result<Self, Error> {
+        let vcpus = num_cpus::get();
+        eprintln!("{} vCPUs detected", vcpus);
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new();
             builder.threaded_scheduler();
             builder.enable_all();
-            builder.max_threads(6);
-            builder.core_threads(4);
+            builder.max_threads(2 * vcpus);
+            builder.core_threads(vcpus);
             builder.thread_name("proxmox-restore-worker");
             builder
         });
@@ -110,8 +113,8 @@ impl RestoreTask {
     pub async fn restore_image(
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: impl Fn(u64, &[u8]) -> i32 + Send + Copy + 'static,
+        write_zero_callback: impl Fn(u64, u64) -> i32 + Send + Copy + 'static,
         verbose: bool,
     ) -> Result<(), Error> {

@@ -147,52 +150,96 @@ impl RestoreTask {
             file_info.chunk_crypt_mode(),
             most_used,
         );
-
-        let mut per = 0;
-        let mut bytes = 0;
-        let mut zeroes = 0;
+
+        let vcpus = num_cpus::get();

         let start_time = std::time::Instant::now();

-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+
+        let mutex = Arc::new(Mutex::new(()));
+
+        let index_count = index.index_count();
+        let pool = ParallelHandler::new(
+            "restore", vcpus,
+            move |(digest, offset, size): ([u8;32], u64, u64)| {
+                let mutex = mutex.clone();
+                let chunk_reader = chunk_reader.clone();
+                let (bytes, zeroes) = if digest == zero_chunk_digest {
+                    {
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_zero_callback(offset, size);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                    }
+                    (size, size)
+                } else {
+                    let size = {
+                        //let _guard = mutex.lock().unwrap(); // we don't want to sync too early here
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let _guard = mutex.lock().unwrap();
+                        let res = write_data_callback(offset, &raw_data);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        raw_data.len() as u64
+                    };
+                    (size, 0)
+                };
+
+                sender.send((bytes, zeroes))?;
+
+                Ok(())
             }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                        next_per, bytes,
-                        zeroes*100/bytes, zeroes,
-                        start_time.elapsed().as_secs());
-                    per = next_per;
+        );
+
+        let channel = pool.channel();
+
+        let output = tokio::spawn(async move {
+            let mut count = 0;
+            let mut per = 0;
+            let mut bytes = 0;
+            let mut zeroes = 0;
+            while let Some((new_bytes, new_zeroes)) = receiver.recv().await {
+                bytes += new_bytes;
+                zeroes += new_zeroes;
+                count += 1;
+                if verbose {
+                    let next_per = ((count)*100)/index_count;
+                    if per != next_per {
+                        eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                            next_per, bytes,
+                            zeroes*100/bytes, zeroes,
+                            start_time.elapsed().as_secs());
+                        per = next_per;
+                    }
+                }
+                if count >= index_count {
+                    break;
                 }
             }
-        }

-        let end_time = std::time::Instant::now();
-        let elapsed = end_time.duration_since(start_time);
-        eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
-            bytes,
-            elapsed.as_secs_f64(),
-            bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
-        );
+            let end_time = std::time::Instant::now();
+            let elapsed = end_time.duration_since(start_time);
+            eprintln!("restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+                bytes,
+                elapsed.as_secs_f64(),
+                bytes as f64/(1024.0*1024.0*elapsed.as_secs_f64())
+            );
+
+            Ok::<_, Error>(())
+        });
+
+        for pos in 0..index.index_count() {
+            let digest = index.index_digest(pos).unwrap().clone();
+            let offset = (pos*index.chunk_size) as u64;
+            let chunk_size = index.chunk_size;
+
+            proxmox_backup::tools::runtime::block_in_place(|| channel.send((digest, offset, chunk_size as u64)))?;
+        }

-        Ok(())
+        output.await?
     }

     pub fn get_image_length(&self, aid: u8) -> Result<u64, Error> {
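P.S. for readers who prefer a standalone illustration: the following rough sketch mirrors the structure of the patch above using only std threads and channels, so ParallelHandler, tokio and all names here are made-up stand-ins rather than the real API. A few workers pop (digest, offset) jobs, read chunks in parallel, serialize only the write, and report their byte counts to a single aggregator that prints progress.

use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Placeholder for the real chunk fetch (network + decode); runs in parallel.
fn read_chunk(digest: u64) -> Vec<u8> {
    vec![(digest % 251) as u8; 4 * 1024 * 1024]
}

fn main() {
    let chunk_count: u64 = 32;
    let chunk_size: u64 = 4 * 1024 * 1024;

    // Work queue of (digest, offset) pairs, shared by all workers.
    let jobs: Arc<Mutex<Vec<(u64, u64)>>> = Arc::new(Mutex::new(
        (0..chunk_count).map(|pos| (pos, pos * chunk_size)).collect(),
    ));
    // Writes into the image are serialized; reads are not.
    let write_lock = Arc::new(Mutex::new(()));
    // Workers report (bytes, zeroes) per finished chunk to a single aggregator.
    let (sender, receiver) = mpsc::channel::<(u64, u64)>();

    let workers: Vec<_> = (0..4)
        .map(|_| {
            let jobs = Arc::clone(&jobs);
            let write_lock = Arc::clone(&write_lock);
            let sender = sender.clone();
            thread::spawn(move || loop {
                // Take one job; the queue lock is released right after the pop.
                let job = jobs.lock().unwrap().pop();
                let (digest, offset) = match job {
                    Some(j) => j,
                    None => break,
                };
                let raw_data = read_chunk(digest); // parallel part
                let _guard = write_lock.lock().unwrap(); // serialized part
                // ... a write_data_callback(offset, &raw_data) would go here ...
                let _ = offset;
                sender.send((raw_data.len() as u64, 0)).unwrap();
            })
        })
        .collect();
    drop(sender); // the aggregator loop below ends once all workers are done

    // Aggregator: the single place that sums up bytes and prints progress.
    let mut bytes = 0u64;
    let mut count = 0u64;
    for (new_bytes, _new_zeroes) in receiver {
        bytes += new_bytes;
        count += 1;
        println!("progress {}% ({} bytes)", count * 100 / chunk_count, bytes);
    }

    for w in workers {
        w.join().unwrap();
    }
}

The real patch does the same with ParallelHandler workers feeding a tokio unbounded channel, but the division of labor is identical: parallel reads, serialized writes, and one task doing the progress accounting.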