* [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
@ 2020-09-30 14:15 ` Stefan Reiter
2020-09-30 14:35 ` Thomas Lamprecht
2020-09-30 14:56 ` Dietmar Maurer
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode Stefan Reiter
` (3 subsequent siblings)
4 siblings, 2 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 14:15 UTC (permalink / raw)
To: pbs-devel
If the base was successfully verified within the last 7 days, we assume
that it is okay and all chunks exist, so we don't have to check.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
v2:
* use proxmox::tools::time
src/api2/backup/environment.rs | 32 ++++++++++++++++++++++++++++++--
1 file changed, 30 insertions(+), 2 deletions(-)
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index d515bf30..a8c9ddb4 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
use ::serde::{Serialize};
use serde_json::{json, Value};
-use proxmox::tools::digest_to_hex;
+use proxmox::tools::{digest_to_hex, time};
use proxmox::tools::fs::{replace_file, CreateOptions};
use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
@@ -457,6 +457,32 @@ impl BackupEnvironment {
Ok(())
}
+ fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
+ match &self.last_backup {
+ Some(last_backup) => {
+ let last_dir = &last_backup.backup_dir;
+ let (manifest, _) = self.datastore.load_manifest(last_dir)?;
+ let verify = manifest.unprotected["verify_state"].clone();
+ match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
+ Ok(verify) => match verify {
+ Some(verify) => {
+ let mut cutoff = time::TmEditor::with_epoch(time::epoch_i64(), false)?;
+ cutoff.add_days(-7)?;
+ let cutoff = cutoff.into_epoch()?;
+ Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
+ },
+ None => Ok(false)
+ },
+ Err(err) => {
+ self.worker.warn(format!("error parsing base verification state : '{}'", err));
+ Ok(false)
+ }
+ }
+ },
+ None => Ok(false)
+ }
+ }
+
/// Ensure all chunks referenced in this backup actually exist.
/// Only call *after* all writers have been closed, to avoid race with GC.
/// In case of error, mark the previous backup as 'verify failed'.
@@ -534,7 +560,9 @@ impl BackupEnvironment {
}
}
- self.verify_chunk_existance(&state.known_chunks)?;
+ if !self.last_backup_has_recent_verify()? {
+ self.verify_chunk_existance(&state.known_chunks)?;
+ }
// marks the backup as successful
state.finished = true;
--
2.20.1
^ permalink raw reply [flat|nested] 9+ messages in thread
* Re: [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
2020-09-30 14:15 ` [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
@ 2020-09-30 14:35 ` Thomas Lamprecht
2020-09-30 14:56 ` Dietmar Maurer
1 sibling, 0 replies; 9+ messages in thread
From: Thomas Lamprecht @ 2020-09-30 14:35 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Stefan Reiter
On 30.09.20 16:15, Stefan Reiter wrote:
> If the base was successfully verified within the last 7 days, we assume
> that it is okay and all chunks exist, so we don't have to check.
>
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>
> v2:
> * use proxmox::tools::time
>
> src/api2/backup/environment.rs | 32 ++++++++++++++++++++++++++++++--
> 1 file changed, 30 insertions(+), 2 deletions(-)
>
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index d515bf30..a8c9ddb4 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -5,7 +5,7 @@ use std::collections::HashMap;
> use ::serde::{Serialize};
> use serde_json::{json, Value};
>
> -use proxmox::tools::digest_to_hex;
> +use proxmox::tools::{digest_to_hex, time};
> use proxmox::tools::fs::{replace_file, CreateOptions};
> use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
>
> @@ -457,6 +457,32 @@ impl BackupEnvironment {
> Ok(())
> }
>
> + fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
> + match &self.last_backup {
> + Some(last_backup) => {
> + let last_dir = &last_backup.backup_dir;
> + let (manifest, _) = self.datastore.load_manifest(last_dir)?;
> + let verify = manifest.unprotected["verify_state"].clone();
> + match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
> + Ok(verify) => match verify {
> + Some(verify) => {
> + let mut cutoff = time::TmEditor::with_epoch(time::epoch_i64(), false)?;
> + cutoff.add_days(-7)?;
> + let cutoff = cutoff.into_epoch()?;
Why not just use time::epoch_i64() directly and avoid converting to a tm
only to convert it back? This check is not time critical, so a leap
second or something like that does not matter.
IMO, the version below is simpler and everyone still gets what's meant:
let cutoff = time::epoch_i64() - 7 * 24 * 60 * 60;
Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
(but please, if you agree, do not send a v3 of the full series; this can
stand as its own patch, outside of the series)
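
For illustration, the plain epoch arithmetic suggested above can be sketched
with only the standard library. This is a rough sketch, not the code from the
patch: `epoch_now`, `is_recent_verify` and `VERIFY_MAX_AGE_SECS` are
hypothetical names, and `epoch_now` merely stands in for time::epoch_i64().

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seven days in seconds; leap seconds are deliberately ignored.
const VERIFY_MAX_AGE_SECS: i64 = 7 * 24 * 60 * 60;

/// Current Unix time in seconds (hypothetical stand-in for `time::epoch_i64()`).
fn epoch_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        // System clock is set before 1970: report a negative offset.
        Err(err) => -(err.duration().as_secs() as i64),
    }
}

/// True if a verification that started at `starttime` is newer than the cutoff.
fn is_recent_verify(starttime: i64, now: i64) -> bool {
    let cutoff = now - VERIFY_MAX_AGE_SECS;
    starttime > cutoff
}
```

A caller would pass `epoch_now()` as `now`; taking it as a parameter keeps the
comparison easy to test with fixed timestamps.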
* Re: [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
2020-09-30 14:15 ` [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
2020-09-30 14:35 ` Thomas Lamprecht
@ 2020-09-30 14:56 ` Dietmar Maurer
2020-09-30 15:04 ` Thomas Lamprecht
1 sibling, 1 reply; 9+ messages in thread
From: Dietmar Maurer @ 2020-09-30 14:56 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Stefan Reiter
A simpler approach would be to disable chunk reuse if not verified
within the last 7 days (at start of the backup).
> On 09/30/2020 4:15 PM Stefan Reiter <s.reiter@proxmox.com> wrote:
> If the base was successfully verified within the last 7 days, we assume
> that it is okay and all chunks exist, so we don't have to check.
* Re: [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified
2020-09-30 14:56 ` Dietmar Maurer
@ 2020-09-30 15:04 ` Thomas Lamprecht
0 siblings, 0 replies; 9+ messages in thread
From: Thomas Lamprecht @ 2020-09-30 15:04 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dietmar Maurer,
Stefan Reiter
On 30.09.20 16:56, Dietmar Maurer wrote:
> A simpler approach would be to disable chunk reuse if not verified
> within the last 7 days (at start of the backup).
>
That does not work with dirty-bitmap backups, and it defeats the purpose of
incremental backups in general.
The existing check is a simple loop over a list with a path .exists() call, so
I do not really see how disabling reuse would be simpler.
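
To make the "loop over a list with a path exists check" concrete, here is a
minimal sketch using only the standard library. It is not the datastore code
from the patch; `first_missing` is a hypothetical helper, and plain paths stand
in for the chunk paths the datastore would compute from digests.

```rust
use std::path::{Path, PathBuf};

/// Return the first path in `paths` that does not exist on disk, if any.
fn first_missing(paths: &[PathBuf]) -> Option<&Path> {
    paths.iter().map(PathBuf::as_path).find(|p| !p.exists())
}
```

A `None` result means every listed path exists; a `Some(path)` result is the
point where the real code would fail the backup and mark the previous snapshot.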
* [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
@ 2020-09-30 14:15 ` Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
` (2 subsequent siblings)
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 14:15 UTC (permalink / raw)
To: pbs-devel
Enables non-blocking send. Only use when the data being sent is small,
otherwise the channel buffer might take a lot of memory.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup/verify.rs | 3 ++-
src/client/pull.rs | 3 ++-
src/tools/parallel_handler.rs | 15 ++++++++++++---
3 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
}
Ok(())
- }
+ },
+ true
);
for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
chunk.verify_unencrypted(size as usize, &digest)?;
target.insert_chunk(&chunk, &digest)?;
Ok(())
- }
+ },
+ true
);
let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
/// A handle to send data to the worker thread (implements clone)
pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
/// Create a new thread pool, each thread processing incoming data
/// with 'handler_fn'.
- pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ pub fn new<F>(
+ name: &str,
+ threads: usize,
+ handler_fn: F,
+ bounded_mode: bool,
+ ) -> Self
where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
{
let mut handles = Vec::new();
- let (input_tx, input_rx) = bounded::<I>(threads);
+ let (input_tx, input_rx) = if bounded_mode {
+ bounded::<I>(threads)
+ } else {
+ unbounded::<I>()
+ };
let abort = Arc::new(Mutex::new(None));
--
2.20.1
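
The difference between the two modes in the commit message above can be
illustrated with the standard library's channels. This is only a sketch under
assumptions: `std::sync::mpsc` is used as a stand-in for crossbeam_channel so
that the example has no external dependencies, and both function names are
hypothetical.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Count how many items fit into a bounded channel while nothing drains it.
fn queued_without_blocking_bounded(capacity: usize, attempts: usize) -> usize {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut queued = 0;
    for i in 0..attempts {
        match tx.try_send(i as u32) {
            Ok(()) => queued += 1,
            // A blocking `send` would stall here until a worker takes an item.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    queued
}

/// Count how many items an unbounded channel buffers while nothing drains it.
fn queued_without_blocking_unbounded(attempts: usize) -> usize {
    let (tx, rx) = channel::<u32>();
    for i in 0..attempts {
        // Never blocks; every item is held in memory until it is received.
        tx.send(i as u32).expect("receiver is still alive");
    }
    rx.try_iter().count()
}
```

With a capacity of 2 the bounded variant accepts two items and then reports a
full queue, while the unbounded variant buffers all of them. That buffering is
why the commit message recommends the mode only for small items.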
* [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [PATCH v2 proxmox-backup 1/5] backup: don't validate chunk existance if base was recently verified Stefan Reiter
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler add unbounded mode Stefan Reiter
@ 2020-09-30 14:15 ` Stefan Reiter
2020-09-30 14:16 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
2020-09-30 14:16 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background Stefan Reiter
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 14:15 UTC (permalink / raw)
To: pbs-devel
Enables outside functions to check if an error has occurred without
calling send. Also fix a potential bug where errors that happen after
the SendHandle has been dropped while doing the thread join might have
been ignored. Requires internal check_abort to be moved out of 'impl
SendHandle'.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools/parallel_handler.rs | 29 +++++++++++++++++++----------
1 file changed, 19 insertions(+), 10 deletions(-)
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index b15fc046..7b92c23a 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -10,19 +10,19 @@ pub struct SendHandle<I> {
abort: Arc<Mutex<Option<String>>>,
}
-impl<I: Send> SendHandle<I> {
- /// Returns the first error happened, if any
- pub fn check_abort(&self) -> Result<(), Error> {
- let guard = self.abort.lock().unwrap();
- if let Some(err_msg) = &*guard {
- return Err(format_err!("{}", err_msg));
- }
- Ok(())
+/// Returns the first error happened, if any
+pub fn check_abort(abort: Arc<Mutex<Option<String>>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
}
+ Ok(())
+}
+impl<I: Send> SendHandle<I> {
/// Send data to the worker threads
pub fn send(&self, input: I) -> Result<(), Error> {
- self.check_abort()?;
+ check_abort(Arc::clone(&self.abort))?;
match self.input.send(input) {
Ok(()) => Ok(()),
Err(_) => bail!("send failed - channel closed"),
@@ -128,14 +128,23 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
Ok(())
}
+ /// Return Err if at least one invocation of the callback failed
+ /// Panics if complete() has been called on this instance
+ pub fn check_abort(&self) -> Result<(), Error> {
+ check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
+ }
+
/// Wait for worker threads to complete and check for errors
pub fn complete(mut self) -> Result<(), Error> {
- self.input.as_ref().unwrap().check_abort()?;
+ let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
+ check_abort(Arc::clone(&abort))?;
drop(self.input.take());
let msg_list = self.join_threads();
if msg_list.is_empty() {
+ // an error might be encountered while waiting for the join
+ check_abort(abort)?;
return Ok(());
}
Err(format_err!("{}", msg_list.join("\n")))
--
2.20.1
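
The pattern in this patch, a shared error slot that is checked both before and
after joining the worker, can be sketched in isolation as follows. This is a
simplified model rather than ParallelHandler itself: it uses a single thread,
`String` errors instead of anyhow, and the hypothetical names `AbortSlot`,
`run_and_complete` and `fail_on_three`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

type AbortSlot = Arc<Mutex<Option<String>>>;

/// Return the first recorded error, if any.
fn check_abort(abort: &AbortSlot) -> Result<(), String> {
    let guard = abort.lock().unwrap();
    if let Some(msg) = &*guard {
        return Err(msg.clone());
    }
    Ok(())
}

/// Example handler that fails for exactly one input.
fn fail_on_three(n: u32) -> Result<(), String> {
    if n == 3 {
        Err(format!("item {} failed", n))
    } else {
        Ok(())
    }
}

/// Run `handler` over `items` on a worker thread and report the first error.
fn run_and_complete(
    items: Vec<u32>,
    handler: fn(u32) -> Result<(), String>,
) -> Result<(), String> {
    let abort: AbortSlot = Arc::new(Mutex::new(None));
    let worker_abort = Arc::clone(&abort);
    let handle = thread::spawn(move || {
        for item in items {
            if let Err(err) = handler(item) {
                let mut guard = worker_abort.lock().unwrap();
                // Only the first error is kept.
                if guard.is_none() {
                    *guard = Some(err);
                }
            }
        }
    });
    // Errors that were recorded before we started waiting.
    check_abort(&abort)?;
    handle
        .join()
        .map_err(|_| "worker thread panicked".to_string())?;
    // An error may also have been recorded while we waited for the join.
    check_abort(&abort)
}
```

Without the second check, an error stored after the first check but before the
worker exits would go unreported, which is the case the commit message
describes.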
* [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
` (2 preceding siblings ...)
2020-09-30 14:15 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
@ 2020-09-30 14:16 ` Stefan Reiter
2020-09-30 14:16 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background Stefan Reiter
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 14:16 UTC (permalink / raw)
To: pbs-devel
We only store one error anyway, so no point in continuing to process
data if one thread has already failed. Especially important for
unbounded mode, where there's possibly still a lot of data to go
through, so complete() doesn't wait for all of that to happen.
Also abort on drop; if the caller wants to wait for completion, they have
to call complete().
Current logic should be unaffected:
* 'verify' never returns an error from handler_fn
* 'pull' errors immediately anyway once 'send' or 'complete' fail, so
it doesn't matter if that happens a little earlier and some chunks are
left unwritten
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools/parallel_handler.rs | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index 7b92c23a..102ecce3 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
@@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
pub struct SendHandle<I> {
input: Sender<I>,
abort: Arc<Mutex<Option<String>>>,
+ aborted: Arc<AtomicBool>,
}
/// Returns the first error happened, if any
@@ -50,6 +52,7 @@ impl<I> Clone for SendHandle<I> {
Self {
input: self.input.clone(),
abort: Arc::clone(&self.abort),
+ aborted: Arc::clone(&self.aborted),
}
}
}
@@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
};
let abort = Arc::new(Mutex::new(None));
+ let aborted = Arc::new(AtomicBool::new(false));
for i in 0..threads {
let input_rx = input_rx.clone();
let abort = Arc::clone(&abort);
+ let aborted = Arc::clone(&aborted);
// Erase the 'a lifetime bound. This is safe because we
// join all thread in the drop handler.
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
std::thread::Builder::new()
.name(format!("{} ({})", name, i))
.spawn(move || loop {
+ if aborted.load(Ordering::Acquire) {
+ // some other thread aborted, exit early
+ return;
+ }
let data = match input_rx.recv() {
Ok(data) => data,
Err(_) => return,
@@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
match (handler_fn)(data) {
Ok(()) => (),
Err(err) => {
+ aborted.store(true, Ordering::Release);
let mut guard = abort.lock().unwrap();
if guard.is_none() {
*guard = Some(err.to_string());
}
+ return;
}
}
})
@@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
input: Some(SendHandle {
input: input_tx,
abort,
+ aborted,
}),
_marker: std::marker::PhantomData,
}
@@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
}
- /// Wait for worker threads to complete and check for errors
+ /// Wait for worker threads to complete and check for errors.
+ /// Only this ensures completion. Dropping the instance aborts instead.
pub fn complete(mut self) -> Result<(), Error> {
let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
check_abort(Arc::clone(&abort))?;
@@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
// Note: We make sure that all threads will be joined
impl<'a, I> Drop for ParallelHandler<'a, I> {
fn drop(&mut self) {
- drop(self.input.take());
+ let input = self.input.take();
+ if let Some(input) = &input {
+ // shut down ASAP
+ input.aborted.store(true, Ordering::Release);
+ }
+ drop(input);
while let Some(handle) = self.handles.pop() {
let _ = handle.join();
}
--
2.20.1
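
The early-exit behaviour described in this patch can be modelled with a shared
AtomicBool. The sketch below is an assumption-laden simplification: it runs a
single worker on a std::sync::mpsc channel instead of a crossbeam worker pool,
and `process_until_abort` is a hypothetical name. With one worker the flag
check at the top of the loop is redundant, but it is what would stop the other
workers in a pool.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;

/// Queue all `items`, then process them until one equals `fail_on`.
/// Returns (number of items handled, whether the abort flag was set).
fn process_until_abort(items: Vec<u32>, fail_on: u32) -> (usize, bool) {
    let (tx, rx) = channel::<u32>();
    let aborted = Arc::new(AtomicBool::new(false));
    let handled = Arc::new(AtomicUsize::new(0));

    // Queue everything up front, as an unbounded channel allows.
    for item in items {
        tx.send(item).unwrap();
    }
    // Closing the channel lets the worker finish when nothing fails.
    drop(tx);

    let worker = {
        let aborted = Arc::clone(&aborted);
        let handled = Arc::clone(&handled);
        thread::spawn(move || loop {
            if aborted.load(Ordering::Acquire) {
                // Some thread already failed, so skip the remaining queue.
                return;
            }
            let item = match rx.recv() {
                Ok(item) => item,
                Err(_) => return,
            };
            handled.fetch_add(1, Ordering::Relaxed);
            if item == fail_on {
                aborted.store(true, Ordering::Release);
                return;
            }
        })
    };

    worker.join().unwrap();
    (handled.load(Ordering::Relaxed), aborted.load(Ordering::Acquire))
}
```

Items queued after the failing one are never handled, so a join does not have
to wait for the rest of a potentially long unbounded queue.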
* [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existance in background
2020-09-30 14:15 [pbs-devel] [PATCH v2 0/5] backup validation improvements Stefan Reiter
` (3 preceding siblings ...)
2020-09-30 14:16 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
@ 2020-09-30 14:16 ` Stefan Reiter
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 14:16 UTC (permalink / raw)
To: pbs-devel
Reused chunks will not be uploaded, and thus never touched. We need to
verify their existence manually to ensure a valid backup.
Since we know all chunks that the client may reuse must be recorded in
the previous snapshot (which is locked during backup and can't be
forgotten), we can do the validation in the background, while the backup
is still running, and only join at the end if there's still work left.
The tradeoff here is that we don't know yet which chunks the client will
*not* reuse later in the backup, so we have to check them all.
This also means we can revert the changes to the KnownChunksMap type
made in 43772efc6e.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/api2/backup/environment.rs | 155 ++++++++++++++++++++++-----------
1 file changed, 105 insertions(+), 50 deletions(-)
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index a8c9ddb4..08ecc290 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -11,8 +11,9 @@ use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
use crate::api2::types::{Userid, SnapshotVerifyState, VerifyState};
use crate::backup::*;
-use crate::server::WorkerTask;
+use crate::server::{WorkerTask, UPID};
use crate::server::formatter::*;
+use crate::tools::ParallelHandler;
use hyper::{Body, Response};
#[derive(Copy, Clone, Serialize)]
@@ -66,8 +67,14 @@ struct FixedWriterState {
incremental: bool,
}
-// key=digest, value=(length, existance checked)
-type KnownChunksMap = HashMap<[u8;32], (u32, bool)>;
+// key=digest, value=length
+type KnownChunksMap = HashMap<[u8;32], u32>;
+
+enum ValidateHandlerState {
+ NotInitialized,
+ NotNeeded,
+ Running(ParallelHandler<'static, [u8;32]>),
+}
struct SharedBackupState {
finished: bool,
@@ -78,6 +85,7 @@ struct SharedBackupState {
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
+ validate_handler: ValidateHandlerState,
}
impl SharedBackupState {
@@ -131,6 +139,7 @@ impl BackupEnvironment {
known_chunks: HashMap::new(),
backup_size: 0,
backup_stat: UploadStatistic::new(),
+ validate_handler: ValidateHandlerState::NotInitialized,
};
Self {
@@ -156,11 +165,89 @@ impl BackupEnvironment {
state.ensure_unfinished()?;
- state.known_chunks.insert(digest, (length, false));
+ state.known_chunks.insert(digest, length);
+
+ match &state.validate_handler {
+ ValidateHandlerState::NotInitialized => {
+ if self.last_backup_has_recent_verify()? {
+ state.validate_handler = ValidateHandlerState::NotNeeded;
+ } else {
+ let handler = self.start_validate_handler();
+ handler.send(digest)?;
+ state.validate_handler = ValidateHandlerState::Running(handler);
+ }
+ },
+ ValidateHandlerState::Running(handler) => {
+ handler.send(digest)?;
+ },
+ ValidateHandlerState::NotNeeded => {}
+ }
Ok(())
}
+ fn start_validate_handler(&self) -> ParallelHandler<'static, [u8;32]> {
+ let datastore = Arc::clone(&self.datastore);
+ let upid = Arc::new(self.worker.upid().clone());
+ let last_backup = Arc::new(self.last_backup.clone());
+ ParallelHandler::new(
+ "verify handler",
+ 1, // one worker is enough, and means we don't need a lock to mark the prev snapshot
+ move |digest| {
+ Self::validate_chunk_existance(
+ &digest,
+ Arc::clone(&datastore),
+ Arc::clone(&upid),
+ Arc::clone(&last_backup)
+ )
+ },
+ false // don't block on send
+ )
+ }
+
+ fn validate_chunk_existance(
+ digest: &[u8;32],
+ datastore: Arc<DataStore>,
+ upid: Arc<UPID>,
+ last_backup: Arc<Option<BackupInfo>>,
+ ) -> Result<(), Error> {
+ if !datastore.chunk_path(digest).0.exists() {
+ // Chunk is missing, mark last snapshot (which references it) as "verify failed"
+ let mark_msg = if let Some(ref last_backup) = *last_backup {
+ let last_dir = &last_backup.backup_dir;
+ let verify_state = SnapshotVerifyState {
+ state: VerifyState::Failed,
+ upid: UPID::clone(upid.as_ref()),
+ };
+
+ let res = proxmox::try_block!{
+ let (mut manifest, _) = datastore.load_manifest(last_dir)?;
+ manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
+ datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
+ };
+
+ if let Err(err) = res {
+ format!("tried marking previous snapshot as bad, \
+ but got error accessing manifest: {}", err)
+ } else {
+ "marked previous snapshot as bad, please use \
+ 'verify' for a detailed check".to_owned()
+ }
+ } else {
+ "internal error: no base backup registered to mark invalid".to_owned()
+ };
+
+ bail!(
+ "chunk '{}' was attempted to be reused but doesn't exist - {}",
+ digest_to_hex(digest),
+ mark_msg
+ );
+ }
+
+ Ok(())
+ }
+
+
/// Register fixed length chunks after upload.
///
/// Like `register_chunk()`, but additionally record statistics for
@@ -176,6 +263,9 @@ impl BackupEnvironment {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
+ if let ValidateHandlerState::Running(handler) = &state.validate_handler {
+ handler.check_abort()?;
+ }
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
@@ -198,7 +288,7 @@ impl BackupEnvironment {
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
- state.known_chunks.insert(digest, (size, true));
+ state.known_chunks.insert(digest, size);
Ok(())
}
@@ -218,6 +308,9 @@ impl BackupEnvironment {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
+ if let ValidateHandlerState::Running(handler) = &state.validate_handler {
+ handler.check_abort()?;
+ }
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
@@ -231,7 +324,7 @@ impl BackupEnvironment {
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
- state.known_chunks.insert(digest, (size, true));
+ state.known_chunks.insert(digest, size);
Ok(())
}
@@ -240,7 +333,7 @@ impl BackupEnvironment {
let state = self.state.lock().unwrap();
match state.known_chunks.get(digest) {
- Some((len, _)) => Some(*len),
+ Some(len) => Some(*len),
None => None,
}
}
@@ -483,47 +576,6 @@ impl BackupEnvironment {
}
}
- /// Ensure all chunks referenced in this backup actually exist.
- /// Only call *after* all writers have been closed, to avoid race with GC.
- /// In case of error, mark the previous backup as 'verify failed'.
- fn verify_chunk_existance(&self, known_chunks: &KnownChunksMap) -> Result<(), Error> {
- for (digest, (_, checked)) in known_chunks.iter() {
- if !checked && !self.datastore.chunk_path(digest).0.exists() {
- let mark_msg = if let Some(ref last_backup) = self.last_backup {
- let last_dir = &last_backup.backup_dir;
- let verify_state = SnapshotVerifyState {
- state: VerifyState::Failed,
- upid: self.worker.upid().clone(),
- };
-
- let res = proxmox::try_block!{
- let (mut manifest, _) = self.datastore.load_manifest(last_dir)?;
- manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
- self.datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
- };
-
- if let Err(err) = res {
- format!("tried marking previous snapshot as bad, \
- but got error accessing manifest: {}", err)
- } else {
- "marked previous snapshot as bad, please use \
- 'verify' for a detailed check".to_owned()
- }
- } else {
- "internal error: no base backup registered to mark invalid".to_owned()
- };
-
- bail!(
- "chunk '{}' was attempted to be reused but doesn't exist - {}",
- digest_to_hex(digest),
- mark_msg
- );
- }
- }
-
- Ok(())
- }
-
/// Mark backup as finished
pub fn finish_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
@@ -560,8 +612,11 @@ impl BackupEnvironment {
}
}
- if !self.last_backup_has_recent_verify()? {
- self.verify_chunk_existance(&state.known_chunks)?;
+ // stop verify handler and verify remaining chunks
+ let handler = std::mem::replace(&mut state.validate_handler, ValidateHandlerState::NotInitialized);
+ if let ValidateHandlerState::Running(handler) = handler {
+ self.worker.log("waiting for validate thread to complete");
+ handler.complete()?;
}
// marks the backup as successful
--
2.20.1
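
The lazily initialized validation state used in this patch can be shown as a
small state machine. This is a sketch, not the patch code: `Validator` is a
hypothetical stand-in for the ParallelHandler (it only records what was sent),
digests are plain `u32` values, and `Backup` with its `base_recently_verified`
field replaces the real environment and its manifest lookup.

```rust
/// Hypothetical stand-in for the background handler; it only records digests.
struct Validator {
    sent: Vec<u32>,
}

enum ValidateState {
    NotInitialized,
    NotNeeded,
    Running(Validator),
}

struct Backup {
    base_recently_verified: bool,
    state: ValidateState,
}

impl Backup {
    fn new(base_recently_verified: bool) -> Self {
        Backup {
            base_recently_verified,
            state: ValidateState::NotInitialized,
        }
    }

    /// Called for every chunk the client announces as reusable.
    fn register_known_chunk(&mut self, digest: u32) {
        // Take the state out, decide the next state, then put it back.
        let current = std::mem::replace(&mut self.state, ValidateState::NotInitialized);
        self.state = match current {
            // The first chunk decides whether validation is needed at all.
            ValidateState::NotInitialized if self.base_recently_verified => {
                ValidateState::NotNeeded
            }
            ValidateState::NotInitialized => {
                ValidateState::Running(Validator { sent: vec![digest] })
            }
            ValidateState::Running(mut validator) => {
                validator.sent.push(digest);
                ValidateState::Running(validator)
            }
            ValidateState::NotNeeded => ValidateState::NotNeeded,
        };
    }

    /// Take the validator out of the state; returns how many chunks it was given.
    fn finish(&mut self) -> usize {
        match std::mem::replace(&mut self.state, ValidateState::NotInitialized) {
            ValidateState::Running(validator) => validator.sent.len(),
            _ => 0,
        }
    }
}
```

Moving the handler out with std::mem::replace is what makes it possible to call
a consuming method such as complete() on it while the surrounding state is only
mutably borrowed.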