From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 13/15] verify: factor out common parameters
Date: Mon, 25 Jan 2021 14:42:58 +0100
Message-ID: <20210125134302.3394328-14-f.gruenbichler@proxmox.com>
In-Reply-To: <20210125134302.3394328-1-f.gruenbichler@proxmox.com>
all the verify methods pass along the following:
- task worker
- datastore
- corrupt and verified chunks

might as well pull that out into a common type, with the added bonus of
now having a single point of construction instead of copying the
default capacities in three different modules.
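for illustration, the call sites now boil down to roughly the following
shape (simplified sketch based on the hunks below, error handling and
unrelated arguments omitted):

    // before: every caller set up its own chunk sets and threaded them through
    let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(16 * 1024)));
    let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
    verify_backup_dir(datastore, &backup_dir, verified_chunks, corrupt_chunks,
        worker.clone(), worker.upid().clone(), None)?;

    // after: construct the shared state once and pass a single reference
    let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
    verify_backup_dir(&verify_worker, &backup_dir, worker.upid().clone(), None)?;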
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
src/api2/admin/datastore.rs | 17 +---
src/api2/backup/environment.rs | 10 +-
src/backup/verify.rs | 174 ++++++++++++++-------------------
src/server/verify_job.rs | 3 +-
4 files changed, 85 insertions(+), 119 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index ba8f3417..3d5b6af6 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -3,7 +3,6 @@
use std::collections::HashSet;
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
-use std::sync::{Arc, Mutex};
use std::path::{Path, PathBuf};
use std::pin::Pin;
@@ -672,17 +671,12 @@ pub fn verify(
auth_id.clone(),
to_stdout,
move |worker| {
- let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
- let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
let failed_dirs = if let Some(backup_dir) = backup_dir {
let mut res = Vec::new();
if !verify_backup_dir(
- datastore,
+ &verify_worker,
&backup_dir,
- verified_chunks,
- corrupt_chunks,
- worker.clone(),
worker.upid().clone(),
None,
)? {
@@ -691,12 +685,9 @@ pub fn verify(
res
} else if let Some(backup_group) = backup_group {
let failed_dirs = verify_backup_group(
- datastore,
+ &verify_worker,
&backup_group,
- verified_chunks,
- corrupt_chunks,
&mut StoreProgress::new(1),
- worker.clone(),
worker.upid(),
None,
)?;
@@ -711,7 +702,7 @@ pub fn verify(
None
};
- verify_all_backups(datastore, worker.clone(), worker.upid(), owner, None)?
+ verify_all_backups(&verify_worker, worker.upid(), owner, None)?
};
if !failed_dirs.is_empty() {
worker.log("Failed to verify the following snapshots/groups:");
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 38061816..c8f52b6e 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,6 +1,6 @@
use anyhow::{bail, format_err, Error};
use std::sync::{Arc, Mutex};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
use nix::dir::Dir;
use ::serde::{Serialize};
@@ -525,15 +525,11 @@ impl BackupEnvironment {
move |worker| {
worker.log("Automatically verifying newly added snapshot");
- let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
- let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
if !verify_backup_dir_with_lock(
- datastore,
+ &verify_worker,
&backup_dir,
- verified_chunks,
- corrupt_chunks,
- worker.clone(),
worker.upid().clone(),
None,
snap_lock,
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 5e4bc7fb..ac4a6c29 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -29,6 +29,29 @@ use crate::{
tools::fs::lock_dir_noblock_shared,
};
+/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
+/// already been verified or detected as corrupt.
+pub struct VerifyWorker {
+ worker: Arc<dyn TaskState + Send + Sync>,
+ datastore: Arc<DataStore>,
+ verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+ corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+}
+
+impl VerifyWorker {
+ /// Creates a new VerifyWorker for a given task worker and datastore.
+ pub fn new(worker: Arc<dyn TaskState + Send + Sync>, datastore: Arc<DataStore>) -> Self {
+ Self {
+ worker,
+ datastore,
+ // start with 16k chunks == up to 64G data
+ verified_chunks: Arc::new(Mutex::new(HashSet::with_capacity(16*1024))),
+ // start with 64 chunks since we assume there are few corrupt ones
+ corrupt_chunks: Arc::new(Mutex::new(HashSet::with_capacity(64))),
+ }
+ }
+}
+
fn verify_blob(datastore: Arc<DataStore>, backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
let blob = datastore.load_blob(backup_dir, &info.filename)?;
@@ -82,12 +105,9 @@ fn rename_corrupted_chunk(
}
fn verify_index_chunks(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
index: Box<dyn IndexFile + Send>,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
crypt_mode: CryptMode,
- worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {
let errors = Arc::new(AtomicUsize::new(0));
@@ -97,10 +117,10 @@ fn verify_index_chunks(
let mut read_bytes = 0;
let mut decoded_bytes = 0;
- let worker2 = Arc::clone(&worker);
- let datastore2 = Arc::clone(&datastore);
- let corrupt_chunks2 = Arc::clone(&corrupt_chunks);
- let verified_chunks2 = Arc::clone(&verified_chunks);
+ let worker2 = Arc::clone(&verify_worker.worker);
+ let datastore2 = Arc::clone(&verify_worker.datastore);
+ let corrupt_chunks2 = Arc::clone(&verify_worker.corrupt_chunks);
+ let verified_chunks2 = Arc::clone(&verify_worker.verified_chunks);
let errors2 = Arc::clone(&errors);
let decoder_pool = ParallelHandler::new(
@@ -141,29 +161,29 @@ fn verify_index_chunks(
for pos in 0..index.index_count() {
- worker.check_abort()?;
+ verify_worker.worker.check_abort()?;
crate::tools::fail_on_shutdown()?;
let info = index.chunk_info(pos).unwrap();
let size = info.size();
- if verified_chunks.lock().unwrap().contains(&info.digest) {
+ if verify_worker.verified_chunks.lock().unwrap().contains(&info.digest) {
continue; // already verified
}
- if corrupt_chunks.lock().unwrap().contains(&info.digest) {
+ if verify_worker.corrupt_chunks.lock().unwrap().contains(&info.digest) {
let digest_str = proxmox::tools::digest_to_hex(&info.digest);
- task_log!(worker, "chunk {} was marked as corrupt", digest_str);
+ task_log!(verify_worker.worker, "chunk {} was marked as corrupt", digest_str);
errors.fetch_add(1, Ordering::SeqCst);
continue;
}
- match datastore.load_chunk(&info.digest) {
+ match verify_worker.datastore.load_chunk(&info.digest) {
Err(err) => {
- corrupt_chunks.lock().unwrap().insert(info.digest);
- task_log!(worker, "can't verify chunk, load failed - {}", err);
+ verify_worker.corrupt_chunks.lock().unwrap().insert(info.digest);
+ task_log!(verify_worker.worker, "can't verify chunk, load failed - {}", err);
errors.fetch_add(1, Ordering::SeqCst);
- rename_corrupted_chunk(datastore.clone(), &info.digest, &worker);
+ rename_corrupted_chunk(verify_worker.datastore.clone(), &info.digest, &verify_worker.worker);
continue;
}
Ok(chunk) => {
@@ -187,7 +207,7 @@ fn verify_index_chunks(
let error_count = errors.load(Ordering::SeqCst);
task_log!(
- worker,
+ verify_worker.worker,
" verified {:.2}/{:.2} MiB in {:.2} seconds, speed {:.2}/{:.2} MiB/s ({} errors)",
read_bytes_mib,
decoded_bytes_mib,
@@ -205,18 +225,15 @@ fn verify_index_chunks(
}
fn verify_fixed_index(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
info: &FileInfo,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
path.push(&info.filename);
- let index = datastore.open_fixed_reader(&path)?;
+ let index = verify_worker.datastore.open_fixed_reader(&path)?;
let (csum, size) = index.compute_csum();
if size != info.size {
@@ -228,28 +245,22 @@ fn verify_fixed_index(
}
verify_index_chunks(
- datastore,
+ verify_worker,
Box::new(index),
- verified_chunks,
- corrupt_chunks,
info.chunk_crypt_mode(),
- worker,
)
}
fn verify_dynamic_index(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
info: &FileInfo,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<dyn TaskState + Send + Sync>,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
path.push(&info.filename);
- let index = datastore.open_dynamic_reader(&path)?;
+ let index = verify_worker.datastore.open_dynamic_reader(&path)?;
let (csum, size) = index.compute_csum();
if size != info.size {
@@ -261,12 +272,9 @@ fn verify_dynamic_index(
}
verify_index_chunks(
- datastore,
+ verify_worker,
Box::new(index),
- verified_chunks,
- corrupt_chunks,
info.chunk_crypt_mode(),
- worker,
)
}
@@ -280,34 +288,28 @@ fn verify_dynamic_index(
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_dir(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<dyn TaskState + Send + Sync>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
let snap_lock = lock_dir_noblock_shared(
- &datastore.snapshot_path(&backup_dir),
+ &verify_worker.datastore.snapshot_path(&backup_dir),
"snapshot",
"locked by another operation");
match snap_lock {
Ok(snap_lock) => verify_backup_dir_with_lock(
- datastore,
+ verify_worker,
backup_dir,
- verified_chunks,
- corrupt_chunks,
- worker,
upid,
filter,
snap_lock
),
Err(err) => {
task_log!(
- worker,
+ verify_worker.worker,
"SKIPPED: verify {}:{} - could not acquire snapshot lock: {}",
- datastore.name(),
+ verify_worker.datastore.name(),
backup_dir,
err,
);
@@ -318,22 +320,19 @@ pub fn verify_backup_dir(
/// See verify_backup_dir
pub fn verify_backup_dir_with_lock(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
backup_dir: &BackupDir,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- worker: Arc<dyn TaskState + Send + Sync>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
_snap_lock: Dir,
) -> Result<bool, Error> {
- let manifest = match datastore.load_manifest(&backup_dir) {
+ let manifest = match verify_worker.datastore.load_manifest(&backup_dir) {
Ok((manifest, _)) => manifest,
Err(err) => {
task_log!(
- worker,
+ verify_worker.worker,
"verify {}:{} - manifest load error: {}",
- datastore.name(),
+ verify_worker.datastore.name(),
backup_dir,
err,
);
@@ -344,54 +343,48 @@ pub fn verify_backup_dir_with_lock(
if let Some(filter) = filter {
if !filter(&manifest) {
task_log!(
- worker,
+ verify_worker.worker,
"SKIPPED: verify {}:{} (recently verified)",
- datastore.name(),
+ verify_worker.datastore.name(),
backup_dir,
);
return Ok(true);
}
}
- task_log!(worker, "verify {}:{}", datastore.name(), backup_dir);
+ task_log!(verify_worker.worker, "verify {}:{}", verify_worker.datastore.name(), backup_dir);
let mut error_count = 0;
let mut verify_result = VerifyState::Ok;
for info in manifest.files() {
let result = proxmox::try_block!({
- task_log!(worker, " check {}", info.filename);
+ task_log!(verify_worker.worker, " check {}", info.filename);
match archive_type(&info.filename)? {
ArchiveType::FixedIndex =>
verify_fixed_index(
- datastore.clone(),
+ verify_worker,
&backup_dir,
info,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- worker.clone(),
),
ArchiveType::DynamicIndex =>
verify_dynamic_index(
- datastore.clone(),
+ verify_worker,
&backup_dir,
info,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- worker.clone(),
),
- ArchiveType::Blob => verify_blob(datastore.clone(), &backup_dir, info),
+ ArchiveType::Blob => verify_blob(verify_worker.datastore.clone(), &backup_dir, info),
}
});
- worker.check_abort()?;
+ verify_worker.worker.check_abort()?;
crate::tools::fail_on_shutdown()?;
if let Err(err) = result {
task_log!(
- worker,
+ verify_worker.worker,
"verify {}:{}/{} failed: {}",
- datastore.name(),
+ verify_worker.datastore.name(),
backup_dir,
info.filename,
err,
@@ -407,7 +400,7 @@ pub fn verify_backup_dir_with_lock(
upid,
};
let verify_state = serde_json::to_value(verify_state)?;
- datastore.update_manifest(&backup_dir, |manifest| {
+ verify_worker.datastore.update_manifest(&backup_dir, |manifest| {
manifest.unprotected["verify_state"] = verify_state;
}).map_err(|err| format_err!("unable to update manifest blob - {}", err))?;
@@ -422,24 +415,21 @@ pub fn verify_backup_dir_with_lock(
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_backup_group(
- datastore: Arc<DataStore>,
+ verify_worker: &VerifyWorker,
group: &BackupGroup,
- verified_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
- corrupt_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
progress: &mut StoreProgress,
- worker: Arc<dyn TaskState + Send + Sync>,
upid: &UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
let mut errors = Vec::new();
- let mut list = match group.list_backups(&datastore.base_path()) {
+ let mut list = match group.list_backups(&verify_worker.datastore.base_path()) {
Ok(list) => list,
Err(err) => {
task_log!(
- worker,
+ verify_worker.worker,
"verify group {}:{} - unable to list backups: {}",
- datastore.name(),
+ verify_worker.datastore.name(),
group,
err,
);
@@ -448,18 +438,15 @@ pub fn verify_backup_group(
};
let snapshot_count = list.len();
- task_log!(worker, "verify group {}:{} ({} snapshots)", datastore.name(), group, snapshot_count);
+ task_log!(verify_worker.worker, "verify group {}:{} ({} snapshots)", verify_worker.datastore.name(), group, snapshot_count);
progress.group_snapshots = snapshot_count as u64;
BackupInfo::sort_list(&mut list, false); // newest first
for (pos, info) in list.into_iter().enumerate() {
if !verify_backup_dir(
- datastore.clone(),
+ verify_worker,
&info.backup_dir,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
- worker.clone(),
upid.clone(),
filter,
)? {
@@ -467,7 +454,7 @@ pub fn verify_backup_group(
}
progress.done_snapshots = pos as u64 + 1;
task_log!(
- worker,
+ verify_worker.worker,
"percentage done: {}",
progress
);
@@ -484,22 +471,22 @@ pub fn verify_backup_group(
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
pub fn verify_all_backups(
- datastore: Arc<DataStore>,
- worker: Arc<dyn TaskState + Send + Sync>,
+ verify_worker: &VerifyWorker,
upid: &UPID,
owner: Option<Authid>,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<Vec<String>, Error> {
let mut errors = Vec::new();
+ let worker = Arc::clone(&verify_worker.worker);
- task_log!(worker, "verify datastore {}", datastore.name());
+ task_log!(worker, "verify datastore {}", verify_worker.datastore.name());
if let Some(owner) = &owner {
task_log!(worker, "limiting to backups owned by {}", owner);
}
let filter_by_owner = |group: &BackupGroup| {
- match (datastore.get_owner(group), &owner) {
+ match (verify_worker.datastore.get_owner(group), &owner) {
(Ok(ref group_owner), Some(owner)) => {
group_owner == owner
|| (group_owner.is_token()
@@ -527,7 +514,7 @@ pub fn verify_all_backups(
}
};
- let mut list = match BackupInfo::list_backup_groups(&datastore.base_path()) {
+ let mut list = match BackupInfo::list_backup_groups(&verify_worker.datastore.base_path()) {
Ok(list) => list
.into_iter()
.filter(|group| !(group.backup_type() == "host" && group.backup_id() == "benchmark"))
@@ -545,12 +532,6 @@ pub fn verify_all_backups(
list.sort_unstable();
- // start with 16384 chunks (up to 65GB)
- let verified_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024*16)));
-
- // start with 64 chunks since we assume there are few corrupt ones
- let corrupt_chunks = Arc::new(Mutex::new(HashSet::with_capacity(64)));
-
let group_count = list.len();
task_log!(worker, "found {} groups", group_count);
@@ -562,12 +543,9 @@ pub fn verify_all_backups(
progress.group_snapshots = 0;
let mut group_errors = verify_backup_group(
- datastore.clone(),
+ verify_worker,
&group,
- verified_chunks.clone(),
- corrupt_chunks.clone(),
&mut progress,
- worker.clone(),
upid,
filter,
)?;
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index ca6eb554..1dd8baa7 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -67,7 +67,8 @@ pub fn do_verification_job(
task_log!(worker,"task triggered by schedule '{}'", event_str);
}
- let result = verify_all_backups(datastore, worker.clone(), worker.upid(), None, Some(&filter));
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+ let result = verify_all_backups(&verify_worker, worker.upid(), None, Some(&filter));
let job_result = match result {
Ok(ref failed_dirs) if failed_dirs.is_empty() => Ok(()),
Ok(ref failed_dirs) => {
--
2.20.1
Thread overview: 19+ messages
2021-01-25 13:42 [pbs-devel] [PATCH proxmox-backup(-qemu) 00/17] clippy refactorings Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 01/15] report: type-alias function call tuple Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 02/15] broadcast_future: refactor broadcast/future binding Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 03/15] client: refactor catalog upload spawning Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 04/15] allow complex Futures in tower_service impl Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 05/15] async index reader: typedef ReadFuture Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 06/15] systemd/time: extract Time/DateSpec structs Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 07/15] client: factor out UploadOptions Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 08/15] pxar: typedef on_error as ErrorHandler Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 09/15] pxar: factor out PxarCreateOptions Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 10/15] pxar: extract PxarExtractOptions Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 11/15] authid: make Tokenname(Ref) derive Eq Fabian Grünbichler
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 12/15] derive/impl and use Default for some structs Fabian Grünbichler
2021-01-25 13:42 ` Fabian Grünbichler [this message]
2021-01-25 13:42 ` [pbs-devel] [PATCH proxmox-backup 14/15] clippy: allow api functions with many arguments Fabian Grünbichler
2021-01-25 13:43 ` [pbs-devel] [PATCH proxmox-backup 15/15] clippy: more misc fixes Fabian Grünbichler
2021-01-25 13:43 ` [pbs-devel] [PATCH proxmox-backup-qemu 1/2] use UploadOptions for uploading Blobs Fabian Grünbichler
2021-01-25 13:43 ` [pbs-devel] [PATCH proxmox-backup-qemu 2/2] use new HttpClientOptions constructors Fabian Grünbichler
2021-01-26 9:44 ` [pbs-devel] applied series: [PATCH proxmox-backup(-qemu) 00/17] clippy refactorings Wolfgang Bumiller