From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 1E45362828 for ; Wed, 30 Sep 2020 16:16:40 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 145AC1A2EC for ; Wed, 30 Sep 2020 16:16:10 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 49FB61A2C4 for ; Wed, 30 Sep 2020 16:16:08 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 0FD3845A45 for ; Wed, 30 Sep 2020 16:16:08 +0200 (CEST) From: Stefan Reiter To: pbs-devel@lists.proxmox.com Date: Wed, 30 Sep 2020 16:15:59 +0200 Message-Id: <20200930141601.27233-4-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200930141601.27233-1-s.reiter@proxmox.com> References: <20200930141601.27233-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.047 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 30 Sep 2020 14:16:40 -0000 Enables outside functions to check if an error has occurred without calling send. Also fix a potential bug where errors that happen after the SendHandle has been dropped while doing the thread join might have been ignored. Requires internal check_abort to be moved out of 'impl SendHandle'. Signed-off-by: Stefan Reiter --- src/tools/parallel_handler.rs | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs index b15fc046..7b92c23a 100644 --- a/src/tools/parallel_handler.rs +++ b/src/tools/parallel_handler.rs @@ -10,19 +10,19 @@ pub struct SendHandle { abort: Arc>>, } -impl SendHandle { - /// Returns the first error happened, if any - pub fn check_abort(&self) -> Result<(), Error> { - let guard = self.abort.lock().unwrap(); - if let Some(err_msg) = &*guard { - return Err(format_err!("{}", err_msg)); - } - Ok(()) +/// Returns the first error happened, if any +pub fn check_abort(abort: Arc>>) -> Result<(), Error> { + let guard = abort.lock().unwrap(); + if let Some(err_msg) = &*guard { + return Err(format_err!("{}", err_msg)); } + Ok(()) +} +impl SendHandle { /// Send data to the worker threads pub fn send(&self, input: I) -> Result<(), Error> { - self.check_abort()?; + check_abort(Arc::clone(&self.abort))?; match self.input.send(input) { Ok(()) => Ok(()), Err(_) => bail!("send failed - channel closed"), @@ -128,14 +128,23 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { Ok(()) } + /// Return Err if at least one invocation of the callback failed + /// Panics if complete() has been called on this instance + pub fn check_abort(&self) -> Result<(), Error> { + check_abort(Arc::clone(&self.input.as_ref().unwrap().abort)) + } + /// Wait for worker threads to complete and check for errors pub fn complete(mut self) -> Result<(), Error> { - self.input.as_ref().unwrap().check_abort()?; + let abort = Arc::clone(&self.input.as_ref().unwrap().abort); + check_abort(Arc::clone(&abort))?; drop(self.input.take()); let msg_list = self.join_threads(); if msg_list.is_empty() { + // an error might be encountered while waiting for the join + check_abort(abort)?; return Ok(()); } Err(format_err!("{}", msg_list.join("\n"))) -- 2.20.1