all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 06/24] server: sync: move sync related stats to common module
Date: Mon, 15 Jul 2024 12:15:44 +0200	[thread overview]
Message-ID: <20240715101602.274244-7-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240715101602.274244-1-c.ebner@proxmox.com>

Move `PullStats` into a common sync module and rename it to `SyncStats`,
and move `RemovedVanishedStats` along with it, so that both can be reused
for sync operations in the push direction as well as the pull direction.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/mod.rs  |   1 +
 src/server/pull.rs | 121 ++++++++++++++-------------------------------
 src/server/sync.rs |  51 +++++++++++++++++++
 3 files changed, 89 insertions(+), 84 deletions(-)
 create mode 100644 src/server/sync.rs

diff --git a/src/server/mod.rs b/src/server/mod.rs
index d2cbc931c..3acfcc1c4 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -34,6 +34,7 @@ pub use report::*;
 pub mod auth;
 
 pub(crate) mod pull;
+pub(crate) mod sync;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
     let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 823515e9a..24422ef41 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -5,7 +5,7 @@ use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
@@ -34,6 +34,7 @@ use pbs_datastore::{
 };
 use pbs_tools::sha::sha256;
 
+use super::sync::{RemovedVanishedStats, SyncStats};
 use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
@@ -64,54 +65,6 @@ pub(crate) struct LocalSource {
     ns: BackupNamespace,
 }
 
-#[derive(Default)]
-pub(crate) struct RemovedVanishedStats {
-    pub(crate) groups: usize,
-    pub(crate) snapshots: usize,
-    pub(crate) namespaces: usize,
-}
-
-impl RemovedVanishedStats {
-    fn add(&mut self, rhs: RemovedVanishedStats) {
-        self.groups += rhs.groups;
-        self.snapshots += rhs.snapshots;
-        self.namespaces += rhs.namespaces;
-    }
-}
-
-#[derive(Default)]
-pub(crate) struct PullStats {
-    pub(crate) chunk_count: usize,
-    pub(crate) bytes: usize,
-    pub(crate) elapsed: Duration,
-    pub(crate) removed: Option<RemovedVanishedStats>,
-}
-
-impl From<RemovedVanishedStats> for PullStats {
-    fn from(removed: RemovedVanishedStats) -> Self {
-        Self {
-            removed: Some(removed),
-            ..Default::default()
-        }
-    }
-}
-
-impl PullStats {
-    fn add(&mut self, rhs: PullStats) {
-        self.chunk_count += rhs.chunk_count;
-        self.bytes += rhs.bytes;
-        self.elapsed += rhs.elapsed;
-
-        if let Some(rhs_removed) = rhs.removed {
-            if let Some(ref mut removed) = self.removed {
-                removed.add(rhs_removed);
-            } else {
-                self.removed = Some(rhs_removed);
-            }
-        }
-    }
-}
-
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -569,7 +522,7 @@ async fn pull_index_chunks<I: IndexFile>(
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
     let start_time = SystemTime::now();
@@ -656,7 +609,7 @@ async fn pull_index_chunks<I: IndexFile>(
         HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
     );
 
-    Ok(PullStats {
+    Ok(SyncStats {
         chunk_count,
         bytes,
         elapsed,
@@ -694,7 +647,7 @@ async fn pull_single_archive<'a>(
     snapshot: &'a pbs_datastore::BackupDir,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     let archive_name = &archive_info.filename;
     let mut path = snapshot.full_path();
     path.push(archive_name);
@@ -702,7 +655,7 @@ async fn pull_single_archive<'a>(
     let mut tmp_path = path.clone();
     tmp_path.set_extension("tmp");
 
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     info!("sync archive {archive_name}");
 
@@ -728,7 +681,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
-                pull_stats.add(stats);
+                sync_stats.add(stats);
             }
         }
         ArchiveType::FixedIndex => {
@@ -748,7 +701,7 @@ async fn pull_single_archive<'a>(
                     downloaded_chunks,
                 )
                 .await?;
-                pull_stats.add(stats);
+                sync_stats.add(stats);
             }
         }
         ArchiveType::Blob => {
@@ -760,7 +713,7 @@ async fn pull_single_archive<'a>(
     if let Err(err) = std::fs::rename(&tmp_path, &path) {
         bail!("Atomic rename file {:?} failed - {}", path, err);
     }
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 /// Actual implementation of pulling a snapshot.
@@ -776,8 +729,8 @@ async fn pull_snapshot<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
-    let mut pull_stats = PullStats::default();
+) -> Result<SyncStats, Error> {
+    let mut sync_stats = SyncStats::default();
     let mut manifest_name = snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME);
 
@@ -793,7 +746,7 @@ async fn pull_snapshot<'a>(
     {
         tmp_manifest_blob = data;
     } else {
-        return Ok(pull_stats);
+        return Ok(sync_stats);
     }
 
     if manifest_name.exists() {
@@ -815,7 +768,7 @@ async fn pull_snapshot<'a>(
             };
             info!("no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
-            return Ok(pull_stats); // nothing changed
+            return Ok(sync_stats); // nothing changed
         }
     }
 
