From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 1/3] server: sync: return `PullStats` for pull related methods
Date: Wed, 6 Mar 2024 15:11:51 +0100 [thread overview]
Message-ID: <20240306141153.419283-2-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240306141153.419283-1-c.ebner@proxmox.com>
Return basic statistics on pull related methods via `PullStats` objects,
in order to construct a global summary for sync jobs.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 125 ++++++++++++++++++++++++++++++---------------
1 file changed, 85 insertions(+), 40 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5a4ba806..7d745c77 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use http::StatusCode;
@@ -64,6 +64,21 @@ pub(crate) struct LocalSource {
ns: BackupNamespace,
}
+#[derive(Default)]
+pub(crate) struct PullStats {
+ pub(crate) chunk_count: usize,
+ pub(crate) bytes: usize,
+ pub(crate) elapsed: Duration,
+}
+
+impl PullStats {
+ fn add(&mut self, rhs: PullStats) {
+ self.chunk_count += rhs.chunk_count;
+ self.bytes += rhs.bytes;
+ self.elapsed += rhs.elapsed;
+ }
+}
+
#[async_trait::async_trait]
/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
/// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -559,7 +574,7 @@ async fn pull_index_chunks<I: IndexFile>(
target: Arc<DataStore>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
let start_time = SystemTime::now();
@@ -594,12 +609,14 @@ async fn pull_index_chunks<I: IndexFile>(
let verify_and_write_channel = verify_pool.channel();
let bytes = Arc::new(AtomicUsize::new(0));
+ let chunk_count = Arc::new(AtomicUsize::new(0));
stream
.map(|info| {
let target = Arc::clone(&target);
let chunk_reader = chunk_reader.clone();
let bytes = Arc::clone(&bytes);
+ let chunk_count = Arc::clone(&chunk_count);
let verify_and_write_channel = verify_and_write_channel.clone();
Ok::<_, Error>(async move {
@@ -620,6 +637,7 @@ async fn pull_index_chunks<I: IndexFile>(
})?;
bytes.fetch_add(raw_size, Ordering::SeqCst);
+ chunk_count.fetch_add(1, Ordering::SeqCst);
Ok(())
})
@@ -632,18 +650,23 @@ async fn pull_index_chunks<I: IndexFile>(
verify_pool.complete()?;
- let elapsed = start_time.elapsed()?.as_secs_f64();
+ let elapsed = start_time.elapsed()?;
let bytes = bytes.load(Ordering::SeqCst);
+ let chunk_count = chunk_count.load(Ordering::SeqCst);
task_log!(
worker,
"downloaded {} bytes ({:.2} MiB/s)",
bytes,
- (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+ (bytes as f64) / (1024.0 * 1024.0 * elapsed.as_secs_f64())
);
- Ok(())
+ Ok(PullStats {
+ chunk_count,
+ bytes,
+ elapsed,
+ })
}
fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
@@ -677,7 +700,7 @@ async fn pull_single_archive<'a>(
snapshot: &'a pbs_datastore::BackupDir,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@@ -685,6 +708,8 @@ async fn pull_single_archive<'a>(
let mut tmp_path = path.clone();
tmp_path.set_extension("tmp");
+ let mut pull_stats = PullStats::default();
+
task_log!(worker, "sync archive {}", archive_name);
reader
@@ -704,7 +729,7 @@ async fn pull_single_archive<'a>(
if reader.skip_chunk_sync(snapshot.datastore().name()) {
task_log!(worker, "skipping chunk sync for same datastore");
} else {
- pull_index_chunks(
+ let stats = pull_index_chunks(
worker,
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
@@ -712,6 +737,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
+ pull_stats.add(stats);
}
}
ArchiveType::FixedIndex => {
@@ -724,7 +750,7 @@ async fn pull_single_archive<'a>(
if reader.skip_chunk_sync(snapshot.datastore().name()) {
task_log!(worker, "skipping chunk sync for same datastore");
} else {
- pull_index_chunks(
+ let stats = pull_index_chunks(
worker,
reader.chunk_reader(archive_info.crypt_mode),
snapshot.datastore().clone(),
@@ -732,6 +758,7 @@ async fn pull_single_archive<'a>(
downloaded_chunks,
)
.await?;
+ pull_stats.add(stats);
}
}
ArchiveType::Blob => {
@@ -743,7 +770,7 @@ async fn pull_single_archive<'a>(
if let Err(err) = std::fs::rename(&tmp_path, &path) {
bail!("Atomic rename file {:?} failed - {}", path, err);
}
- Ok(())
+ Ok(pull_stats)
}
/// Actual implementation of pulling a snapshot.
@@ -760,7 +787,8 @@ async fn pull_snapshot<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
+ let mut pull_stats = PullStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME);
@@ -776,7 +804,7 @@ async fn pull_snapshot<'a>(
{
tmp_manifest_blob = data;
} else {
- return Ok(());
+ return Ok(pull_stats);
}
if manifest_name.exists() {
@@ -800,7 +828,7 @@ async fn pull_snapshot<'a>(
};
task_log!(worker, "no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
- return Ok(()); // nothing changed
+ return Ok(pull_stats); // nothing changed
}
}
@@ -845,7 +873,7 @@ async fn pull_snapshot<'a>(
}
}
- pull_single_archive(
+ let stats = pull_single_archive(
worker,
reader.clone(),
snapshot,
@@ -853,6 +881,7 @@ async fn pull_snapshot<'a>(
downloaded_chunks.clone(),
)
.await?;
+ pull_stats.add(stats);
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -868,7 +897,7 @@ async fn pull_snapshot<'a>(
.cleanup_unreferenced_files(&manifest)
.map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
- Ok(())
+ Ok(pull_stats)
}
/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -880,31 +909,36 @@ async fn pull_snapshot_from<'a>(
reader: Arc<dyn PullReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
.datastore()
.create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
- if is_new {
+ let pull_stats = if is_new {
task_log!(worker, "sync snapshot {}", snapshot.dir());
- if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
- if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
- snapshot.backup_ns(),
- snapshot.as_ref(),
- true,
- ) {
- task_log!(worker, "cleanup error - {}", cleanup_err);
+ match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+ Err(err) => {
+ if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
+ snapshot.backup_ns(),
+ snapshot.as_ref(),
+ true,
+ ) {
+ task_log!(worker, "cleanup error - {}", cleanup_err);
+ }
+ return Err(err);
+ }
+ Ok(pull_stats) => {
+ task_log!(worker, "sync snapshot {} done", snapshot.dir());
+ pull_stats
}
- return Err(err);
}
- task_log!(worker, "sync snapshot {} done", snapshot.dir());
} else {
task_log!(worker, "re-sync snapshot {}", snapshot.dir());
- pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
- }
+ pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
+ };
- Ok(())
+ Ok(pull_stats)
}
#[derive(PartialEq, Eq)]
@@ -1009,7 +1043,7 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1066,6 +1100,8 @@ async fn pull_group(
progress.group_snapshots = list.len() as u64;
+ let mut pull_stats = PullStats::default();
+
for (pos, from_snapshot) in list.into_iter().enumerate() {
let to_snapshot = params
.target
@@ -1082,7 +1118,8 @@ async fn pull_group(
progress.done_snapshots = pos as u64 + 1;
task_log!(worker, "percentage done: {}", progress);
- result?; // stop on error
+ let stats = result?; // stop on error
+ pull_stats.add(stats);
}
if params.remove_vanished {
@@ -1112,7 +1149,7 @@ async fn pull_group(
}
}
- Ok(())
+ Ok(pull_stats)
}
fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1233,7 +1270,7 @@ fn check_and_remove_vanished_ns(
pub(crate) async fn pull_store(
worker: &WorkerTask,
mut params: PullParameters,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
// explicit create shared lock to prevent GC on newly created chunks
let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
let mut errors = false;
@@ -1269,6 +1306,7 @@ pub(crate) async fn pull_store(
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
+ let mut pull_stats = PullStats::default();
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,9 +1341,11 @@ pub(crate) async fn pull_store(
}
match pull_ns(worker, &namespace, &mut params).await {
- Ok((ns_progress, ns_errors)) => {
+ Ok((ns_progress, ns_pull_stats, ns_errors)) => {
errors |= ns_errors;
+ pull_stats.add(ns_pull_stats);
+
if params.max_depth != Some(0) {
groups += ns_progress.done_groups;
snapshots += ns_progress.done_snapshots;
@@ -1338,7 +1378,7 @@ pub(crate) async fn pull_store(
bail!("sync failed with some errors.");
}
- Ok(())
+ Ok(pull_stats)
}
/// Pulls a namespace according to `params`.
@@ -1357,7 +1397,7 @@ pub(crate) async fn pull_ns(
worker: &WorkerTask,
namespace: &BackupNamespace,
params: &mut PullParameters,
-) -> Result<(StoreProgress, bool), Error> {
+) -> Result<(StoreProgress, PullStats, bool), Error> {
let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
list.sort_unstable_by(|a, b| {
@@ -1389,6 +1429,7 @@ pub(crate) async fn pull_ns(
}
let mut progress = StoreProgress::new(list.len() as u64);
+ let mut pull_stats = PullStats::default();
let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
@@ -1429,10 +1470,14 @@ pub(crate) async fn pull_ns(
owner
);
errors = true; // do not stop here, instead continue
- } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
- {
- task_log!(worker, "sync group {} failed - {}", &group, err,);
- errors = true; // do not stop here, instead continue
+ } else {
+ match pull_group(worker, params, namespace, &group, &mut progress).await {
+ Ok(stats) => pull_stats.add(stats),
+ Err(err) => {
+ task_log!(worker, "sync group {} failed - {}", &group, err,);
+ errors = true; // do not stop here, instead continue
+ }
+ }
}
}
@@ -1479,5 +1524,5 @@ pub(crate) async fn pull_ns(
};
}
- Ok((progress, errors))
+ Ok((progress, pull_stats, errors))
}
--
2.39.2
next prev parent reply other threads:[~2024-03-06 14:12 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-03-06 14:11 [pbs-devel] [PATCH proxmox-backup 0/3] fix #5285: log global statistics for sync job Christian Ebner
2024-03-06 14:11 ` Christian Ebner [this message]
2024-03-06 14:11 ` [pbs-devel] [PATCH proxmox-backup 2/3] fix #5285: api: sync: add job summary to task log Christian Ebner
2024-03-06 14:11 ` [pbs-devel] [PATCH proxmox-backup 3/3] server: sync: use HumanByte for task log output Christian Ebner
2024-03-06 17:29 ` [pbs-devel] [PATCH proxmox-backup 0/3] fix #5285: log global statistics for sync job Max Carrara
2024-03-07 10:55 ` Max Carrara
2024-03-07 13:59 ` [pbs-devel] applied-series: " Thomas Lamprecht
2024-03-07 14:11 ` Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240306141153.419283-2-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.