From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 17D6E1FF166 for ; Thu, 12 Sep 2024 16:34:25 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 00F6E34757; Thu, 12 Sep 2024 16:34:24 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 12 Sep 2024 16:32:51 +0200 Message-Id: <20240912143322.548839-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240912143322.548839-1-c.ebner@proxmox.com> References: <20240912143322.548839-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.022 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v3 proxmox-backup 02/33] server: sync: move sync related stats to common module X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Move and rename the `PullStats` to `SyncStats` as well as moving the `RemovedVanishedStats` to make them reusable for sync operations in push direction as well as pull direction. Signed-off-by: Christian Ebner --- changes since version 2: - no changes src/server/mod.rs | 1 + src/server/pull.rs | 121 ++++++++++++++------------------------------- src/server/sync.rs | 51 +++++++++++++++++++ 3 files changed, 89 insertions(+), 84 deletions(-) create mode 100644 src/server/sync.rs diff --git a/src/server/mod.rs b/src/server/mod.rs index 7f845e5b8..468847c2e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -34,6 +34,7 @@ pub use report::*; pub mod auth; pub(crate) mod pull; +pub(crate) mod sync; pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> { let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?; diff --git a/src/server/pull.rs b/src/server/pull.rs index de1bb5d5f..4a97bfaa3 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -5,7 +5,7 @@ use std::io::{Seek, Write}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; +use std::time::SystemTime; use anyhow::{bail, format_err, Error}; use http::StatusCode; @@ -34,6 +34,7 @@ use pbs_datastore::{ }; use pbs_tools::sha::sha256; +use super::sync::{RemovedVanishedStats, SyncStats}; use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups}; use crate::tools::parallel_handler::ParallelHandler; @@ -64,54 +65,6 @@ pub(crate) struct LocalSource { ns: BackupNamespace, } -#[derive(Default)] -pub(crate) struct RemovedVanishedStats { - pub(crate) groups: usize, - pub(crate) snapshots: usize, - pub(crate) namespaces: usize, -} - -impl RemovedVanishedStats { - fn add(&mut self, rhs: RemovedVanishedStats) { - self.groups += rhs.groups; - self.snapshots += rhs.snapshots; - self.namespaces += rhs.namespaces; - } -} - -#[derive(Default)] -pub(crate) struct PullStats { - pub(crate) chunk_count: usize, - pub(crate) bytes: usize, - pub(crate) elapsed: Duration, - pub(crate) removed: Option, -} - -impl From for PullStats { - fn from(removed: RemovedVanishedStats) -> Self { - Self { - removed: Some(removed), - ..Default::default() - } - } -} - -impl PullStats { - fn add(&mut self, rhs: PullStats) { - self.chunk_count += rhs.chunk_count; - self.bytes += rhs.bytes; - self.elapsed += rhs.elapsed; - - if let Some(rhs_removed) = rhs.removed { - if let Some(ref mut removed) = self.removed { - removed.add(rhs_removed); - } else { - self.removed = Some(rhs_removed); - } - } - } -} - #[async_trait::async_trait] /// `PullSource` is a trait that provides an interface for pulling data/information from a source. /// The trait includes methods for listing namespaces, groups, and backup directories, @@ -576,7 +529,7 @@ async fn pull_index_chunks( target: Arc, index: I, downloaded_chunks: Arc>>, -) -> Result { +) -> Result { use futures::stream::{self, StreamExt, TryStreamExt}; let start_time = SystemTime::now(); @@ -663,7 +616,7 @@ async fn pull_index_chunks( HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()), ); - Ok(PullStats { + Ok(SyncStats { chunk_count, bytes, elapsed, @@ -701,7 +654,7 @@ async fn pull_single_archive<'a>( snapshot: &'a pbs_datastore::BackupDir, archive_info: &'a FileInfo, downloaded_chunks: Arc>>, -) -> Result { +) -> Result { let archive_name = &archive_info.filename; let mut path = snapshot.full_path(); path.push(archive_name); @@ -709,7 +662,7 @@ async fn pull_single_archive<'a>( let mut tmp_path = path.clone(); tmp_path.set_extension("tmp"); - let mut pull_stats = PullStats::default(); + let mut sync_stats = SyncStats::default(); info!("sync archive {archive_name}"); @@ -735,7 +688,7 @@ async fn pull_single_archive<'a>( downloaded_chunks, ) .await?; - pull_stats.add(stats); + sync_stats.add(stats); } } ArchiveType::FixedIndex => { @@ -755,7 +708,7 @@ async fn pull_single_archive<'a>( downloaded_chunks, ) .await?; - pull_stats.add(stats); + sync_stats.add(stats); } } ArchiveType::Blob => { @@ -767,7 +720,7 @@ async fn pull_single_archive<'a>( if let Err(err) = std::fs::rename(&tmp_path, &path) { bail!("Atomic rename file {:?} failed - {}", path, err); } - Ok(pull_stats) + Ok(sync_stats) } /// Actual implementation of pulling a snapshot. @@ -783,8 +736,8 @@ async fn pull_snapshot<'a>( reader: Arc, snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, -) -> Result { - let mut pull_stats = PullStats::default(); +) -> Result { + let mut sync_stats = SyncStats::default(); let mut manifest_name = snapshot.full_path(); manifest_name.push(MANIFEST_BLOB_NAME); @@ -800,7 +753,7 @@ async fn pull_snapshot<'a>( { tmp_manifest_blob = data; } else { - return Ok(pull_stats); + return Ok(sync_stats); } if manifest_name.exists() { @@ -822,7 +775,7 @@ async fn pull_snapshot<'a>( }; info!("no data changes"); let _ = std::fs::remove_file(&tmp_manifest_name); - return Ok(pull_stats); // nothing changed + return Ok(sync_stats); // nothing changed } } @@ -869,7 +822,7 @@ async fn pull_snapshot<'a>( let stats = pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?; - pull_stats.add(stats); + sync_stats.add(stats); } if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { @@ -883,7 +836,7 @@ async fn pull_snapshot<'a>( .cleanup_unreferenced_files(&manifest) .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?; - Ok(pull_stats) + Ok(sync_stats) } /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case. @@ -894,12 +847,12 @@ async fn pull_snapshot_from<'a>( reader: Arc, snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, -) -> Result { +) -> Result { let (_path, is_new, _snap_lock) = snapshot .datastore() .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?; - let pull_stats = if is_new { + let sync_stats = if is_new { info!("sync snapshot {}", snapshot.dir()); match pull_snapshot(reader, snapshot, downloaded_chunks).await { @@ -913,9 +866,9 @@ async fn pull_snapshot_from<'a>( } return Err(err); } - Ok(pull_stats) => { + Ok(sync_stats) => { info!("sync snapshot {} done", snapshot.dir()); - pull_stats + sync_stats } } } else { @@ -923,7 +876,7 @@ async fn pull_snapshot_from<'a>( pull_snapshot(reader, snapshot, downloaded_chunks).await? }; - Ok(pull_stats) + Ok(sync_stats) } #[derive(PartialEq, Eq)] @@ -1027,7 +980,7 @@ async fn pull_group( source_namespace: &BackupNamespace, group: &BackupGroup, progress: &mut StoreProgress, -) -> Result { +) -> Result { let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced); let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast); @@ -1084,7 +1037,7 @@ async fn pull_group( progress.group_snapshots = list.len() as u64; - let mut pull_stats = PullStats::default(); + let mut sync_stats = SyncStats::default(); for (pos, from_snapshot) in list.into_iter().enumerate() { let to_snapshot = params @@ -1102,7 +1055,7 @@ async fn pull_group( info!("percentage done: {progress}"); let stats = result?; // stop on error - pull_stats.add(stats); + sync_stats.add(stats); } if params.remove_vanished { @@ -1128,7 +1081,7 @@ async fn pull_group( .target .store .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?; - pull_stats.add(PullStats::from(RemovedVanishedStats { + sync_stats.add(SyncStats::from(RemovedVanishedStats { snapshots: 1, groups: 0, namespaces: 0, @@ -1136,7 +1089,7 @@ async fn pull_group( } } - Ok(pull_stats) + Ok(sync_stats) } fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result { @@ -1253,7 +1206,7 @@ fn check_and_remove_vanished_ns( /// - remote namespaces are filtered by remote /// - creation and removal of sub-NS checked here /// - access to sub-NS checked here -pub(crate) async fn pull_store(mut params: PullParameters) -> Result { +pub(crate) async fn pull_store(mut params: PullParameters) -> Result { // explicit create shared lock to prevent GC on newly created chunks let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?; let mut errors = false; @@ -1286,7 +1239,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result Result { + Ok((ns_progress, ns_sync_stats, ns_errors)) => { errors |= ns_errors; - pull_stats.add(ns_pull_stats); + sync_stats.add(ns_sync_stats); if params.max_depth != Some(0) { groups += ns_progress.done_groups; @@ -1342,14 +1295,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result Result Result<(StoreProgress, PullStats, bool), Error> { +) -> Result<(StoreProgress, SyncStats, bool), Error> { let mut list: Vec = params.source.list_groups(namespace, ¶ms.owner).await?; list.sort_unstable_by(|a, b| { @@ -1397,7 +1350,7 @@ pub(crate) async fn pull_ns( } let mut progress = StoreProgress::new(list.len() as u64); - let mut pull_stats = PullStats::default(); + let mut sync_stats = SyncStats::default(); let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?; @@ -1432,7 +1385,7 @@ pub(crate) async fn pull_ns( errors = true; // do not stop here, instead continue } else { match pull_group(params, namespace, &group, &mut progress).await { - Ok(stats) => pull_stats.add(stats), + Ok(stats) => sync_stats.add(stats), Err(err) => { info!("sync group {} failed - {err}", &group); errors = true; // do not stop here, instead continue @@ -1466,13 +1419,13 @@ pub(crate) async fn pull_ns( Ok(stats) => { if !stats.all_removed() { info!("kept some protected snapshots of group '{local_group}'"); - pull_stats.add(PullStats::from(RemovedVanishedStats { + sync_stats.add(SyncStats::from(RemovedVanishedStats { snapshots: stats.removed_snapshots(), groups: 0, namespaces: 0, })); } else { - pull_stats.add(PullStats::from(RemovedVanishedStats { + sync_stats.add(SyncStats::from(RemovedVanishedStats { snapshots: stats.removed_snapshots(), groups: 1, namespaces: 0, @@ -1493,5 +1446,5 @@ pub(crate) async fn pull_ns( }; } - Ok((progress, pull_stats, errors)) + Ok((progress, sync_stats, errors)) } diff --git a/src/server/sync.rs b/src/server/sync.rs new file mode 100644 index 000000000..5f143ef63 --- /dev/null +++ b/src/server/sync.rs @@ -0,0 +1,51 @@ +//! Sync datastore contents from source to target, either in push or pull direction + +use std::time::Duration; + +#[derive(Default)] +pub(crate) struct RemovedVanishedStats { + pub(crate) groups: usize, + pub(crate) snapshots: usize, + pub(crate) namespaces: usize, +} + +impl RemovedVanishedStats { + pub(crate) fn add(&mut self, rhs: RemovedVanishedStats) { + self.groups += rhs.groups; + self.snapshots += rhs.snapshots; + self.namespaces += rhs.namespaces; + } +} + +#[derive(Default)] +pub(crate) struct SyncStats { + pub(crate) chunk_count: usize, + pub(crate) bytes: usize, + pub(crate) elapsed: Duration, + pub(crate) removed: Option, +} + +impl From for SyncStats { + fn from(removed: RemovedVanishedStats) -> Self { + Self { + removed: Some(removed), + ..Default::default() + } + } +} + +impl SyncStats { + pub(crate) fn add(&mut self, rhs: SyncStats) { + self.chunk_count += rhs.chunk_count; + self.bytes += rhs.bytes; + self.elapsed += rhs.elapsed; + + if let Some(rhs_removed) = rhs.removed { + if let Some(ref mut removed) = self.removed { + removed.add(rhs_removed); + } else { + self.removed = Some(rhs_removed); + } + } + } +} -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel