From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v2 07/12] api: backup: env: add generics and separate functions into impl block
Date: Mon, 26 May 2025 16:14:40 +0200 [thread overview]
Message-ID: <20250526141445.228717-8-h.laimer@proxmox.com> (raw)
In-Reply-To: <20250526141445.228717-1-h.laimer@proxmox.com>
... based on whether they read or write.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/api2/backup/environment.rs | 337 +++++++++++++++++----------------
1 file changed, 174 insertions(+), 163 deletions(-)
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 3d541b46..a1620fb9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -13,6 +13,7 @@ use proxmox_sys::fs::{replace_file, CreateOptions};
use pbs_api_types::Authid;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanWrite;
use pbs_datastore::dynamic_index::DynamicIndexWriter;
use pbs_datastore::fixed_index::FixedIndexWriter;
use pbs_datastore::{DataBlob, DataStore};
@@ -54,17 +55,17 @@ impl std::ops::Add for UploadStatistic {
}
}
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
name: String,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
offset: u64,
chunk_count: u64,
upload_stat: UploadStatistic,
}
-struct FixedWriterState {
+struct FixedWriterState<T> {
name: String,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
size: usize,
chunk_size: u32,
chunk_count: u64,
@@ -76,18 +77,18 @@ struct FixedWriterState {
// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;
-struct SharedBackupState {
+struct SharedBackupState<T> {
finished: bool,
uid_counter: usize,
file_counter: usize, // successfully uploaded files
- dynamic_writers: HashMap<usize, DynamicWriterState>,
- fixed_writers: HashMap<usize, FixedWriterState>,
+ dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+ fixed_writers: HashMap<usize, FixedWriterState<T>>,
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
}
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
@@ -105,26 +106,32 @@ impl SharedBackupState {
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
- pub last_backup: Option<BackupInfo>,
- state: Arc<Mutex<SharedBackupState>>,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
+ pub last_backup: Option<BackupInfo<T>>,
+ state: Arc<Mutex<SharedBackupState<T>>>,
}
-impl BackupEnvironment {
+impl<T: Send + Sync + 'static> BackupEnvironment<T> {
+ pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+ self.formatter.format_result(result, self)
+ }
+}
+
+impl<T> BackupEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
let state = SharedBackupState {
finished: false,
@@ -260,10 +267,148 @@ impl BackupEnvironment {
state.known_chunks.get(digest).copied()
}
+ fn log_upload_stat(
+ &self,
+ archive_name: &str,
+ csum: &[u8; 32],
+ uuid: &[u8; 16],
+ size: u64,
+ chunk_count: u64,
+ upload_stat: &UploadStatistic,
+ ) {
+ self.log(format!("Upload statistics for '{}'", archive_name));
+ self.log(format!("UUID: {}", hex::encode(uuid)));
+ self.log(format!("Checksum: {}", hex::encode(csum)));
+ self.log(format!("Size: {}", size));
+ self.log(format!("Chunk count: {}", chunk_count));
+
+ if size == 0 || chunk_count == 0 {
+ return;
+ }
+
+ self.log(format!(
+ "Upload size: {} ({}%)",
+ upload_stat.size,
+ (upload_stat.size * 100) / size
+ ));
+
+ // account for zero chunk, which might be uploaded but never used
+ let client_side_duplicates = if chunk_count < upload_stat.count {
+ 0
+ } else {
+ chunk_count - upload_stat.count
+ };
+
+ let server_side_duplicates = upload_stat.duplicates;
+
+ if (client_side_duplicates + server_side_duplicates) > 0 {
+ let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
+ self.log(format!(
+ "Duplicates: {}+{} ({}%)",
+ client_side_duplicates, server_side_duplicates, per
+ ));
+ }
+
+ if upload_stat.size > 0 {
+ self.log(format!(
+ "Compression: {}%",
+ (upload_stat.compressed_size * 100) / upload_stat.size
+ ));
+ }
+ }
+
+ pub fn log<S: AsRef<str>>(&self, msg: S) {
+ info!("{}", msg.as_ref());
+ }
+
+ pub fn debug<S: AsRef<str>>(&self, msg: S) {
+ if self.debug {
+ // This is kinda weird, we would like to use tracing::debug! here and automatically
+ // filter it, but self.debug is set from the client-side and the logs are printed on
+ // client and server side. This means that if the client sets the log level to debug,
+ // both server and client need to have 'debug' logs printed.
+ self.log(msg);
+ }
+ }
+
+ /// Raise error if finished flag is not set
+ pub fn ensure_finished(&self) -> Result<(), Error> {
+ let state = self.state.lock().unwrap();
+ if !state.finished {
+ bail!("backup ended but finished flag is not set.");
+ }
+ Ok(())
+ }
+
+ /// Return true if the finished flag is set
+ pub fn finished(&self) -> bool {
+ let state = self.state.lock().unwrap();
+ state.finished
+ }
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+ /// If verify-new is set on the datastore, this will run a new verify task
+ /// for the backup. If not, this will return and also drop the passed lock
+ /// immediately.
+ pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
+ self.ensure_finished()?;
+
+ if !self.datastore.verify_new() {
+ // no verify requested, do nothing
+ return Ok(());
+ }
+
+ // Downgrade to shared lock, the backup itself is finished
+ drop(excl_snap_lock);
+ let snap_lock = self.backup_dir.lock_shared().with_context(|| {
+ format!(
+ "while trying to verify snapshot '{:?}' after completion",
+ self.backup_dir
+ )
+ })?;
+ let worker_id = format!(
+ "{}:{}/{}/{:08X}",
+ self.datastore.name(),
+ self.backup_dir.backup_type(),
+ self.backup_dir.backup_id(),
+ self.backup_dir.backup_time()
+ );
+
+ let datastore = self.datastore.clone();
+ let backup_dir = self.backup_dir.clone();
+
+ WorkerTask::new_thread(
+ "verify",
+ Some(worker_id),
+ self.auth_id.to_string(),
+ false,
+ move |worker| {
+ worker.log_message("Automatically verifying newly added snapshot");
+
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+ if !verify_backup_dir_with_lock(
+ &verify_worker,
+ &backup_dir,
+ worker.upid().clone(),
+ None,
+ snap_lock,
+ )? {
+ bail!("verification failed - please check the log for details");
+ }
+
+ Ok(())
+ },
+ )
+ .map(|_| ())
+ }
+}
+
+impl<T: CanWrite> BackupEnvironment<T> {
/// Store the writer with an unique ID
pub fn register_dynamic_writer(
&self,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
name: String,
) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
@@ -289,7 +434,7 @@ impl BackupEnvironment {
/// Store the writer with an unique ID
pub fn register_fixed_writer(
&self,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
name: String,
size: usize,
chunk_size: u32,
@@ -379,56 +524,6 @@ impl BackupEnvironment {
Ok(())
}
- fn log_upload_stat(
- &self,
- archive_name: &str,
- csum: &[u8; 32],
- uuid: &[u8; 16],
- size: u64,
- chunk_count: u64,
- upload_stat: &UploadStatistic,
- ) {
- self.log(format!("Upload statistics for '{}'", archive_name));
- self.log(format!("UUID: {}", hex::encode(uuid)));
- self.log(format!("Checksum: {}", hex::encode(csum)));
- self.log(format!("Size: {}", size));
- self.log(format!("Chunk count: {}", chunk_count));
-
- if size == 0 || chunk_count == 0 {
- return;
- }
-
- self.log(format!(
- "Upload size: {} ({}%)",
- upload_stat.size,
- (upload_stat.size * 100) / size
- ));
-
- // account for zero chunk, which might be uploaded but never used
- let client_side_duplicates = if chunk_count < upload_stat.count {
- 0
- } else {
- chunk_count - upload_stat.count
- };
-
- let server_side_duplicates = upload_stat.duplicates;
-
- if (client_side_duplicates + server_side_duplicates) > 0 {
- let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
- self.log(format!(
- "Duplicates: {}+{} ({}%)",
- client_side_duplicates, server_side_duplicates, per
- ));
- }
-
- if upload_stat.size > 0 {
- self.log(format!(
- "Compression: {}%",
- (upload_stat.compressed_size * 100) / upload_stat.size
- ));
- }
- }
-
/// Close dynamic writer
pub fn dynamic_writer_close(
&self,
@@ -633,94 +728,6 @@ impl BackupEnvironment {
Ok(())
}
- /// If verify-new is set on the datastore, this will run a new verify task
- /// for the backup. If not, this will return and also drop the passed lock
- /// immediately.
- pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
- self.ensure_finished()?;
-
- if !self.datastore.verify_new() {
- // no verify requested, do nothing
- return Ok(());
- }
-
- // Downgrade to shared lock, the backup itself is finished
- drop(excl_snap_lock);
- let snap_lock = self.backup_dir.lock_shared().with_context(|| {
- format!(
- "while trying to verify snapshot '{:?}' after completion",
- self.backup_dir
- )
- })?;
- let worker_id = format!(
- "{}:{}/{}/{:08X}",
- self.datastore.name(),
- self.backup_dir.backup_type(),
- self.backup_dir.backup_id(),
- self.backup_dir.backup_time()
- );
-
- let datastore = self.datastore.clone();
- let backup_dir = self.backup_dir.clone();
-
- WorkerTask::new_thread(
- "verify",
- Some(worker_id),
- self.auth_id.to_string(),
- false,
- move |worker| {
- worker.log_message("Automatically verifying newly added snapshot");
-
- let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
- if !verify_backup_dir_with_lock(
- &verify_worker,
- &backup_dir,
- worker.upid().clone(),
- None,
- snap_lock,
- )? {
- bail!("verification failed - please check the log for details");
- }
-
- Ok(())
- },
- )
- .map(|_| ())
- }
-
- pub fn log<S: AsRef<str>>(&self, msg: S) {
- info!("{}", msg.as_ref());
- }
-
- pub fn debug<S: AsRef<str>>(&self, msg: S) {
- if self.debug {
- // This is kinda weird, we would like to use tracing::debug! here and automatically
- // filter it, but self.debug is set from the client-side and the logs are printed on
- // client and server side. This means that if the client sets the log level to debug,
- // both server and client need to have 'debug' logs printed.
- self.log(msg);
- }
- }
-
- pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
- self.formatter.format_result(result, self)
- }
-
- /// Raise error if finished flag is not set
- pub fn ensure_finished(&self) -> Result<(), Error> {
- let state = self.state.lock().unwrap();
- if !state.finished {
- bail!("backup ended but finished flag is not set.");
- }
- Ok(())
- }
-
- /// Return true if the finished flag is set
- pub fn finished(&self) -> bool {
- let state = self.state.lock().unwrap();
- state.finished
- }
-
/// Remove complete backup
pub fn remove_backup(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
@@ -736,7 +743,7 @@ impl BackupEnvironment {
}
}
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -758,14 +765,18 @@ impl RpcEnvironment for BackupEnvironment {
}
}
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-05-26 14:14 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-05-26 14:14 [pbs-devel] [PATCH proxmox-backup v2 00/12] introduce typestate for datastore/chunkstore Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 01/12] chunkstore: add CanRead and CanWrite trait Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 02/12] chunkstore: separate functions into impl block Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 03/12] datastore: add generics and new lookup functions Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 04/12] datastore: separate functions into impl block Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 05/12] backup_info: add generics and separate functions into impl blocks Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 06/12] pbs-datastore: " Hannes Laimer
2025-05-26 14:14 ` Hannes Laimer [this message]
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 08/12] api/backup/bin/server/tape: add missing generics Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 09/12] examples/tests: " Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 10/12] api: admin: pull datastore loading out of check_privs helper Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 11/12] datastore: move `fn gc_running` out of DataStoreImpl Hannes Laimer
2025-05-26 14:14 ` [pbs-devel] [PATCH proxmox-backup v2 12/12] api/server: replace datastore_lookup with new, state-typed datastore returning functions Hannes Laimer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20250526141445.228717-8-h.laimer@proxmox.com \
--to=h.laimer@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.