@@ -862,7 +815,7 @@ async fn pull_snapshot<'a>(
 
         let stats =
             pull_single_archive(reader.clone(), snapshot, item, downloaded_chunks.clone()).await?;
-        pull_stats.add(stats);
+        sync_stats.add(stats);
     }
 
     if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
@@ -876,7 +829,7 @@ async fn pull_snapshot<'a>(
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
@@ -887,12 +840,12 @@ async fn pull_snapshot_from<'a>(
     reader: Arc<dyn PullReader + 'a>,
     snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     let (_path, is_new, _snap_lock) = snapshot
         .datastore()
         .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
 
-    let pull_stats = if is_new {
+    let sync_stats = if is_new {
         info!("sync snapshot {}", snapshot.dir());
 
         match pull_snapshot(reader, snapshot, downloaded_chunks).await {
@@ -906,9 +859,9 @@ async fn pull_snapshot_from<'a>(
                 }
                 return Err(err);
             }
-            Ok(pull_stats) => {
+            Ok(sync_stats) => {
                 info!("sync snapshot {} done", snapshot.dir());
-                pull_stats
+                sync_stats
             }
         }
     } else {
@@ -916,7 +869,7 @@ async fn pull_snapshot_from<'a>(
         pull_snapshot(reader, snapshot, downloaded_chunks).await?
     };
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 #[derive(PartialEq, Eq)]
@@ -1020,7 +973,7 @@ async fn pull_group(
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
     progress: &mut StoreProgress,
-) -> Result<PullStats, Error> {
+) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
@@ -1077,7 +1030,7 @@ async fn pull_group(
 
     progress.group_snapshots = list.len() as u64;
 
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     for (pos, from_snapshot) in list.into_iter().enumerate() {
         let to_snapshot = params
@@ -1095,7 +1048,7 @@ async fn pull_group(
         info!("percentage done: {progress}");
 
         let stats = result?; // stop on error
-        pull_stats.add(stats);
+        sync_stats.add(stats);
     }
 
     if params.remove_vanished {
@@ -1121,7 +1074,7 @@ async fn pull_group(
                 .target
                 .store
                 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
-            pull_stats.add(PullStats::from(RemovedVanishedStats {
+            sync_stats.add(SyncStats::from(RemovedVanishedStats {
                 snapshots: 1,
                 groups: 0,
                 namespaces: 0,
@@ -1129,7 +1082,7 @@ async fn pull_group(
         }
     }
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
@@ -1246,7 +1199,7 @@ fn check_and_remove_vanished_ns(
 /// - remote namespaces are filtered by remote
 /// - creation and removal of sub-NS checked here
 /// - access to sub-NS checked here
-pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats, Error> {
+pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats, Error> {
     // explicit create shared lock to prevent GC on newly created chunks
     let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
@@ -1279,7 +1232,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     for namespace in namespaces {
         let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1303,10 +1256,10 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
         }
 
         match pull_ns(&namespace, &mut params).await {
-            Ok((ns_progress, ns_pull_stats, ns_errors)) => {
+            Ok((ns_progress, ns_sync_stats, ns_errors)) => {
                 errors |= ns_errors;
 
-                pull_stats.add(ns_pull_stats);
+                sync_stats.add(ns_sync_stats);
 
                 if params.max_depth != Some(0) {
                     groups += ns_progress.done_groups;
@@ -1335,14 +1288,14 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     if params.remove_vanished {
         let (has_errors, stats) = check_and_remove_vanished_ns(&params, synced_ns)?;
         errors |= has_errors;
-        pull_stats.add(PullStats::from(stats));
+        sync_stats.add(SyncStats::from(stats));
     }
 
     if errors {
         bail!("sync failed with some errors.");
     }
 
-    Ok(pull_stats)
+    Ok(sync_stats)
 }
 
 /// Pulls a namespace according to `params`.
@@ -1360,7 +1313,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
 pub(crate) async fn pull_ns(
     namespace: &BackupNamespace,
     params: &mut PullParameters,
-) -> Result<(StoreProgress, PullStats, bool), Error> {
+) -> Result<(StoreProgress, SyncStats, bool), Error> {
     let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     list.sort_unstable_by(|a, b| {
@@ -1390,7 +1343,7 @@ pub(crate) async fn pull_ns(
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
-    let mut pull_stats = PullStats::default();
+    let mut sync_stats = SyncStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
@@ -1425,7 +1378,7 @@ pub(crate) async fn pull_ns(
             errors = true; // do not stop here, instead continue
         } else {
             match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
+                Ok(stats) => sync_stats.add(stats),
                 Err(err) => {
                     info!("sync group {} failed - {err}", &group);
                     errors = true; // do not stop here, instead continue
@@ -1459,13 +1412,13 @@ pub(crate) async fn pull_ns(
                     Ok(stats) => {
                         if !stats.all_removed() {
                             info!("kept some protected snapshots of group '{local_group}'");
-                            pull_stats.add(PullStats::from(RemovedVanishedStats {
+                            sync_stats.add(SyncStats::from(RemovedVanishedStats {
                                 snapshots: stats.removed_snapshots(),
                                 groups: 0,
                                 namespaces: 0,
                             }));
                         } else {
-                            pull_stats.add(PullStats::from(RemovedVanishedStats {
+                            sync_stats.add(SyncStats::from(RemovedVanishedStats {
                                 snapshots: stats.removed_snapshots(),
                                 groups: 1,
                                 namespaces: 0,
@@ -1486,5 +1439,5 @@ pub(crate) async fn pull_ns(
         };
     }
 
-    Ok((progress, pull_stats, errors))
+    Ok((progress, sync_stats, errors))
 }
diff --git a/src/server/sync.rs b/src/server/sync.rs
new file mode 100644
index 000000000..5f143ef63
--- /dev/null
+++ b/src/server/sync.rs
@@ -0,0 +1,51 @@
+//! Sync datastore contents from source to target, either in push or pull direction
+
+use std::time::Duration;
+
+#[derive(Default)]
+pub(crate) struct RemovedVanishedStats {
+    pub(crate) groups: usize,
+    pub(crate) snapshots: usize,
+    pub(crate) namespaces: usize,
+}
+
+impl RemovedVanishedStats {
+    pub(crate) fn add(&mut self, rhs: RemovedVanishedStats) {
+        self.groups += rhs.groups;
+        self.snapshots += rhs.snapshots;
+        self.namespaces += rhs.namespaces;
+    }
+}
+
+#[derive(Default)]
+pub(crate) struct SyncStats {
+    pub(crate) chunk_count: usize,
+    pub(crate) bytes: usize,
+    pub(crate) elapsed: Duration,
+    pub(crate) removed: Option<RemovedVanishedStats>,
+}
+
+impl From<RemovedVanishedStats> for SyncStats {
+    fn from(removed: RemovedVanishedStats) -> Self {
+        Self {
+            removed: Some(removed),
+            ..Default::default()
+        }
+    }
+}
+
+impl SyncStats {
+    pub(crate) fn add(&mut self, rhs: SyncStats) {
+        self.chunk_count += rhs.chunk_count;
+        self.bytes += rhs.bytes;
+        self.elapsed += rhs.elapsed;
+
+        if let Some(rhs_removed) = rhs.removed {
+            if let Some(ref mut removed) = self.removed {
+                removed.add(rhs_removed);
+            } else {
+                self.removed = Some(rhs_removed);
+            }
+        }
+    }
+}
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2024-07-15 10:16 UTC|newest]

Thread overview: 36+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-07-15 10:15 [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 01/24] datastore: data blob: fix typos in comments Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 02/24] server: pull: be more specific in module comment Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 03/24] server: pull: silence clippy to many arguments warning Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 04/24] www: sync edit: indetation style fix Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 05/24] server: pull: fix sync info message for root namespace Christian Ebner
2024-07-15 10:15 ` Christian Ebner [this message]
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 07/24] server: sync: move reader trait to common sync module Christian Ebner
2024-07-16  9:53   ` Gabriel Goller
2024-07-23  7:32     ` Christian Ebner
2024-07-30  8:38       ` Gabriel Goller
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source " Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 09/24] client: backup writer: bundle upload stats counters Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 11/24] client: backup writer: add chunk count and duration stats Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 12/24] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 13/24] api: backup: add ignore-previous flag to backup endpoint Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 14/24] server: sync: move skip info/reason to common sync module Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 15/24] server: sync: make skip reason message more genenric Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 16/24] server: sync: factor out namespace depth check into sync module Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 17/24] api types: define remote permissions and roles for push sync Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 18/24] fix #3044: server: implement push support for sync operations Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 19/24] api: config: extend sync job config by sync direction Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 20/24] api: push: implement endpoint for sync in push direction Christian Ebner
2024-07-15 10:15 ` [pbs-devel] [RFC proxmox-backup 21/24] api: sync: move sync job invocation to common module Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 22/24] bin: manager: add datastore push cli command Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 23/24] form: group filter: allow to set namespace for local datastore Christian Ebner
2024-07-15 10:16 ` [pbs-devel] [RFC proxmox-backup 24/24] www: sync edit: allow to set sync direction for sync jobs Christian Ebner
2024-07-16 14:09 ` [pbs-devel] [RFC proxmox-backup 00/24] fix #3044: push datastore to remote Gabriel Goller
2024-07-16 14:28   ` Christian Ebner
2024-07-16 14:51     ` Gabriel Goller
2024-07-16 14:54       ` Christian Ebner
2024-07-23 14:00   ` Christian Ebner
2024-07-17 15:48 ` Thomas Lamprecht
2024-07-18  7:36   ` Christian Ebner
2024-07-30 10:42   ` Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240715101602.274244-7-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal