public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted
Date: Wed, 30 Sep 2020 16:16:00 +0200	[thread overview]
Message-ID: <20200930141601.27233-5-s.reiter@proxmox.com> (raw)
In-Reply-To: <20200930141601.27233-1-s.reiter@proxmox.com>

We only store one error anyway, so there is no point in continuing to
process data once one thread has failed. This is especially important
for unbounded mode, where a lot of data may still be queued: exiting
early means complete() doesn't have to wait for all of it to be processed.

Also abort on drop. If the caller wants to wait for completion, they
have to call complete().

Current logic should be unaffected:
* 'verify' never returns an error from handler_fn
* 'pull' errors immediately anyway once 'send' or 'complete' fail, so
  it doesn't matter if that happens a little earlier and some chunks are
  left unwritten

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/tools/parallel_handler.rs | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index 7b92c23a..102ecce3 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -1,4 +1,5 @@
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
@@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
 pub struct SendHandle<I> {
     input: Sender<I>,
     abort: Arc<Mutex<Option<String>>>,
+    aborted: Arc<AtomicBool>,
 }
 
 /// Returns the first error happened, if any
@@ -50,6 +52,7 @@ impl<I> Clone for SendHandle<I> {
         Self {
             input: self.input.clone(),
             abort: Arc::clone(&self.abort),
+            aborted: Arc::clone(&self.aborted),
         }
     }
 }
@@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         };
 
         let abort = Arc::new(Mutex::new(None));
+        let aborted = Arc::new(AtomicBool::new(false));
 
         for i in 0..threads {
             let input_rx = input_rx.clone();
             let abort = Arc::clone(&abort);
+            let aborted = Arc::clone(&aborted);
 
             // Erase the 'a lifetime bound. This is safe because we
             // join all thread in the drop handler.
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
                 std::thread::Builder::new()
                     .name(format!("{} ({})", name, i))
                     .spawn(move || loop {
+                        if aborted.load(Ordering::Acquire) {
+                            // some other thread aborted, exit early
+                            return;
+                        }
                         let data = match input_rx.recv() {
                             Ok(data) => data,
                             Err(_) => return,
@@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
                         match (handler_fn)(data) {
                             Ok(()) => (),
                             Err(err) => {
+                                aborted.store(true, Ordering::Release);
                                 let mut guard = abort.lock().unwrap();
                                 if guard.is_none() {
                                     *guard = Some(err.to_string());
                                 }
+                                return;
                             }
                         }
                     })
@@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
             input: Some(SendHandle {
                 input: input_tx,
                 abort,
+                aborted,
             }),
             _marker: std::marker::PhantomData,
         }
@@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
     }
 
-    /// Wait for worker threads to complete and check for errors
+    /// Wait for worker threads to complete and check for errors.
+    /// Only this ensures completion. Dropping the instance aborts instead.
     pub fn complete(mut self) -> Result<(), Error> {
         let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
         check_abort(Arc::clone(&abort))?;
@@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
 // Note: We make sure that all threads will be joined
 impl<'a, I> Drop for ParallelHandler<'a, I> {
     fn drop(&mut self) {
-        drop(self.input.take());
+        let input = self.input.take();
+        if let Some(input) = &input {
+            // shut down ASAP
+            input.aborted.store(true, Ordering::Release);
+        }
+        drop(input);
         while let Some(handle) = self.handles.pop() {
             let _ = handle.join();
         }
-- 
2.20.1





  parent reply	other threads:[~2020-09-30 14:16 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
2020-09-30 14:35   ` Thomas Lamprecht
2020-09-30 14:56   ` Dietmar Maurer
2020-09-30 15:04     ` Thomas Lamprecht
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
2020-09-30 14:16 ` Stefan Reiter [this message]
2020-09-30 14:16 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background Stefan Reiter
  -- strict thread matches above, loose matches on Subject: below --
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200930141601.27233-5-s.reiter@proxmox.com \
    --to=s.reiter@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal