From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v2 07/12] api: backup: env: add generics and separate functions into impl block
Date: Mon, 26 May 2025 16:14:40 +0200
Message-ID: <20250526141445.228717-8-h.laimer@proxmox.com>
In-Reply-To: <20250526141445.228717-1-h.laimer@proxmox.com>

... based on whether they need read or write access to the underlying datastore.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/api2/backup/environment.rs | 337 +++++++++++++++++----------------
 1 file changed, 174 insertions(+), 163 deletions(-)
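
Note for reviewers: the impl-block split follows the typestate pattern this
series builds on. Below is a minimal, self-contained sketch of that pattern;
the marker traits mirror CanRead/CanWrite from patch 01/12, but the Store
type, the ReadOnly/ReadWrite markers, and the method names are hypothetical
stand-ins for illustration, not the actual pbs-datastore API:

use std::marker::PhantomData;

// Type-level markers encoding the access mode.
struct ReadOnly;
struct ReadWrite;

// Marker traits, analogous to CanRead/CanWrite in pbs-datastore.
trait CanRead {}
trait CanWrite {}

impl CanRead for ReadOnly {}
impl CanRead for ReadWrite {} // write access implies read access here
impl CanWrite for ReadWrite {}

// A store carrying its access mode in the type parameter.
struct Store<T> {
    name: String,
    _mode: PhantomData<T>,
}

// Methods available regardless of access mode.
impl<T> Store<T> {
    fn name(&self) -> &str {
        &self.name
    }
}

// Methods that require provable read access.
impl<T: CanRead> Store<T> {
    fn read_chunk(&self, digest: &[u8; 32]) -> Vec<u8> {
        let _ = digest; // stub body for the sketch
        Vec::new()
    }
}

// Methods that require provable write access.
impl<T: CanWrite> Store<T> {
    fn insert_chunk(&self, data: &[u8]) {
        let _ = data; // stub body for the sketch
    }
}

With this split, calling insert_chunk on a Store<ReadOnly> fails to compile:
misuse of a read-only datastore becomes a type error instead of a runtime
check. The patch below applies the same idea to BackupEnvironment<T>: write
paths move into impl<T: CanWrite> blocks, while logging and state queries
stay on the unbounded impl<T>.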

diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 3d541b46..a1620fb9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -13,6 +13,7 @@ use proxmox_sys::fs::{replace_file, CreateOptions};
 
 use pbs_api_types::Authid;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::dynamic_index::DynamicIndexWriter;
 use pbs_datastore::fixed_index::FixedIndexWriter;
 use pbs_datastore::{DataBlob, DataStore};
@@ -54,17 +55,17 @@ impl std::ops::Add for UploadStatistic {
     }
 }
 
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
     name: String,
-    index: DynamicIndexWriter,
+    index: DynamicIndexWriter<T>,
     offset: u64,
     chunk_count: u64,
     upload_stat: UploadStatistic,
 }
 
-struct FixedWriterState {
+struct FixedWriterState<T> {
     name: String,
-    index: FixedIndexWriter,
+    index: FixedIndexWriter<T>,
     size: usize,
     chunk_size: u32,
     chunk_count: u64,
@@ -76,18 +77,18 @@ struct FixedWriterState {
 // key=digest, value=length
 type KnownChunksMap = HashMap<[u8; 32], u32>;
 
-struct SharedBackupState {
+struct SharedBackupState<T> {
     finished: bool,
     uid_counter: usize,
     file_counter: usize, // successfully uploaded files
-    dynamic_writers: HashMap<usize, DynamicWriterState>,
-    fixed_writers: HashMap<usize, FixedWriterState>,
+    dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+    fixed_writers: HashMap<usize, FixedWriterState<T>>,
     known_chunks: KnownChunksMap,
     backup_size: u64, // sums up size of all files
     backup_stat: UploadStatistic,
 }
 
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
     // Raise error if finished flag is set
     fn ensure_unfinished(&self) -> Result<(), Error> {
         if self.finished {
@@ -105,26 +106,32 @@ impl SharedBackupState {
 
 /// `RpcEnvironment` implementation for backup service
 #[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
-    pub last_backup: Option<BackupInfo>,
-    state: Arc<Mutex<SharedBackupState>>,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
+    pub last_backup: Option<BackupInfo<T>>,
+    state: Arc<Mutex<SharedBackupState<T>>>,
 }
 
-impl BackupEnvironment {
+impl<T: Send + Sync + 'static> BackupEnvironment<T> {
+    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+        self.formatter.format_result(result, self)
+    }
+}
+
+impl<T> BackupEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
         auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         let state = SharedBackupState {
             finished: false,
@@ -260,10 +267,148 @@ impl BackupEnvironment {
         state.known_chunks.get(digest).copied()
     }
 
+    fn log_upload_stat(
+        &self,
+        archive_name: &str,
+        csum: &[u8; 32],
+        uuid: &[u8; 16],
+        size: u64,
+        chunk_count: u64,
+        upload_stat: &UploadStatistic,
+    ) {
+        self.log(format!("Upload statistics for '{}'", archive_name));
+        self.log(format!("UUID: {}", hex::encode(uuid)));
+        self.log(format!("Checksum: {}", hex::encode(csum)));
+        self.log(format!("Size: {}", size));
+        self.log(format!("Chunk count: {}", chunk_count));
+
+        if size == 0 || chunk_count == 0 {
+            return;
+        }
+
+        self.log(format!(
+            "Upload size: {} ({}%)",
+            upload_stat.size,
+            (upload_stat.size * 100) / size
+        ));
+
+        // account for zero chunk, which might be uploaded but never used
+        let client_side_duplicates = if chunk_count < upload_stat.count {
+            0
+        } else {
+            chunk_count - upload_stat.count
+        };
+
+        let server_side_duplicates = upload_stat.duplicates;
+
+        if (client_side_duplicates + server_side_duplicates) > 0 {
+            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
+            self.log(format!(
+                "Duplicates: {}+{} ({}%)",
+                client_side_duplicates, server_side_duplicates, per
+            ));
+        }
+
+        if upload_stat.size > 0 {
+            self.log(format!(
+                "Compression: {}%",
+                (upload_stat.compressed_size * 100) / upload_stat.size
+            ));
+        }
+    }
+
+    pub fn log<S: AsRef<str>>(&self, msg: S) {
+        info!("{}", msg.as_ref());
+    }
+
+    pub fn debug<S: AsRef<str>>(&self, msg: S) {
+        if self.debug {
+            // This is kinda weird, we would like to use tracing::debug! here and automatically
+            // filter it, but self.debug is set from the client-side and the logs are printed on
+            // client and server side. This means that if the client sets the log level to debug,
+            // both server and client need to have 'debug' logs printed.
+            self.log(msg);
+        }
+    }
+
+    /// Raise error if finished flag is not set
+    pub fn ensure_finished(&self) -> Result<(), Error> {
+        let state = self.state.lock().unwrap();
+        if !state.finished {
+            bail!("backup ended but finished flag is not set.");
+        }
+        Ok(())
+    }
+
+    /// Return true if the finished flag is set
+    pub fn finished(&self) -> bool {
+        let state = self.state.lock().unwrap();
+        state.finished
+    }
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+    /// If verify-new is set on the datastore, this will run a new verify task
+    /// for the backup. If not, this will return and also drop the passed lock
+    /// immediately.
+    pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
+        self.ensure_finished()?;
+
+        if !self.datastore.verify_new() {
+            // no verify requested, do nothing
+            return Ok(());
+        }
+
+        // Downgrade to shared lock, the backup itself is finished
+        drop(excl_snap_lock);
+        let snap_lock = self.backup_dir.lock_shared().with_context(|| {
+            format!(
+                "while trying to verify snapshot '{:?}' after completion",
+                self.backup_dir
+            )
+        })?;
+        let worker_id = format!(
+            "{}:{}/{}/{:08X}",
+            self.datastore.name(),
+            self.backup_dir.backup_type(),
+            self.backup_dir.backup_id(),
+            self.backup_dir.backup_time()
+        );
+
+        let datastore = self.datastore.clone();
+        let backup_dir = self.backup_dir.clone();
+
+        WorkerTask::new_thread(
+            "verify",
+            Some(worker_id),
+            self.auth_id.to_string(),
+            false,
+            move |worker| {
+                worker.log_message("Automatically verifying newly added snapshot");
+
+                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+                if !verify_backup_dir_with_lock(
+                    &verify_worker,
+                    &backup_dir,
+                    worker.upid().clone(),
+                    None,
+                    snap_lock,
+                )? {
+                    bail!("verification failed - please check the log for details");
+                }
+
+                Ok(())
+            },
+        )
+        .map(|_| ())
+    }
+}
+
+impl<T: CanWrite> BackupEnvironment<T> {
     /// Store the writer with an unique ID
     pub fn register_dynamic_writer(
         &self,
-        index: DynamicIndexWriter,
+        index: DynamicIndexWriter<T>,
         name: String,
     ) -> Result<usize, Error> {
         let mut state = self.state.lock().unwrap();
@@ -289,7 +434,7 @@ impl BackupEnvironment {
     /// Store the writer with an unique ID
     pub fn register_fixed_writer(
         &self,
-        index: FixedIndexWriter,
+        index: FixedIndexWriter<T>,
         name: String,
         size: usize,
         chunk_size: u32,
@@ -379,56 +524,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    fn log_upload_stat(
-        &self,
-        archive_name: &str,
-        csum: &[u8; 32],
-        uuid: &[u8; 16],
-        size: u64,
-        chunk_count: u64,
-        upload_stat: &UploadStatistic,
-    ) {
-        self.log(format!("Upload statistics for '{}'", archive_name));
-        self.log(format!("UUID: {}", hex::encode(uuid)));
-        self.log(format!("Checksum: {}", hex::encode(csum)));
-        self.log(format!("Size: {}", size));
-        self.log(format!("Chunk count: {}", chunk_count));
-
-        if size == 0 || chunk_count == 0 {
-            return;
-        }
-
-        self.log(format!(
-            "Upload size: {} ({}%)",
-            upload_stat.size,
-            (upload_stat.size * 100) / size
-        ));
-
-        // account for zero chunk, which might be uploaded but never used
-        let client_side_duplicates = if chunk_count < upload_stat.count {
-            0
-        } else {
-            chunk_count - upload_stat.count
-        };
-
-        let server_side_duplicates = upload_stat.duplicates;
-
-        if (client_side_duplicates + server_side_duplicates) > 0 {
-            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
-            self.log(format!(
-                "Duplicates: {}+{} ({}%)",
-                client_side_duplicates, server_side_duplicates, per
-            ));
-        }
-
-        if upload_stat.size > 0 {
-            self.log(format!(
-                "Compression: {}%",
-                (upload_stat.compressed_size * 100) / upload_stat.size
-            ));
-        }
-    }
-
     /// Close dynamic writer
     pub fn dynamic_writer_close(
         &self,
@@ -633,94 +728,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    /// If verify-new is set on the datastore, this will run a new verify task
-    /// for the backup. If not, this will return and also drop the passed lock
-    /// immediately.
-    pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
-        self.ensure_finished()?;
-
-        if !self.datastore.verify_new() {
-            // no verify requested, do nothing
-            return Ok(());
-        }
-
-        // Downgrade to shared lock, the backup itself is finished
-        drop(excl_snap_lock);
-        let snap_lock = self.backup_dir.lock_shared().with_context(|| {
-            format!(
-                "while trying to verify snapshot '{:?}' after completion",
-                self.backup_dir
-            )
-        })?;
-        let worker_id = format!(
-            "{}:{}/{}/{:08X}",
-            self.datastore.name(),
-            self.backup_dir.backup_type(),
-            self.backup_dir.backup_id(),
-            self.backup_dir.backup_time()
-        );
-
-        let datastore = self.datastore.clone();
-        let backup_dir = self.backup_dir.clone();
-
-        WorkerTask::new_thread(
-            "verify",
-            Some(worker_id),
-            self.auth_id.to_string(),
-            false,
-            move |worker| {
-                worker.log_message("Automatically verifying newly added snapshot");
-
-                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
-                if !verify_backup_dir_with_lock(
-                    &verify_worker,
-                    &backup_dir,
-                    worker.upid().clone(),
-                    None,
-                    snap_lock,
-                )? {
-                    bail!("verification failed - please check the log for details");
-                }
-
-                Ok(())
-            },
-        )
-        .map(|_| ())
-    }
-
-    pub fn log<S: AsRef<str>>(&self, msg: S) {
-        info!("{}", msg.as_ref());
-    }
-
-    pub fn debug<S: AsRef<str>>(&self, msg: S) {
-        if self.debug {
-            // This is kinda weird, we would like to use tracing::debug! here and automatically
-            // filter it, but self.debug is set from the client-side and the logs are printed on
-            // client and server side. This means that if the client sets the log level to debug,
-            // both server and client need to have 'debug' logs printed.
-            self.log(msg);
-        }
-    }
-
-    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
-        self.formatter.format_result(result, self)
-    }
-
-    /// Raise error if finished flag is not set
-    pub fn ensure_finished(&self) -> Result<(), Error> {
-        let state = self.state.lock().unwrap();
-        if !state.finished {
-            bail!("backup ended but finished flag is not set.");
-        }
-        Ok(())
-    }
-
-    /// Return true if the finished flag is set
-    pub fn finished(&self) -> bool {
-        let state = self.state.lock().unwrap();
-        state.finished
-    }
-
     /// Remove complete backup
     pub fn remove_backup(&self) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
@@ -736,7 +743,7 @@ impl BackupEnvironment {
     }
 }
 
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -758,14 +765,18 @@ impl RpcEnvironment for BackupEnvironment {
     }
 }
 
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
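
For reference, the T: 'static bound on the AsRef impls above is forced by the
Any-based downcast: std::any::Any is only implemented for 'static types. A
minimal sketch of the pattern, assuming a simplified trait in place of the
real proxmox-router RpcEnvironment (Env, Backup, and downcast_env are
hypothetical names for illustration):

use std::any::Any;
use std::marker::PhantomData;

// Stand-in for RpcEnvironment's as_any() hook.
trait Env {
    fn as_any(&self) -> &dyn Any;
}

struct Backup<T> {
    _mode: PhantomData<T>,
}

// Any requires 'static, so the impl must bound T accordingly.
impl<T: 'static> Env for Backup<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Recover the concrete environment from the trait object.
fn downcast_env<T: 'static>(env: &dyn Env) -> &Backup<T> {
    env.as_any().downcast_ref::<Backup<T>>().unwrap()
}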
-- 
2.39.5



Thread overview: 13 messages
2025-05-26 14:14 [pbs-devel] [PATCH proxmox-backup v2 00/12] introduce typestate for datastore/chunkstore Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 01/12] chunkstore: add CanRead and CanWrite trait Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 02/12] chunkstore: separate functions into impl block Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 03/12] datastore: add generics and new lookup functions Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 04/12] datastore: separate functions into impl block Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 05/12] backup_info: add generics and separate functions into impl blocks Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 06/12] pbs-datastore: " Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 07/12] api: backup: env: add generics and separate functions into impl block Hannes Laimer [this message]
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 08/12] api/backup/bin/server/tape: add missing generics Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 09/12] examples/tests: " Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 10/12] api: admin: pull datastore loading out of check_privs helper Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 11/12] datastore: move `fn gc_running` out of DataStoreImpl Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 12/12] api/server: replace datastore_lookup with new, state-typed datastore returning functions Hannes Laimer
