From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 6 Mar 2024 15:11:51 +0100
Message-Id: <20240306141153.419283-2-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240306141153.419283-1-c.ebner@proxmox.com>
References: <20240306141153.419283-1-c.ebner@proxmox.com>
Subject: [pbs-devel] [PATCH proxmox-backup 1/3] server: sync: return `PullStats` for pull related methods

Return basic statistics on pull related methods via `PullStats` objects,
in order to construct a global summary for sync jobs.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 125 ++++++++++++++++++++++++++++++---------------
 1 file changed, 85 insertions(+), 40 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 5a4ba806..7d745c77 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::SystemTime;
+use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
@@ -64,6 +64,21 @@ pub(crate) struct LocalSource {
     ns: BackupNamespace,
 }
 
+#[derive(Default)]
+pub(crate) struct PullStats {
+    pub(crate) chunk_count: usize,
+    pub(crate) bytes: usize,
+    pub(crate) elapsed: Duration,
+}
+
+impl PullStats {
+    fn add(&mut self, rhs: PullStats) {
+        self.chunk_count += rhs.chunk_count;
+        self.bytes += rhs.bytes;
+        self.elapsed += rhs.elapsed;
+    }
+}
+
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -559,7 +574,7 @@ async fn pull_index_chunks(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
     let start_time = SystemTime::now();
@@ -594,12 +609,14 @@ async fn pull_index_chunks(
     let verify_and_write_channel = verify_pool.channel();
 
     let bytes = Arc::new(AtomicUsize::new(0));
+    let chunk_count = Arc::new(AtomicUsize::new(0));
 
     stream
         .map(|info| {
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
             let bytes = Arc::clone(&bytes);
+            let chunk_count = Arc::clone(&chunk_count);
             let verify_and_write_channel = verify_and_write_channel.clone();
 
             Ok::<_, Error>(async move {
@@ -620,6 +637,7 @@ async fn pull_index_chunks(
                 })?;
 
                 bytes.fetch_add(raw_size, Ordering::SeqCst);
+                chunk_count.fetch_add(1, Ordering::SeqCst);
                 Ok(())
             })
         })
@@ -632,18 +650,23 @@ async fn pull_index_chunks(
 
     verify_pool.complete()?;
 
-    let elapsed = start_time.elapsed()?.as_secs_f64();
+    let elapsed = start_time.elapsed()?;
 
     let bytes = bytes.load(Ordering::SeqCst);
+    let chunk_count = chunk_count.load(Ordering::SeqCst);
 
     task_log!(
         worker,
         "downloaded {} bytes ({:.2} MiB/s)",
         bytes,
-        (bytes as f64) / (1024.0 * 1024.0 * elapsed)
+        (bytes as f64) / (1024.0 * 1024.0 * elapsed.as_secs_f64())
     );
 
-    Ok(())
+    Ok(PullStats {
+        chunk_count,
+        bytes,
+        elapsed,
+    })
 }
 
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
@@ -677,7 +700,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
     path.push(archive_name);
@@ -685,6 +708,8 @@ async fn pull_single_archive<'a>(
     let mut tmp_path = path.clone();
     tmp_path.set_extension("tmp");
 
+    let mut pull_stats = PullStats::default();
+
     task_log!(worker, "sync archive {}", archive_name);
 
     reader
@@ -704,7 +729,7 @@ async fn pull_single_archive<'a>(
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
                 task_log!(worker, "skipping chunk sync for same datastore");
             } else {
-                pull_index_chunks(
+                let stats = pull_index_chunks(
                     worker,
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
@@ -712,6 +737,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
+                pull_stats.add(stats);
             }
         }
         ArchiveType::FixedIndex => {
@@ -724,7 +750,7 @@ async fn pull_single_archive<'a>(
             if reader.skip_chunk_sync(snapshot.datastore().name()) {
                 task_log!(worker, "skipping chunk sync for same datastore");
             } else {
-                pull_index_chunks(
+                let stats = pull_index_chunks(
                     worker,
                     reader.chunk_reader(archive_info.crypt_mode),
                     snapshot.datastore().clone(),
@@ -732,6 +758,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
+                pull_stats.add(stats);
             }
         }
         ArchiveType::Blob => {
@@ -743,7 +770,7 @@ async fn pull_single_archive<'a>(
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
-    Ok(())
+    Ok(pull_stats)
 }
 
 /// Actual implementation of pulling a snapshot.
@@ -760,7 +787,8 @@ async fn pull_snapshot<'a>(
     reader: Arc,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
+    let mut pull_stats = PullStats::default();
     let mut manifest_name = snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME);
 
@@ -776,7 +804,7 @@ async fn pull_snapshot<'a>(
     {
         tmp_manifest_blob = data;
     } else {
-        return Ok(());
+        return Ok(pull_stats);
     }
 
     if manifest_name.exists() {
@@ -800,7 +828,7 @@ async fn pull_snapshot<'a>(
             };
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
-            return Ok(()); // nothing changed
+            return Ok(pull_stats); // nothing changed
         }
     }
 
@@ -845,7 +873,7 @@ async fn pull_snapshot<'a>(
             }
         }
 
-        pull_single_archive(
+        let stats = pull_single_archive(
             worker,
             reader.clone(),
             snapshot,
@@ -853,6 +881,7 @@ async fn pull_snapshot<'a>(
             downloaded_chunks.clone(),
         )
         .await?;
+        pull_stats.add(stats);
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -868,7 +897,7 @@ async fn pull_snapshot<'a>(
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -880,31 +909,36 @@ async fn pull_snapshot_from<'a>(
     reader: Arc,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    if is_new {
+    let pull_stats = if is_new {
         task_log!(worker, "sync snapshot {}", snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
-            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
-                snapshot.backup_ns(),
-                snapshot.as_ref(),
-                true,
-            ) {
-                task_log!(worker, "cleanup error - {}", cleanup_err);
+        match pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
+            Err(err) => {
+                if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
+                    snapshot.backup_ns(),
+                    snapshot.as_ref(),
+                    true,
+                ) {
+                    task_log!(worker, "cleanup error - {}", cleanup_err);
+                }
+                return Err(err);
+            }
+            Ok(pull_stats) => {
+                task_log!(worker, "sync snapshot {} done", snapshot.dir());
+                pull_stats
             }
-            return Err(err);
         }
-        task_log!(worker, "sync snapshot {} done", snapshot.dir());
     } else {
         task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
-    }
+        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?
+    };
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 #[derive(PartialEq, Eq)]
@@ -1009,7 +1043,7 @@ async fn pull_group(
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
     progress: &mut StoreProgress,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
@@ -1066,6 +1100,8 @@ async fn pull_group(
 
     progress.group_snapshots = list.len() as u64;
 
+    let mut pull_stats = PullStats::default();
+
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
             .target
@@ -1082,7 +1118,8 @@ async fn pull_group(
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
 
-        result?; // stop on error
+        let stats = result?; // stop on error
+        pull_stats.add(stats);
     }
 
     if params.remove_vanished {
@@ -1112,7 +1149,7 @@ async fn pull_group(
         }
     }
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result {
@@ -1233,7 +1270,7 @@ fn check_and_remove_vanished_ns(
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
     mut params: PullParameters,
-) -> Result<(), Error> {
+) -> Result<PullStats, Error> {
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
@@ -1269,6 +1306,7 @@ pub(crate) async fn pull_store(
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
+    let mut pull_stats = PullStats::default();
 
     for namespace in namespaces {
         let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,9 +1341,11 @@ pub(crate) async fn pull_store(
         }
 
         match pull_ns(worker, &namespace, &mut params).await {
-            Ok((ns_progress, ns_errors)) => {
+            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
                 errors |= ns_errors;
 
+                pull_stats.add(ns_pull_stats);
+
                 if params.max_depth != Some(0) {
                     groups += ns_progress.done_groups;
                     snapshots += ns_progress.done_snapshots;
@@ -1338,7 +1378,7 @@ pub(crate) async fn pull_store(
         bail!("sync failed with some errors.");
     }
 
-    Ok(())
+    Ok(pull_stats)
 }
 
 /// Pulls a namespace according to `params`.
@@ -1357,7 +1397,7 @@ pub(crate) async fn pull_ns(
     worker: &WorkerTask,
     namespace: &BackupNamespace,
     params: &mut PullParameters,
-) -> Result<(StoreProgress, bool), Error> {
+) -> Result<(StoreProgress, PullStats, bool), Error> {
     let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     list.sort_unstable_by(|a, b| {
@@ -1389,6 +1429,7 @@ pub(crate) async fn pull_ns(
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
+    let mut pull_stats = PullStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
@@ -1429,10 +1470,14 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
-        {
-            task_log!(worker, "sync group {} failed - {}", &group, err,);
-            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(worker, params, namespace, &group, &mut progress).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    task_log!(worker, "sync group {} failed - {}", &group, err,);
+                    errors = true; // do not stop here, instead continue
+                }
+            }
         }
     }
 
@@ -1479,5 +1524,5 @@ pub(crate) async fn pull_ns(
         };
     }
 
-    Ok((progress, errors))
+    Ok((progress, pull_stats, errors))
 }
-- 
2.39.2
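
[Editor's note, not part of the patch: a minimal caller-side sketch of how the
`PullStats` returned by `pull_store()` could be turned into a summary log line.
The log wording below is an assumption; the actual summary output is presumably
added by the follow-up patches in this series.]

    // Hypothetical sketch (assumed log text), same crate as src/server/pull.rs:
    let pull_stats = pull_store(worker, params).await?;
    task_log!(
        worker,
        "Summary: transferred {} chunks ({} bytes) in {:.2} s",
        pull_stats.chunk_count,
        pull_stats.bytes,
        pull_stats.elapsed.as_secs_f64(),
    );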