public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH 0/5] backup validation improvements
@ 2020-09-30 13:25 Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
                   ` (4 more replies)
  0 siblings, 5 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
  To: pbs-devel

First patch skips server-side validation (existance check) of last snapshot's
chunks if the base was verified successfully at least 7 days ago. With verify
scheduling this means that users can avoid the overhead during backups by doing
weekly verifications.

The latter 4 patches (marked RFC, not sure if the idea is that sound) make the
validation happen in a seperate thread. This requires some modifications to
ParallelHandler and carries the tradeoff that we have to verify *all* chunks of
the last backup, since when the thread starts we don't know yet which ones the
client will upload and overwrite anyway.


proxmox-backup: Stefan Reiter (5):
  backup: don't validate chunk existance if base was recently verified
  ParallelHandler add unbounded mode
  ParallelHandler: add check_abort function and handle errors during
    join
  ParallelHandler: exit early if this or other thread aborted
  backup: validate chunk existance in background

 src/api2/backup/environment.rs | 172 ++++++++++++++++++++++++---------
 src/backup/verify.rs           |   3 +-
 src/client/pull.rs             |   3 +-
 src/tools/parallel_handler.rs  |  68 ++++++++++---
 4 files changed, 183 insertions(+), 63 deletions(-)

-- 
2.20.1




^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
  2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
  2020-09-30 13:32   ` Thomas Lamprecht
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode Stefan Reiter
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
  To: pbs-devel

If the base was successfully verified within the last 7 days, we assume
that it is okay and all chunks exist, so we don't have to check.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/api2/backup/environment.rs | 29 ++++++++++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index d515bf30..be06d1dc 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -457,6 +457,31 @@ impl BackupEnvironment {
         Ok(())
     }
 
+    fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
+        match &self.last_backup {
+            Some(last_backup) => {
+                let last_dir = &last_backup.backup_dir;
+                let (manifest, _) = self.datastore.load_manifest(last_dir)?;
+                let verify = manifest.unprotected["verify_state"].clone();
+                match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
+                    Ok(verify) => match verify {
+                        Some(verify) => {
+                            let cutoff = unsafe { libc::time(std::ptr::null_mut()) };
+                            let cutoff = cutoff - 60*60*24*7; // one week back
+                            Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
+                        },
+                        None => Ok(false)
+                    },
+                    Err(err) => {
+                        self.worker.warn(format!("error parsing base verification state : '{}'", err));
+                        Ok(false)
+                    }
+                }
+            },
+            None => Ok(false)
+        }
+    }
+
     /// Ensure all chunks referenced in this backup actually exist.
     /// Only call *after* all writers have been closed, to avoid race with GC.
     /// In case of error, mark the previous backup as 'verify failed'.
@@ -534,7 +559,9 @@ impl BackupEnvironment {
             }
         }
 
-        self.verify_chunk_existance(&state.known_chunks)?;
+        if !self.last_backup_has_recent_verify()? {
+            self.verify_chunk_existance(&state.known_chunks)?;
+        }
 
         // marks the backup as successful
         state.finished = true;
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode
  2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
  To: pbs-devel

Enables non-blocking send. Only use when the data being sent is small,
otherwise the channel buffer might take a lot of memory.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/verify.rs          |  3 ++-
 src/client/pull.rs            |  3 ++-
 src/tools/parallel_handler.rs | 15 ++++++++++++---
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
             }
 
             Ok(())
-        }
+        },
+        true
     );
 
     for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
             chunk.verify_unencrypted(size as usize, &digest)?;
             target.insert_chunk(&chunk, &digest)?;
             Ok(())
-       }
+       },
+       true
     );
 
     let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
 
 /// A handle to send data to the worker thread (implements clone)
 pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
 impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
     /// Create a new thread pool, each thread processing incoming data
     /// with 'handler_fn'.
-    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    pub fn new<F>(
+        name: &str,
+        threads: usize,
+        handler_fn: F,
+        bounded_mode: bool,
+    ) -> Self
         where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
     {
         let mut handles = Vec::new();
-        let (input_tx, input_rx) = bounded::<I>(threads);
+        let (input_tx, input_rx) = if bounded_mode {
+            bounded::<I>(threads)
+        } else {
+            unbounded::<I>()
+        };
 
         let abort = Arc::new(Mutex::new(None));
 
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join
  2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background Stefan Reiter
  4 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
  To: pbs-devel

Enables outside functions to check if an error has occurred without
calling send. Also fix a potential bug where errors that happen after
the SendHandle has been dropped while doing the thread join might have
been ignored. Requires internal check_abort to be moved out of 'impl
SendHandle'.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/tools/parallel_handler.rs | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index b15fc046..7b92c23a 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -10,19 +10,19 @@ pub struct SendHandle<I> {
     abort: Arc<Mutex<Option<String>>>,
 }
 
