* [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
2020-09-30 13:32 ` Thomas Lamprecht
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode Stefan Reiter
` (3 subsequent siblings)
4 siblings, 1 reply; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
To: pbs-devel
If the base was successfully verified within the last 7 days, we assume
that it is okay and all chunks exist, so we don't have to check.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/api2/backup/environment.rs | 29 ++++++++++++++++++++++++++++-
1 file changed, 28 insertions(+), 1 deletion(-)
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index d515bf30..be06d1dc 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -457,6 +457,31 @@ impl BackupEnvironment {
Ok(())
}
+ fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
+ match &self.last_backup {
+ Some(last_backup) => {
+ let last_dir = &last_backup.backup_dir;
+ let (manifest, _) = self.datastore.load_manifest(last_dir)?;
+ let verify = manifest.unprotected["verify_state"].clone();
+ match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
+ Ok(verify) => match verify {
+ Some(verify) => {
+ let cutoff = unsafe { libc::time(std::ptr::null_mut()) };
+ let cutoff = cutoff - 60*60*24*7; // one week back
+ Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
+ },
+ None => Ok(false)
+ },
+ Err(err) => {
+ self.worker.warn(format!("error parsing base verification state : '{}'", err));
+ Ok(false)
+ }
+ }
+ },
+ None => Ok(false)
+ }
+ }
+
/// Ensure all chunks referenced in this backup actually exist.
/// Only call *after* all writers have been closed, to avoid race with GC.
/// In case of error, mark the previous backup as 'verify failed'.
@@ -534,7 +559,9 @@ impl BackupEnvironment {
}
}
- self.verify_chunk_existance(&state.known_chunks)?;
+ if !self.last_backup_has_recent_verify()? {
+ self.verify_chunk_existance(&state.known_chunks)?;
+ }
// marks the backup as successful
state.finished = true;
--
2.20.1
* Re: [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified
2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified Stefan Reiter
@ 2020-09-30 13:32 ` Thomas Lamprecht
2020-09-30 13:42 ` Stefan Reiter
0 siblings, 1 reply; 9+ messages in thread
From: Thomas Lamprecht @ 2020-09-30 13:32 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Stefan Reiter
On 30.09.20 15:25, Stefan Reiter wrote:
> If the base was successfully verified within the last 7 days, we assume
> that it is okay and all chunks exist, so we don't have to check.
>
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
> src/api2/backup/environment.rs | 29 ++++++++++++++++++++++++++++-
> 1 file changed, 28 insertions(+), 1 deletion(-)
>
> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
> index d515bf30..be06d1dc 100644
> --- a/src/api2/backup/environment.rs
> +++ b/src/api2/backup/environment.rs
> @@ -457,6 +457,31 @@ impl BackupEnvironment {
> Ok(())
> }
>
> + fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
> + match &self.last_backup {
> + Some(last_backup) => {
> + let last_dir = &last_backup.backup_dir;
> + let (manifest, _) = self.datastore.load_manifest(last_dir)?;
> + let verify = manifest.unprotected["verify_state"].clone();
> + match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
> + Ok(verify) => match verify {
> + Some(verify) => {
> + let cutoff = unsafe { libc::time(std::ptr::null_mut()) };
> + let cutoff = cutoff - 60*60*24*7; // one week back
Why unsafe and why not our `proxmox::tools::time::epoch_i64()` ?
on another note: we should probably add some helper for getting the
verify state
> + Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
> + },
> + None => Ok(false)
> + },
> + Err(err) => {
> + self.worker.warn(format!("error parsing base verification state : '{}'", err));
> + Ok(false)
> + }
> + }
> + },
> + None => Ok(false)
> + }
> + }
> +
> /// Ensure all chunks referenced in this backup actually exist.
> /// Only call *after* all writers have been closed, to avoid race with GC.
> /// In case of error, mark the previous backup as 'verify failed'.
> @@ -534,7 +559,9 @@ impl BackupEnvironment {
> }
> }
>
> - self.verify_chunk_existance(&state.known_chunks)?;
> + if !self.last_backup_has_recent_verify()? {
> + self.verify_chunk_existance(&state.known_chunks)?;
> + }
>
> // marks the backup as successful
> state.finished = true;
>
* Re: [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified
2020-09-30 13:32 ` Thomas Lamprecht
@ 2020-09-30 13:42 ` Stefan Reiter
2020-09-30 13:55 ` Thomas Lamprecht
0 siblings, 1 reply; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:42 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Backup Server development discussion
On 9/30/20 3:32 PM, Thomas Lamprecht wrote:
> On 30.09.20 15:25, Stefan Reiter wrote:
>> If the base was successfully verified within the last 7 days, we assume
>> that it is okay and all chunks exist, so we don't have to check.
>>
>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>> ---
>> src/api2/backup/environment.rs | 29 ++++++++++++++++++++++++++++-
>> 1 file changed, 28 insertions(+), 1 deletion(-)
>>
>> diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
>> index d515bf30..be06d1dc 100644
>> --- a/src/api2/backup/environment.rs
>> +++ b/src/api2/backup/environment.rs
>> @@ -457,6 +457,31 @@ impl BackupEnvironment {
>> Ok(())
>> }
>>
>> + fn last_backup_has_recent_verify(&self) -> Result<bool, Error> {
>> + match &self.last_backup {
>> + Some(last_backup) => {
>> + let last_dir = &last_backup.backup_dir;
>> + let (manifest, _) = self.datastore.load_manifest(last_dir)?;
>> + let verify = manifest.unprotected["verify_state"].clone();
>> + match serde_json::from_value::<Option<SnapshotVerifyState>>(verify) {
>> + Ok(verify) => match verify {
>> + Some(verify) => {
>> + let cutoff = unsafe { libc::time(std::ptr::null_mut()) };
>> + let cutoff = cutoff - 60*60*24*7; // one week back
>
> Why unsafe and why not our `proxmox::tools::time::epoch_i64()` ?
>
Didn't know that existed; I've seen that unsafe a few times, so I thought
that was the usual way. I'll send a v2.
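For reference, the v2 cutoff computation could look roughly like this (a
sketch based on the epoch_i64() helper Thomas mentioned, not the actual
v2 patch):

    let now = proxmox::tools::time::epoch_i64();
    let cutoff = now - 7 * 24 * 60 * 60; // one week back, in seconds
    Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)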
>
> on another note: we should probably add some helper for getting the
> verify state
>
I'll see if it's feasible. So far we only have two call sites, and both
handle individual errors differently, so I'm not sure how much a helper
would simplify things.
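If such a helper does get added, it might look roughly like this (a sketch
only; the location on BackupManifest and the exact signature are
assumptions, and callers would still decide how to handle parse errors):

    // hypothetical helper; returns Ok(None) if no verify state is recorded
    fn verify_state(manifest: &BackupManifest) -> Result<Option<SnapshotVerifyState>, Error> {
        let state = manifest.unprotected["verify_state"].clone();
        Ok(serde_json::from_value(state)?)
    }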
>> + Ok(verify.state == VerifyState::Ok && verify.upid.starttime > cutoff)
>> + },
>> + None => Ok(false)
>> + },
>> + Err(err) => {
>> + self.worker.warn(format!("error parsing base verification state : '{}'", err));
>> + Ok(false)
>> + }
>> + }
>> + },
>> + None => Ok(false)
>> + }
>> + }
>> +
>> /// Ensure all chunks referenced in this backup actually exist.
>> /// Only call *after* all writers have been closed, to avoid race with GC.
>> /// In case of error, mark the previous backup as 'verify failed'.
>> @@ -534,7 +559,9 @@ impl BackupEnvironment {
>> }
>> }
>>
>> - self.verify_chunk_existance(&state.known_chunks)?;
>> + if !self.last_backup_has_recent_verify()? {
>> + self.verify_chunk_existance(&state.known_chunks)?;
>> + }
>>
>> // marks the backup as successful
>> state.finished = true;
>>
>
>
* Re: [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified
2020-09-30 13:42 ` Stefan Reiter
@ 2020-09-30 13:55 ` Thomas Lamprecht
0 siblings, 0 replies; 9+ messages in thread
From: Thomas Lamprecht @ 2020-09-30 13:55 UTC (permalink / raw)
To: Stefan Reiter, Proxmox Backup Server development discussion
On 30.09.20 15:42, Stefan Reiter wrote:
> On 9/30/20 3:32 PM, Thomas Lamprecht wrote:
>> on another note: we should probably add some helper for getting the
>> verify state
>>
>
> I'll see if it's feasible. So far we only have two call sites, and both handle individual errors differently, so I'm not sure how much a helper would simplify things.
>
It doesn't have to be now; I just had a feeling I've seen (and/or written)
this a few times already.
More important may be making the unprotected part of the manifest safer
for parallel read/write access (an RW lock?).
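One possible direction (purely a sketch; the manifest_lock name and whether
an in-process lock is even sufficient across tasks are assumptions) would
be to guard the existing load-modify-store cycle:

    // hypothetical: manifest_lock is a per-datastore std::sync::RwLock<()>
    let _guard = manifest_lock.write().unwrap();
    let (mut manifest, _) = datastore.load_manifest(snapshot_dir)?;
    manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
    datastore.store_manifest(snapshot_dir, serde_json::to_value(manifest)?)?;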
* [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
` (2 subsequent siblings)
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
To: pbs-devel
Enables non-blocking send. Only use this when the individual items being
sent are small, otherwise the channel buffer can grow to use a lot of memory.
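For illustration, a caller of the extended constructor would look roughly
like this (a sketch with a hypothetical handler closure, not code taken
from this series):

    let pool = ParallelHandler::new(
        "example workers",
        4,
        |_item: [u8; 32]| {
            // ... process one item ...
            Ok(())
        },
        false, // bounded_mode = false: send() never blocks, the queue may grow
    );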
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup/verify.rs | 3 ++-
src/client/pull.rs | 3 ++-
src/tools/parallel_handler.rs | 15 ++++++++++++---
3 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index fd48d907..c24d48e6 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -129,7 +129,8 @@ fn verify_index_chunks(
}
Ok(())
- }
+ },
+ true
);
for pos in 0..index.index_count() {
diff --git a/src/client/pull.rs b/src/client/pull.rs
index d88d64f9..08d92d6e 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -57,7 +57,8 @@ async fn pull_index_chunks<I: IndexFile>(
chunk.verify_unencrypted(size as usize, &digest)?;
target.insert_chunk(&chunk, &digest)?;
Ok(())
- }
+ },
+ true
);
let verify_and_write_channel = verify_pool.channel();
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index f1d9adec..b15fc046 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
-use crossbeam_channel::{bounded, Sender};
+use crossbeam_channel::{bounded, unbounded, Sender};
/// A handle to send data to the worker thread (implements clone)
pub struct SendHandle<I> {
@@ -57,11 +57,20 @@ impl<I> Clone for SendHandle<I> {
impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
/// Create a new thread pool, each thread processing incoming data
/// with 'handler_fn'.
- pub fn new<F>(name: &str, threads: usize, handler_fn: F) -> Self
+ pub fn new<F>(
+ name: &str,
+ threads: usize,
+ handler_fn: F,
+ bounded_mode: bool,
+ ) -> Self
where F: Fn(I) -> Result<(), Error> + Send + Clone + 'a,
{
let mut handles = Vec::new();
- let (input_tx, input_rx) = bounded::<I>(threads);
+ let (input_tx, input_rx) = if bounded_mode {
+ bounded::<I>(threads)
+ } else {
+ unbounded::<I>()
+ };
let abort = Arc::new(Mutex::new(None));
--
2.20.1
* [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [PATCH proxmox-backup 1/5] backup: don't validate chunk existence if base was recently verified Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 2/5] ParallelHandler: add unbounded mode Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existence in background Stefan Reiter
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
To: pbs-devel
Enables outside functions to check whether an error has occurred without
calling send. Also fixes a potential bug where errors that happen after the
SendHandle has been dropped, while joining the threads, might have been
ignored. This requires the internal check_abort to be moved out of 'impl
SendHandle'.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools/parallel_handler.rs | 29 +++++++++++++++++++----------
1 file changed, 19 insertions(+), 10 deletions(-)
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index b15fc046..7b92c23a 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -10,19 +10,19 @@ pub struct SendHandle<I> {
abort: Arc<Mutex<Option<String>>>,
}
-impl<I: Send> SendHandle<I> {
- /// Returns the first error happened, if any
- pub fn check_abort(&self) -> Result<(), Error> {
- let guard = self.abort.lock().unwrap();
- if let Some(err_msg) = &*guard {
- return Err(format_err!("{}", err_msg));
- }
- Ok(())
+/// Returns the first error happened, if any
+pub fn check_abort(abort: Arc<Mutex<Option<String>>>) -> Result<(), Error> {
+ let guard = abort.lock().unwrap();
+ if let Some(err_msg) = &*guard {
+ return Err(format_err!("{}", err_msg));
}
+ Ok(())
+}
+impl<I: Send> SendHandle<I> {
/// Send data to the worker threads
pub fn send(&self, input: I) -> Result<(), Error> {
- self.check_abort()?;
+ check_abort(Arc::clone(&self.abort))?;
match self.input.send(input) {
Ok(()) => Ok(()),
Err(_) => bail!("send failed - channel closed"),
@@ -128,14 +128,23 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
Ok(())
}
+ /// Return Err if at least one invocation of the callback failed
+ /// Panics if complete() has been called on this instance
+ pub fn check_abort(&self) -> Result<(), Error> {
+ check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
+ }
+
/// Wait for worker threads to complete and check for errors
pub fn complete(mut self) -> Result<(), Error> {
- self.input.as_ref().unwrap().check_abort()?;
+ let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
+ check_abort(Arc::clone(&abort))?;
drop(self.input.take());
let msg_list = self.join_threads();
if msg_list.is_empty() {
+ // an error might be encountered while waiting for the join
+ check_abort(abort)?;
return Ok(());
}
Err(format_err!("{}", msg_list.join("\n")))
--
2.20.1
* [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
` (2 preceding siblings ...)
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 3/5] ParallelHandler: add check_abort function and handle errors during join Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existence in background Stefan Reiter
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
To: pbs-devel
We only store one error anyway, so there is no point in continuing to
process data once one thread has failed. This is especially important for
unbounded mode, where there may still be a lot of data left to go through,
so complete() doesn't have to wait for all of it.
Also abort on drop; if the caller wants to wait for completion, they have
to call complete().
Current logic should be unaffected:
* 'verify' never returns an error from handler_fn
* 'pull' errors immediately anyway once 'send' or 'complete' fail, so
it doesn't matter if that happens a little earlier and some chunks are
left unwritten
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools/parallel_handler.rs | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/src/tools/parallel_handler.rs b/src/tools/parallel_handler.rs
index 7b92c23a..102ecce3 100644
--- a/src/tools/parallel_handler.rs
+++ b/src/tools/parallel_handler.rs
@@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use anyhow::{bail, format_err, Error};
@@ -8,6 +9,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
pub struct SendHandle<I> {
input: Sender<I>,
abort: Arc<Mutex<Option<String>>>,
+ aborted: Arc<AtomicBool>,
}
/// Returns the first error happened, if any
@@ -50,6 +52,7 @@ impl<I> Clone for SendHandle<I> {
Self {
input: self.input.clone(),
abort: Arc::clone(&self.abort),
+ aborted: Arc::clone(&self.aborted),
}
}
}
@@ -73,10 +76,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
};
let abort = Arc::new(Mutex::new(None));
+ let aborted = Arc::new(AtomicBool::new(false));
for i in 0..threads {
let input_rx = input_rx.clone();
let abort = Arc::clone(&abort);
+ let aborted = Arc::clone(&aborted);
// Erase the 'a lifetime bound. This is safe because we
// join all thread in the drop handler.
@@ -89,6 +94,10 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
std::thread::Builder::new()
.name(format!("{} ({})", name, i))
.spawn(move || loop {
+ if aborted.load(Ordering::Acquire) {
+ // some other thread aborted, exit early
+ return;
+ }
let data = match input_rx.recv() {
Ok(data) => data,
Err(_) => return,
@@ -96,10 +105,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
match (handler_fn)(data) {
Ok(()) => (),
Err(err) => {
+ aborted.store(true, Ordering::Release);
let mut guard = abort.lock().unwrap();
if guard.is_none() {
*guard = Some(err.to_string());
}
+ return;
}
}
})
@@ -112,6 +123,7 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
input: Some(SendHandle {
input: input_tx,
abort,
+ aborted,
}),
_marker: std::marker::PhantomData,
}
@@ -134,7 +146,8 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
check_abort(Arc::clone(&self.input.as_ref().unwrap().abort))
}
- /// Wait for worker threads to complete and check for errors
+ /// Wait for worker threads to complete and check for errors.
+ /// Only this ensures completion. Dropping the instance aborts instead.
pub fn complete(mut self) -> Result<(), Error> {
let abort = Arc::clone(&self.input.as_ref().unwrap().abort);
check_abort(Arc::clone(&abort))?;
@@ -179,7 +192,12 @@ impl<'a, I: Send + 'static> ParallelHandler<'a, I> {
// Note: We make sure that all threads will be joined
impl<'a, I> Drop for ParallelHandler<'a, I> {
fn drop(&mut self) {
- drop(self.input.take());
+ let input = self.input.take();
+ if let Some(input) = &input {
+ // shut down ASAP
+ input.aborted.store(true, Ordering::Release);
+ }
+ drop(input);
while let Some(handle) = self.handles.pop() {
let _ = handle.join();
}
--
2.20.1
* [pbs-devel] [RFC proxmox-backup 5/5] backup: validate chunk existence in background
2020-09-30 13:25 [pbs-devel] [PATCH 0/5] backup validation improvements Stefan Reiter
` (3 preceding siblings ...)
2020-09-30 13:25 ` [pbs-devel] [RFC proxmox-backup 4/5] ParallelHandler: exit early if this or other thread aborted Stefan Reiter
@ 2020-09-30 13:25 ` Stefan Reiter
4 siblings, 0 replies; 9+ messages in thread
From: Stefan Reiter @ 2020-09-30 13:25 UTC (permalink / raw)
To: pbs-devel
Reused chunks are not uploaded, and thus never touched. We need to
verify their existence manually to ensure a valid backup.
Since we know all chunks that the client may reuse must be recorded in
the previous snapshot (which is locked during backup and can't be
forgotten), we can do the validation in the background, while the backup
is still running, and only join at the end if there's still work left.
The tradeoff here is that we don't know yet which chunks the client will
*not* reuse later in the backup, so we have to check them all.
This also means we can revert the changes to the KnownChunksMap type
made in 43772efc6e.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/api2/backup/environment.rs | 155 ++++++++++++++++++++++-----------
1 file changed, 105 insertions(+), 50 deletions(-)
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index be06d1dc..cc41032a 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -11,8 +11,9 @@ use proxmox::api::{RpcEnvironment, RpcEnvironmentType};
use crate::api2::types::{Userid, SnapshotVerifyState, VerifyState};
use crate::backup::*;
-use crate::server::WorkerTask;
+use crate::server::{WorkerTask, UPID};
use crate::server::formatter::*;
+use crate::tools::ParallelHandler;
use hyper::{Body, Response};
#[derive(Copy, Clone, Serialize)]
@@ -66,8 +67,14 @@ struct FixedWriterState {
incremental: bool,
}
-// key=digest, value=(length, existance checked)
-type KnownChunksMap = HashMap<[u8;32], (u32, bool)>;
+// key=digest, value=length
+type KnownChunksMap = HashMap<[u8;32], u32>;
+
+enum ValidateHandlerState {
+ NotInitialized,
+ NotNeeded,
+ Running(ParallelHandler<'static, [u8;32]>),
+}
struct SharedBackupState {
finished: bool,
@@ -78,6 +85,7 @@ struct SharedBackupState {
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
+ validate_handler: ValidateHandlerState,
}
impl SharedBackupState {
@@ -131,6 +139,7 @@ impl BackupEnvironment {
known_chunks: HashMap::new(),
backup_size: 0,
backup_stat: UploadStatistic::new(),
+ validate_handler: ValidateHandlerState::NotInitialized,
};
Self {
@@ -156,11 +165,89 @@ impl BackupEnvironment {
state.ensure_unfinished()?;
- state.known_chunks.insert(digest, (length, false));
+ state.known_chunks.insert(digest, length);
+
+ match &state.validate_handler {
+ ValidateHandlerState::NotInitialized => {
+ if self.last_backup_has_recent_verify()? {
+ state.validate_handler = ValidateHandlerState::NotNeeded;
+ } else {
+ let handler = self.start_validate_handler();
+ handler.send(digest)?;
+ state.validate_handler = ValidateHandlerState::Running(handler);
+ }
+ },
+ ValidateHandlerState::Running(handler) => {
+ handler.send(digest)?;
+ },
+ ValidateHandlerState::NotNeeded => {}
+ }
Ok(())
}
+ fn start_validate_handler(&self) -> ParallelHandler<'static, [u8;32]> {
+ let datastore = Arc::clone(&self.datastore);
+ let upid = Arc::new(self.worker.upid().clone());
+ let last_backup = Arc::new(self.last_backup.clone());
+ ParallelHandler::new(
+ "verify handler",
+ 1, // one worker is enough, and means we don't need a lock to mark the prev snapshot
+ move |digest| {
+ Self::validate_chunk_existance(
+ &digest,
+ Arc::clone(&datastore),
+ Arc::clone(&upid),
+ Arc::clone(&last_backup)
+ )
+ },
+ false // don't block on send
+ )
+ }
+
+ fn validate_chunk_existance(
+ digest: &[u8;32],
+ datastore: Arc<DataStore>,
+ upid: Arc<UPID>,
+ last_backup: Arc<Option<BackupInfo>>,
+ ) -> Result<(), Error> {
+ if !datastore.chunk_path(digest).0.exists() {
+ // Chunk is missing, mark last snapshot (which references it) as "verify failed"
+ let mark_msg = if let Some(ref last_backup) = *last_backup {
+ let last_dir = &last_backup.backup_dir;
+ let verify_state = SnapshotVerifyState {
+ state: VerifyState::Failed,
+ upid: UPID::clone(upid.as_ref()),
+ };
+
+ let res = proxmox::try_block!{
+ let (mut manifest, _) = datastore.load_manifest(last_dir)?;
+ manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
+ datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
+ };
+
+ if let Err(err) = res {
+ format!("tried marking previous snapshot as bad, \
+ but got error accessing manifest: {}", err)
+ } else {
+ "marked previous snapshot as bad, please use \
+ 'verify' for a detailed check".to_owned()
+ }
+ } else {
+ "internal error: no base backup registered to mark invalid".to_owned()
+ };
+
+ bail!(
+ "chunk '{}' was attempted to be reused but doesn't exist - {}",
+ digest_to_hex(digest),
+ mark_msg
+ );
+ }
+
+ Ok(())
+ }
+
+
/// Register fixed length chunks after upload.
///
/// Like `register_chunk()`, but additionally record statistics for
@@ -176,6 +263,9 @@ impl BackupEnvironment {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
+ if let ValidateHandlerState::Running(handler) = &state.validate_handler {
+ handler.check_abort()?;
+ }
let mut data = match state.fixed_writers.get_mut(&wid) {
Some(data) => data,
@@ -198,7 +288,7 @@ impl BackupEnvironment {
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
- state.known_chunks.insert(digest, (size, true));
+ state.known_chunks.insert(digest, size);
Ok(())
}
@@ -218,6 +308,9 @@ impl BackupEnvironment {
let mut state = self.state.lock().unwrap();
state.ensure_unfinished()?;
+ if let ValidateHandlerState::Running(handler) = &state.validate_handler {
+ handler.check_abort()?;
+ }
let mut data = match state.dynamic_writers.get_mut(&wid) {
Some(data) => data,
@@ -231,7 +324,7 @@ impl BackupEnvironment {
if is_duplicate { data.upload_stat.duplicates += 1; }
// register chunk
- state.known_chunks.insert(digest, (size, true));
+ state.known_chunks.insert(digest, size);
Ok(())
}
@@ -240,7 +333,7 @@ impl BackupEnvironment {
let state = self.state.lock().unwrap();
match state.known_chunks.get(digest) {
- Some((len, _)) => Some(*len),
+ Some(len) => Some(*len),
None => None,
}
}
@@ -482,47 +575,6 @@ impl BackupEnvironment {
}
}
- /// Ensure all chunks referenced in this backup actually exist.
- /// Only call *after* all writers have been closed, to avoid race with GC.
- /// In case of error, mark the previous backup as 'verify failed'.
- fn verify_chunk_existance(&self, known_chunks: &KnownChunksMap) -> Result<(), Error> {
- for (digest, (_, checked)) in known_chunks.iter() {
- if !checked && !self.datastore.chunk_path(digest).0.exists() {
- let mark_msg = if let Some(ref last_backup) = self.last_backup {
- let last_dir = &last_backup.backup_dir;
- let verify_state = SnapshotVerifyState {
- state: VerifyState::Failed,
- upid: self.worker.upid().clone(),
- };
-
- let res = proxmox::try_block!{
- let (mut manifest, _) = self.datastore.load_manifest(last_dir)?;
- manifest.unprotected["verify_state"] = serde_json::to_value(verify_state)?;
- self.datastore.store_manifest(last_dir, serde_json::to_value(manifest)?)
- };
-
- if let Err(err) = res {
- format!("tried marking previous snapshot as bad, \
- but got error accessing manifest: {}", err)
- } else {
- "marked previous snapshot as bad, please use \
- 'verify' for a detailed check".to_owned()
- }
- } else {
- "internal error: no base backup registered to mark invalid".to_owned()
- };
-
- bail!(
- "chunk '{}' was attempted to be reused but doesn't exist - {}",
- digest_to_hex(digest),
- mark_msg
- );
- }
- }
-
- Ok(())
- }
-
/// Mark backup as finished
pub fn finish_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
@@ -559,8 +611,11 @@ impl BackupEnvironment {
}
}
- if !self.last_backup_has_recent_verify()? {
- self.verify_chunk_existance(&state.known_chunks)?;
+ // stop verify handler and verify remaining chunks
+ let handler = std::mem::replace(&mut state.validate_handler, ValidateHandlerState::NotInitialized);
+ if let ValidateHandlerState::Running(handler) = handler {
+ self.worker.log("waiting for validate thread to complete");
+ handler.complete()?;
}
// marks the backup as successful
--
2.20.1