From: Niko Fellner
To: "pbs-devel@lists.proxmox.com"
Date: Sat, 5 Dec 2020 23:39:51 +0000
Subject: Re: [pbs-devel] parallelize restore.rs fn restore_image: problems in async move

Update: I was able to implement an unsafe SendRawPointer, but still have problems with the parallelization.

I have two different versions:

Version ASYNC_CRASH:
- push all tasks into a Vec (tokio_handles) and call try_join_all(tokio_handles).await later on
- It runs in parallel, but quickly leads to an out of memory error, because my implementation spawns all tasks at once, instead of only (number of CPU cores) tasks at a time...
- Syslog even showed a segfault:
> Dec 5 22:50:49 pve kernel: [13437.190348] proxmox-restore[26081]: segfault at 8 ip 000055643d435b37 sp 00007f94d0f78c20 error 4 in pbs-restore[55643d34c000+104000]
> Dec 5 22:50:49 pve kernel: [13437.190357] Code: 48 85 ff 75 b2 48 8b 4c 24 28 64 48 33 0c 25 28 00 00 00 44 89 e8 75 43 48 83 c4 38 5b 5d 41 5c 41 5d c3 48 8b 85 b0 00 00 00 <48> 8b 50 08 48 89 95 b0 00 00 00 48 85 d2 74 11 48 c7 40 08 00 00
- Is there a way to call just a group of maybe 5 or 10 tokio handles? Or what am I doing wrong here in Version ASYNC_CRASH?

Version ASYNC_SINGLE:
- use the async functions, but directly await after spawning the tokio task.
- Restore works, it looks good
- But no parallelization
- Will do some performance tests tomorrow of ASYNC_SINGLE vs. the original version.

Original version:
commit 447552da4af1c7f0553873e4fd21335dab8fe029 (HEAD -> master, origin/master, origin/HEAD)
Author: Fabian Grünbichler
Date: Mon Nov 30 13:41:45 2020 +0100

Maybe ASYNC_CRASH crashes because of how I use the chunk reader?
- I couldn't use "ReadChunk::read_chunk(&chunk_reader_clone, &digest)?", because that one never finished reading in my async block...?!
- So I tried with "AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?" which works fine in Version ASYNC_SINGLE, but within ASYNC_CRASH it leads to chaos.

@Dietmar: unfortunately I still couldn't use much of pull.rs - I still don't understand it well enough.


VERSION ASYNC_CRASH:

diff --git a/Cargo.toml b/Cargo.toml
index 7f29d0a..c87bf5a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,9 +27,9 @@ lazy_static = "1.4"
 libc = "0.2"
 once_cell = "1.3.1"
 openssl = "0.10"
-proxmox = { version = "0.7.0", features = [ "sortable-macro", "api-macro" ] }
-proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
-#proxmox-backup = { path = "../proxmox-backup" }
+proxmox = { version = "0.8.0", features = [ "sortable-macro", "api-macro" ] }
+#proxmox-backup = { git = "git://git.proxmox.com/git/proxmox-backup.git", tag = "v1.0.4" }
+proxmox-backup = { path = "../proxmox-backup" }
 serde_json = "1.0"
 tokio = { version = "0.2.9", features = [ "blocking", "fs", "io-util", "macros", "rt-threaded", "signal", "stream", "tcp", "time", "uds" ] }
 bincode = "1.0"
diff --git a/src/capi_types.rs b/src/capi_types.rs
index 1b9abc1..de08523 100644
--- a/src/capi_types.rs
+++ b/src/capi_types.rs
@@ -1,5 +1,5 @@
 use anyhow::Error;
-use std::os::raw::{c_char, c_void, c_int};
+use std::os::raw::{c_uchar, c_char, c_void, c_int};
 use std::ffi::CString;
 
 pub(crate) struct CallbackPointers {
@@ -48,3 +48,15 @@ pub struct ProxmoxRestoreHandle;
 /// Opaque handle for backups jobs
 #[repr(C)]
 pub struct ProxmoxBackupHandle;
+
+#[derive(Copy, Clone)]
+pub(crate) struct SendRawPointer {
+    pub callback: extern "C" fn(*mut c_void, u64, *const c_uchar, u64) -> c_int,
+    pub callback_data: *mut c_void
+}
+unsafe impl std::marker::Send for SendRawPointer {}
+impl SendRawPointer {
+    pub fn call_itself(self, offset: u64, data: *const c_uchar, len: u64) -> i32 {
+        return (self.callback)(self.callback_data, offset, data, len);
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index b755014..39baddb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -816,13 +816,15 @@ pub extern "C" fn proxmox_restore_image(
 
         let archive_name = tools::utf8_c_string(archive_name)?
             .ok_or_else(|| format_err!("archive_name must not be NULL"))?;
+
+        let send_raw_pointer = SendRawPointer { callback, callback_data };
 
         let write_data_callback = move |offset: u64, data: &[u8]| {
-            callback(callback_data, offset, data.as_ptr(), data.len() as u64)
+            return send_raw_pointer.call_itself(offset, data.as_ptr(), data.len() as u64)
         };
 
         let write_zero_callback = move |offset: u64, len: u64| {
-            callback(callback_data, offset, std::ptr::null(), len)
+            return send_raw_pointer.call_itself(offset, std::ptr::null(), len)
         };
 
         proxmox_backup::tools::runtime::block_on(
diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..c0f0bf8 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -151,38 +151,66 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
+
+        let mut tokio_handles = vec![];
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
+        eprintln!("BEGIN: push tasks");
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
-                }
-            }
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            tokio_handles.push(
+                tokio::spawn(
+                    async move {
+                        let digest = &index_digest;
+                        let offset = (pos*index_chunk_size) as u64;
+                        //eprintln!("pos: {}, offset: {}", pos, offset);
+                        if digest == &zero_chunk_digest {
+                            let res = write_zero_callback(offset, index_chunk_size as u64);
+                            //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_zero_callback failed ({})", res);
+                            }
+                            bytes += index_chunk_size;
+                            zeroes += index_chunk_size;
+                        } else {
+                            //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                            //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                            let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                            //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                            let res = write_data_callback(offset, &raw_data);
+                            //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                            if res < 0 {
+                                bail!("write_data_callback failed ({})", res);
+                            }
+                            bytes += raw_data.len();
+                        }
+                        if verbose {
+                            let next_per = ((pos+1)*100)/index_count;
+                            //if per != next_per {
+                            if per < next_per {
+                                eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                                          next_per, bytes,
+                                          zeroes*100/bytes, zeroes,
+                                          start_time.elapsed().as_secs());
+                                per = next_per;
+                            }
+                        }
+                        Ok(())
+                    }
+                )
+            );
+        }
+        eprintln!("END: push tasks");
+        eprintln!("BEGIN: await");
+        if let Err(e) = futures::future::try_join_all(tokio_handles).await {
+            eprintln!("Error during await: {}", e);
         }
+        eprintln!("END: await");
 
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);


VERSION ASYNC_SINGLE:

diff --git a/src/restore.rs b/src/restore.rs
index 24983dd..9d4fb4d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -106,12 +106,12 @@ impl RestoreTask {
     pub fn runtime(&self) -> tokio::runtime::Handle {
         self.runtime.handle().clone()
     }
-
-    pub async fn restore_image(
+
+    pub async fn restore_image<A: 'static + Copy + Send + Fn(u64, &[u8]) -> i32, B: 'static + Copy + Send + Fn(u64, u64) -> i32> (
         &self,
         archive_name: String,
-        write_data_callback: impl Fn(u64, &[u8]) -> i32,
-        write_zero_callback: impl Fn(u64, u64) -> i32,
+        write_data_callback: A,
+        write_zero_callback: B,
         verbose: bool,
     ) -> Result<(), Error> {
 
@@ -148,39 +148,45 @@ impl RestoreTask {
             most_used,
         );
 
-        let mut per = 0;
         let mut bytes = 0;
-        let mut zeroes = 0;
+
+        let index_chunk_size = index.chunk_size;
+        let index_count = index.index_count();
+        eprintln!("index_count = {}, index_chunk_size: {}", index_count, index_chunk_size);
 
         let start_time = std::time::Instant::now();
 
-        for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
-            let offset = (pos*index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
-                let res = write_zero_callback(offset, index.chunk_size as u64);
-                if res < 0 {
-                    bail!("write_zero_callback failed ({})", res);
-                }
-                bytes += index.chunk_size;
-                zeroes += index.chunk_size;
-            } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
-            }
-            if verbose {
-                let next_per = ((pos+1)*100)/index.index_count();
-                if per != next_per {
-                    eprintln!("progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
-                              next_per, bytes,
-                              zeroes*100/bytes, zeroes,
-                              start_time.elapsed().as_secs());
-                    per = next_per;
+        for pos in 0..index_count {
+            let chunk_reader_clone = chunk_reader.clone();
+            let index_digest = index.index_digest(pos).unwrap().clone();
+            if let Err(e) = tokio::spawn(
+                async move {
+                    let digest = &index_digest;
+                    let offset = (pos*index_chunk_size) as u64;
+                    //eprintln!("pos: {}, offset: {}", pos, offset);
+                    if digest == &zero_chunk_digest {
+                        let res = write_zero_callback(offset, index_chunk_size as u64);
+                        //eprintln!("write_zero_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_zero_callback failed ({})", res);
+                        }
+                        bytes += index_chunk_size;
+                    } else {
+                        //eprintln!("BEFORE read_chunk: pos: {}, offset: {}", pos, offset);
+                        //let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?; // never finishes reading...
+                        let raw_data = AsyncReadChunk::read_chunk(&chunk_reader_clone, &digest).await?;
+                        //eprintln!("AFTER read_chunk: pos: {}, offset: {}", pos, offset);
+                        let res = write_data_callback(offset, &raw_data);
+                        //eprintln!("write_data_callback with res: {}, pos: {}, offset: {}", res, pos, offset);
+                        if res < 0 {
+                            bail!("write_data_callback failed ({})", res);
+                        }
+                        bytes += raw_data.len();
+                    }
+                    Ok(())
                 }
+            ).await {
+                eprintln!("Error during await: {}", e);
             }
         }
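
On the question of running only a small group of tasks at a time: below is a minimal sketch of bounded concurrency. It uses plain std threads instead of tokio so that it stays self-contained, so it is not a drop-in replacement for either patch. `restore_chunk` and `restore_bounded` are hypothetical names, and the per-chunk work is only a stub. A fixed number of workers pull the next chunk position from a shared atomic counter, so at most `workers` chunks are in flight at once. The byte total is kept in a shared atomic rather than in a `let mut` captured by `async move`, because integers are `Copy` and each spawned task would otherwise update its own private copy.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the per-chunk work (read the chunk, then call
/// the write callback). Returns the number of bytes handled.
fn restore_chunk(pos: usize, chunk_size: u64) -> Result<u64, String> {
    let _offset = pos as u64 * chunk_size; // where the chunk would be written
    Ok(chunk_size)
}

/// Process `chunk_count` chunks with at most `workers` chunks in flight.
/// Returns the total number of bytes handled by all workers.
fn restore_bounded(chunk_count: usize, chunk_size: u64, workers: usize) -> Result<u64, String> {
    // Shared cursor: each worker claims the next unprocessed position.
    let next_pos = Arc::new(AtomicUsize::new(0));
    // Shared progress counter, updated by every worker.
    let bytes = Arc::new(AtomicU64::new(0));

    let mut handles = Vec::with_capacity(workers);
    for _ in 0..workers.max(1) {
        let next_pos = Arc::clone(&next_pos);
        let bytes = Arc::clone(&bytes);
        handles.push(thread::spawn(move || -> Result<(), String> {
            loop {
                let pos = next_pos.fetch_add(1, Ordering::SeqCst);
                if pos >= chunk_count {
                    return Ok(()); // nothing left to claim
                }
                let n = restore_chunk(pos, chunk_size)?;
                bytes.fetch_add(n, Ordering::SeqCst);
            }
        }));
    }

    // Wait for all workers; surface the first panic or chunk error.
    for handle in handles {
        handle.join().map_err(|_| "worker panicked".to_string())??;
    }
    Ok(bytes.load(Ordering::SeqCst))
}
```

For example, `restore_bounded(100, 4, 5)` processes 100 chunks of 4 bytes with 5 workers and returns `Ok(400)`. In a tokio/futures setup, `futures::stream::StreamExt::buffer_unordered` may serve a similar purpose; that is untested against this patch.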