-impl<I: Send> SendHandle<I> {
-    /// Returns the first error happened, if any
-    pub fn check_abort(&self) -> Result<(), Error> {
-        let guard = self.abort.lock().unwrap();
-        if let Some(err_msg) = &*guard {
-            return Err(format_err!("{}", err_msg));
-        }
-        Ok(())
+/// Returns the first error happened, if any
+pub fn check_abort(abort: Arc<Mutex<Option<String>>>) -> Result<(), Error> {
+    let guard = abort.lock().unwrap();
+    if let Some(err_msg) = &*guard {
+        return Err(format_err!("{}", err_msg));
     }
+    Ok(())
+}
 
+impl<I: Send> SendHandle<I> {
     /// Send data to the worker threads
     pub fn send(&self, input: I) -> Result<(), Error> {
-        self.check_abort()?;
+        check_abort(Arc::clone(&self.abort))?;
         match self.input.send(input) {
             Ok(()) => Ok(()),
             Err(_) => bail!("send failed - channel closed"),
@@ -128,14 +128,23 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         Ok(())
     }
 
+    /// Return Err if at least one invocation of the callback failed
+    /// Panics if complete() has been called on this instance
+    pub fn check_abort(&self) -> Result<(), Error> {
+        check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
+    }
+
     /// Wait for worker threads to complete and check for errors
     pub fn complete(mut self) -> Result<(), Error> {
-        self.input.as_ref().unwrap().check_abort()?;
+        let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
+        check_abort(Arc::clone(&abort))?;
         drop(self.input.take());
 
         let msg_list = self.join_threads();
 
         if msg_list.is_empty() {
+            // an error might be encountered while waiting for the join
+            check_abort(abort)?;
             return Ok(());
         }
         Err(format_err!("{}", msg_list.join("\n")))
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted
  2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
                   ` (2 preceding siblings ...)
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background Stefan Reiter
  4 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
  To: pbs-devel

We only store one error anyway, so no point in continuing to process
data if one thread has already failed. Especially important for
unbounded mode, where there's possibly still a lot of data to go
through, so complete() doesn't wait for all of that to happen.

Also abort on drop, if the caller wants to wait for completion, he has
to call complete().

Current logic should be unaffected:
* 'verify' never returns an error from handler_fn
* 'pull' errors immediately anyway once 'send' or 'complete' fail, so
  it doesn't matter if that happens a little earlier and some chunks are
  left unwritten

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/tools/parallel_handler.rs | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index 7b92c23a..102ecce3 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -1,4 +1,5 @@
 use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
@@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
 pub struct SendHandle<I> {
     input: Sender<I>,
     abort: Arc<Mutex<Option<String>>>,
+    aborted: Arc<AtomicBool>,
 }
 
 /// Returns the first error happened, if any
@@ -50,6 +52,7 @@ impl<I> Clone for SendHandle<I> {
         Self {
             input: self.input.clone(),
             abort: Arc::clone(&self.abort),
+            aborted: Arc::clone(&self.aborted),
         }
     }
 }
@@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         };
 
         let abort = Arc::new(Mutex::new(None));
+        let aborted = Arc::new(AtomicBool::new(false));
 
         for i in 0..threads {
             let input_rx = input_rx.clone();
             let abort = Arc::clone(&abort);
+            let aborted = Arc::clone(&aborted);
 
             // Erase the 'a lifetime bound. This is safe because we
             // join all thread in the drop handler.
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
                 std::thread::Builder::new()
                     .name(format!("{} ({})", name, i))
                     .spawn(move || loop {
+                        if aborted.load(Ordering::Acquire) {
+                            // some other thread aborted, exit early
+                            return;
+                        }
                         let data = match input_rx.recv() {
                             Ok(data) => data,
                             Err(_) => return,
@@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
                         match (handler_fn)(data) {
                             Ok(()) => (),
                             Err(err) => {
+                                aborted.store(true, Ordering::Release);
                                 let mut guard = abort.lock().unwrap();
                                 if guard.is_none() {
                                     *guard = Some(err.to_string());
                                 }
+                                return;
                             }
                         }
                     })
@@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
             input: Some(SendHandle {
                 input: input_tx,
                 abort,
+                aborted,
             }),
             _marker: std::marker::PhantomData,
         }
@@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
         check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
     }
 
-    /// Wait for worker threads to complete and check for errors
+    /// Wait for worker threads to complete and check for errors.
+    /// Only this ensures completion. Dropping the instance aborts instead.
     pub fn complete(mut self) -> Result<(), Error> {
         let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
         check_abort(Arc::clone(&abort))?;
@@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
 // Note: We make sure that all threads will be joined
 impl<'a, I> Drop for ParallelHandler<'a, I> {
     fn drop(&mut self) {
-        drop(self.input.take());
+        let input = self.input.take();
+        if let Some(input) = &input {
+            // shut down ASAP
+            input.aborted.store(true, Ordering::Release);
+        }
+        drop(input);
         while let Some(handle) = self.handles.pop() {
             let _ = handle.join();
         }
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background
  2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
                   ` (3 preceding siblings ...)
  2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
  4 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
  To: pbs-devel

Reused chunks will not be uploaded, and thus never touched. We need to
verify their existance manually to ensure a valid backup.

Since we know all chunks that the client may reuse must be recorded in
the previous snapshot (which is locked during backup and can't be
forgotten), we can do the validation in the background, while the backup
is still running, and only join at the end if there's still work left.

The tradeoff here is that we don't know yet which chunks the client will
*not* reuse later in the backup, so we have to check them all.

This also means we can revert the changes to the KnownChunksMap type
made in 43772efc6e.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/api2/backup/environment.rs | 155 ++++++++++++++++++++++-----------
 1 file changed, 105 insertions(+), 50 deletions(-)

diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index be06d1dc..cc41032a 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -11,8 +11,9 @@ use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
 
 use crate::api2::types::{Userid, SnapshotVerifyState, VerifyState};
 use crate::backup::*;
-use crate::server::WorkerTask;
+use crate::server::{WorkerTask, UPID};
 use crate::server::formatter::*;
+use crate::tools::ParallelHandler;
 use hyper::{Body, Response};
 
 #[derive(Copy, Clone, Serialize)]
@@ -66,8 +67,14 @@ struct FixedWriterState {
     incremental: bool,
 }
 
-// key=digest, value=(length, existance checked)
-type KnownChunksMap = HashMap<[u8;32], (u32, bool)>;
+// key=digest, value=length
+type KnownChunksMap = HashMap<[u8;32], u32>;
+
+enum ValidateHandlerState {
+    NotInitialized,
+    NotNeeded,
+    Running(ParallelHandler<'static, [u8;32]>),
+}
 
 struct SharedBackupState {
     finished: bool,
@@ -78,6 +85,7 @@ struct SharedBackupState {
     known_chunks: KnownChunksMap,
     backup_size: u64, // sums up size of all files
     backup_stat: UploadStatistic,
+    validate_handler: ValidateHandlerState,
 }
 
 impl SharedBackupState {
@@ -131,6 +139,7 @@ impl BackupEnvironment {
             known_chunks: HashMap::new(),
             backup_size: 0,
             backup_stat: UploadStatistic::new(),
+            validate_handler: ValidateHandlerState::NotInitialized,
         };
 
         Self {
@@ -156,11 +165,89 @@ impl BackupEnvironment {
 
         state.ensure_unfinished()?;
 
-        state.known_chunks.insert(digest, (length, false));
+        state.known_chunks.insert(digest, length);
+
+        match &state.validate_handler {
+            ValidateHandlerState::NotInitialized => {
+                if self.last_backup_has_recent_verify()? {
+                    state.validate_handler = ValidateHandlerState::NotNeeded;
+                } else {
+                    let handler = self.start_validate_handler();
+                    handler.send(digest)?;
+                    state.validate_handler = ValidateHandlerState::Running(handler);
+                }
+            },
+            ValidateHandlerState::Running(handler) => {
+                handler.send(digest)?;
+            },
+            ValidateHandlerState::NotNeeded => {}
+        }
 
         Ok(())
     }
 
+    fn start_validate_handler(&self) -> ParallelHandler<'static, [u8;32]> {
+        let datastore = Arc::clone(&self.datastore);
+        let upid = Arc::new(self.worker.upid().clone());
+        let last_backup = Arc::new(self.last_backup.clone());
+        ParallelHandler::new(
+            "verify handler",
+            1, // one worker is enough, and means we don't need a lock to mark the prev snapshot
+            move |digest| {
+                Self::validate_chunk_existance(
+                    &digest,
+                    Arc::clone(&datastore),
+                    Arc::clone(&upid),
+                    Arc::clone(&last_backup)
+                )
+            },
+            false // don't block on send
+        )
+    }
+
+    fn validate_chunk_existance(
+        digest: &[u8;32],
+        datastore: Arc<DataStore>,
+        upid: Arc<UPID>,
+        last_backup: Arc<Option<BackupInfo>>,
+    ) -> Result<(), Error> {
+        if !datastore.chunk_path(digest).0.exists() {
+            // Chunk is missing, mark last snapshot (which references it) as "verify failed"
+            let mark_msg = if let Some(ref last_backup) = *last_backup {
+                let last_dir = &last_backup.backup_dir;
+                let verify_state = SnapshotVerifyState {
+                    state: VerifyState::Failed,
+                    upid: UPID::clone(upid.as_ref()),
+                };
+
+                let res = proxmox::try_block!{
+                    let (mut manifest, _) = datastore.load_manifest(last_dir)?;
+                    manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
+                    datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
+                };
+
+                if let Err(err) = res {
+                    format!("tried marking previous snapshot as bad, \
+                            but got error accessing manifest: {}", err)
+                } else {
+                    "marked previous snapshot as bad, please use \
+                    'verify' for a detailed check".to_owned()
+                }
+            } else {
+                "internal error: no base backup registered to mark invalid".to_owned()
+            };
+
+            bail!(
+                "chunk '{}' was attempted to be reused but doesn't exist - {}",
+                digest_to_hex(digest),
+                mark_msg
+            );
+        }
+
+        Ok(())
+    }
+
+
     /// Register fixed length chunks after upload.
     ///
     /// Like `register_chunk()`, but additionally record statistics for
@@ -176,6 +263,9 @@ impl BackupEnvironment {
         let mut state = self.state.lock().unwrap();
 
         state.ensure_unfinished()?;
+        if let ValidateHandlerState::Running(handler) = &state.validate_handler {
+            handler.check_abort()?;
+        }
 
         let mut data = match state.fixed_writers.get_mut(&wid) {
             Some(data) => data,
@@ -198,7 +288,7 @@ impl BackupEnvironment {
         if is_duplicate { data.upload_stat.duplicates += 1; }
 
         // register chunk
-        state.known_chunks.insert(digest, (size, true));
+        state.known_chunks.insert(digest, size);
 
         Ok(())
     }
@@ -218,6 +308,9 @@ impl BackupEnvironment {
         let mut state = self.state.lock().unwrap();
 
         state.ensure_unfinished()?;
+        if let ValidateHandlerState::Running(handler) = &state.validate_handler {
+            handler.check_abort()?;
+        }
 
         let mut data = match state.dynamic_writers.get_mut(&wid) {
             Some(data) => data,
@@ -231,7 +324,7 @@ impl BackupEnvironment {
         if is_duplicate { data.upload_stat.duplicates += 1; }
 
         // register chunk
-        state.known_chunks.insert(digest, (size, true));
+        state.known_chunks.insert(digest, size);
 
         Ok(())
     }
@@ -240,7 +333,7 @@ impl BackupEnvironment {
         let state = self.state.lock().unwrap();
 
         match state.known_chunks.get(digest) {
-            Some((len, _)) => Some(*len),
+            Some(len) => Some(*len),
             None => None,
         }
     }
@@ -482,47 +575,6 @@ impl BackupEnvironment {
         }
     }
 
-    /// Ensure all chunks referenced in this backup actually exist.
-    /// Only call *after* all writers have been closed, to avoid race with GC.
-    /// In case of error, mark the previous backup as 'verify failed'.
-    fn verify_chunk_existance(&self, known_chunks: &KnownChunksMap) -> Result<(), Error> {
-        for (digest, (_, checked)) in known_chunks.iter() {
-            if !checked && !self.datastore.chunk_path(digest).0.exists() {
-                let mark_msg = if let Some(ref last_backup) = self.last_backup {
-                    let last_dir = &last_backup.backup_dir;
-                    let verify_state = SnapshotVerifyState {
-                        state: VerifyState::Failed,
-                        upid: self.worker.upid().clone(),
-                    };
-
-                    let res = proxmox::try_block!{
-                        let (mut manifest, _) = self.datastore.load_manifest(last_dir)?;
-                        manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
-                        self.datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
-                    };
-
-                    if let Err(err) = res {
-                        format!("tried marking previous snapshot as bad, \
-                                but got error accessing manifest: {}", err)
-                    } else {
-                        "marked previous snapshot as bad, please use \
-                        'verify' for a detailed check".to_owned()
-                    }
-                } else {
-                    "internal error: no base backup registered to mark invalid".to_owned()
-                };
-
-                bail!(
-                    "chunk '{}' was attempted to be reused but doesn't exist - {}",
-                    digest_to_hex(digest),
-                    mark_msg
-                );
-            }
-        }
-
-        Ok(())
-    }
-
     /// Mark backup as finished
     pub fn finish_backup(&self) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
@@ -559,8 +611,11 @@ impl BackupEnvironment {
             }
         }
 
-        if !self.last_backup_has_recent_verify()? {
-            self.verify_chunk_existance(&state.known_chunks)?;
+        // stop verify handler and verify remaining chunks
+        let handler = std::mem::replace(&mut state.validate_handler, ValidateHandlerState::NotInitialized);
+        if let ValidateHandlerState::Running(handler) = handler {
+            self.worker.log("waiting for validate thread to complete");
+            handler.complete()?;
         }
 
         // marks the backup as successful
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
  2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
@ 2020-09-30 13:32   ` Thomas Lamprecht
  2020-09-30 13:42     ` Stefan Reiter
  0 siblings, 1 reply; 10+ messages in thread
From: Thomas Lamprecht @ 2020-09-30 13:32 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 30.09.20 15:25, Stefan Reiter wrote:
> If the base was successfully verified within the last 7 days, we assume
> that it is okay and all chunks exist, so we don't have to check.
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>  src/api2/backup/environment.rs | 29 ++++++++++++++++++++++++++++-
>  1 file changed, 28 insertions(+), 1 deletion(-)
> 
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index d515bf30..be06d1dc 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -457,6 +457,31 @@ impl BackupEnvironment {
>          Ok(())
>      }
>  
> +    fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
> +        match &self.last_backup {
> +            Some(last_backup) => {
> +                let last_dir = &last_backup.backup_dir;
> +                let (manifest, _) = self.datastore.load_manifest(last_dir)?;
> +                let verify = manifest.unprotected["verify_state"].clone();
> +                match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
> +                    Ok(verify) => match verify {
> +                        Some(verify) => {
> +                            let cutoff = unsafe { libc::time(std::ptr::null_mut()) };
> +                            let cutoff = cutoff - 60*60*24*7; // one week back

Why unsafe and why not our `proxmox::tools::time::epoch_i64()` ?


on another note: we should probably add some helper for getting the
verify state

> +                            Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
> +                        },
> +                        None => Ok(false)
> +                    },
> +                    Err(err) => {
> +                        self.worker.warn(format!("error parsing base verification state : '{}'", err));
> +                        Ok(false)
> +                    }
> +                }
> +            },
> +            None => Ok(false)
> +        }
> +    }
> +
>      /// Ensure all chunks referenced in this backup actually exist.
>      /// Only call *after* all writers have been closed, to avoid race with GC.
>      /// In case of error, mark the previous backup as 'verify failed'.
> @@ -534,7 +559,9 @@ impl BackupEnvironment {
>              }
>          }
>  
> -        self.verify_chunk_existance(&state.known_chunks)?;
> +        if !self.last_backup_has_recent_verify()? {
> +            self.verify_chunk_existance(&state.known_chunks)?;
> +        }
>  
>          // marks the backup as successful
>          state.finished = true;
> 






^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
  2020-09-30 13:32   ` Thomas Lamprecht
@ 2020-09-30 13:42     ` Stefan Reiter
  2020-09-30 13:55       ` Thomas Lamprecht
  0 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:42 UTC (permalink / raw)
  To: Thomas Lamprecht, Proxmox Backup Server development discussion

On 9/30/20 3:32 PM, Thomas Lamprecht wrote:
> On 30.09.20 15:25, Stefan Reiter wrote:
>> If the base was successfully verified within the last 7 days, we assume
>> that it is okay and all chunks exist, so we don't have to check.
>>
>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>> ---
>>   src/api2/backup/environment.rs | 29 ++++++++++++++++++++++++++++-
>>   1 file changed, 28 insertions(+), 1 deletion(-)
>>
>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
>> index d515bf30..be06d1dc 100644
>> --- a/src/api2/backup/environment.rs
>> +++ b/src/api2/backup/environment.rs
>> @@ -457,6 +457,31 @@ impl BackupEnvironment {
>>           Ok(())
>>       }
>>   
>> +    fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
>> +        match &self.last_backup {
>> +            Some(last_backup) => {
>> +                let last_dir = &last_backup.backup_dir;
>> +                let (manifest, _) = self.datastore.load_manifest(last_dir)?;
>> +                let verify = manifest.unprotected["verify_state"].clone();
>> +                match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
>> +                    Ok(verify) => match verify {
>> +                        Some(verify) => {
>> +                            let cutoff = unsafe { libc::time(std::ptr::null_mut()) };
>> +                            let cutoff = cutoff - 60*60*24*7; // one week back
> 
> Why unsafe and why not our `proxmox::tools::time::epoch_i64()` ?
> 

Didn't know that exists, I've seen that unsafe a few times so I thought 
that was the usual way. I'll send v2.

> 
> on another note: we should probably add some helper for getting the
> verify state
> 

I'll see if it's feasible. So far we only have two sites that both do 
different things on individual errors, so I'm not sure how much that 
would simplify things.

>> +                            Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
>> +                        },
>> +                        None => Ok(false)
>> +                    },
>> +                    Err(err) => {
>> +                        self.worker.warn(format!("error parsing base verification state : '{}'", err));
>> +                        Ok(false)
>> +                    }
>> +                }
>> +            },
>> +            None => Ok(false)
>> +        }
>> +    }
>> +
>>       /// Ensure all chunks referenced in this backup actually exist.
>>       /// Only call *after* all writers have been closed, to avoid race with GC.
>>       /// In case of error, mark the previous backup as 'verify failed'.
>> @@ -534,7 +559,9 @@ impl BackupEnvironment {
>>               }
>>           }
>>   
>> -        self.verify_chunk_existance(&state.known_chunks)?;
>> +        if !self.last_backup_has_recent_verify()? {
>> +            self.verify_chunk_existance(&state.known_chunks)?;
>> +        }
>>   
>>           // marks the backup as successful
>>           state.finished = true;
>>
> 
> 




^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
  2020-09-30 13:42     ` Stefan Reiter
@ 2020-09-30 13:55       ` Thomas Lamprecht
  0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-09-30 13:55 UTC (permalink / raw)
  To: Stefan Reiter, Proxmox Backup Server development discussion

On 30.09.20 15:42, Stefan Reiter wrote:
> On 9/30/20 3:32 PM, Thomas Lamprecht wrote:
>> on another note: we should probably add some helper for getting the
>> verify state
>>
> 
> I'll see if it's feasible. So far we only have two sites that both do different things on individual errors, so I'm not sure how much that would simplify things.
> 

Hasn't to be now, just had a feeling I saw (and/or wrote) this already
a few times.

More important may be to make the unprotected part of the manifest safer
for parallel read/write access (RW lock?).




^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode
  2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
@ 2020-09-30 14:15 ` Stefan Reiter
  0 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-09-30 14:15 UTC (permalink / raw)
  To: pbs-devel

Enables non-blocking send. Only use when the data being sent is small,
otherwise the channel buffer might take a lot of memory.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/verify.rs          |  3 ++-
 src/client/pull.rs            |  3 ++-
 src/tools/parallel_handler.rs | 15 ++++++++++++---
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
             }
 
             Ok(())
-        }
+        },
+        true
     );
 
     for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
             chunk.verify_unencrypted(size as usize, &digest)?;
             target.insert_chunk(&chunk, &digest)?;
             Ok(())
-       }
+       },
+       true
     );
 
     let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
 use std::thread::JoinHandle;
 
 use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
 
 /// A handle to send data to the worker thread (implements clone)
 pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
 impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
     /// Create a new thread pool, each thread processing incoming data
     /// with 'handler_fn'.
-    pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+    pub fn new<F>(
+        name: &str,
+        threads: usize,
+        handler_fn: F,
+        bounded_mode: bool,
+    ) -> Self
         where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
     {
         let mut handles = Vec::new();
-        let (input_tx, input_rx) = bounded::<I>(threads);
+        let (input_tx, input_rx) = if bounded_mode {
+            bounded::<I>(threads)
+        } else {
+            unbounded::<I>()
+        };
 
         let abort = Arc::new(Mutex::new(None));
 
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

end of thread, other threads:[~2020-09-30 14:16 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
2020-09-30 13:32   ` Thomas Lamprecht
2020-09-30 13:42     ` Stefan Reiter
2020-09-30 13:55       ` Thomas Lamprecht
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background Stefan Reiter
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode Stefan Reiter

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal