From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 8B7876273D for ; Wed, 30 Sep 2020 15:25:30 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 8969019862 for ; Wed, 30 Sep 2020 15:25:30 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 7DCD41984B for ; Wed, 30 Sep 2020 15:25:29 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 46C5945A4F for ; Wed, 30 Sep 2020 15:25:29 +0200 (CEST) From: Stefan Reiter To: pbs-devel@lists.proxmox.com Date: Wed, 30 Sep 2020 15:25:21 +0200 Message-Id: <20200930132522.22927-5-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200930132522.22927-1-s.reiter@proxmox.com> References: <20200930132522.22927-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.049 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list 
List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 30 Sep 2020 13:25:30 -0000 We only store one error anyway, so there is no point in continuing to process data if one thread has already failed. This is especially important for unbounded mode, where there is possibly still a lot of data to go through: exiting early means complete() doesn't have to wait for all of it to be processed. Also abort on drop: if the caller wants to wait for completion, they have to call complete(). Current logic should be unaffected: * 'verify' never returns an error from handler_fn * 'pull' errors immediately anyway once 'send' or 'complete' fail, so it doesn't matter if that happens a little earlier and some chunks are left unwritten Signed-off-by: Stefan Reiter --- src/tools/parallel_handler.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs index 7b92c23a..102ecce3 100644 --- a/src/tools/parallel_handler.rs +++ b/src/tools/parallel_handler.rs @@ -1,4 +1,5 @@ use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::JoinHandle; use anyhow::{bail, format_err, Error}; @@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender}; pub struct SendHandle { input: Sender, abort: Arc>>, + aborted: Arc, } /// Returns the first error happened, if any @@ -50,6 +52,7 @@ impl Clone for SendHandle { Self { input: self.input.clone(), abort: Arc::clone(&self.abort), + aborted: Arc::clone(&self.aborted), } } } @@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { }; let abort = Arc::new(Mutex::new(None)); + let aborted = Arc::new(AtomicBool::new(false)); for i in 0..threads { let input_rx = input_rx.clone(); let abort = Arc::clone(&abort); + let aborted = Arc::clone(&aborted); // Erase the 'a lifetime bound. This is safe because we // join all thread in the drop handler. 
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { std::thread::Builder::new() .name(format!("{} ({})", name, i)) .spawn(move || loop { + if aborted.load(Ordering::Acquire) { + // some other thread aborted, exit early + return; + } let data = match input_rx.recv() { Ok(data) => data, Err(_) => return, @@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { match (handler_fn)(data) { Ok(()) => (), Err(err) => { + aborted.store(true, Ordering::Release); let mut guard = abort.lock().unwrap(); if guard.is_none() { *guard = Some(err.to_string()); } + return; } } }) @@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { input: Some(SendHandle { input: input_tx, abort, + aborted, }), _marker: std::marker::PhantomData, } @@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { check_abort(Arc::clone(&self.input.as_ref().unwrap().abort)) } - /// Wait for worker threads to complete and check for errors + /// Wait for worker threads to complete and check for errors. + /// Only this ensures completion. Dropping the instance aborts instead. pub fn complete(mut self) -> Result<(), Error> { let abort = Arc::clone(&self.input.as_ref().unwrap().abort); check_abort(Arc::clone(&abort))?; @@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> { // Note: We make sure that all threads will be joined impl<'a, I> Drop for ParallelHandler<'a, I> { fn drop(&mut self) { - drop(self.input.take()); + let input = self.input.take(); + if let Some(input) = &input { + // shut down ASAP + input.aborted.store(true, Ordering::Release); + } + drop(input); while let Some(handle) = self.handles.pop() { let _ = handle.join(); } -- 2.20.1