From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Mon, 26 May 2025 16:14:40 +0200
Message-Id: <20250526141445.228717-8-h.laimer@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250526141445.228717-1-h.laimer@proxmox.com>
References: <20250526141445.228717-1-h.laimer@proxmox.com>
Subject: [pbs-devel] [PATCH proxmox-backup v2 07/12] api: backup: env: add generics and separate functions into impl block

... based on whether they read or write.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
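Note (illustration, not part of the commit): splitting methods across bounded
impl blocks follows the capability-marker ("typestate") pattern. Below is a
minimal, self-contained sketch of the idea; the trait and marker names are
made-up stand-ins, while the real `CanWrite` trait is defined in
`pbs_datastore::chunk_store` earlier in this series.

use std::marker::PhantomData;

// Capability traits; stand-ins for the chunk_store capability traits.
trait CanRead {}
trait CanWrite: CanRead {}

// Zero-sized marker types encoding how a store handle may be used.
struct Read;
struct Write;
impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}

// Stand-in for BackupEnvironment<T>: generic over the capability marker.
struct Env<T> {
    _marker: PhantomData<T>,
}

// Methods available for any capability (like format_response in this patch).
impl<T> Env<T> {
    fn log(&self, msg: &str) {
        println!("{msg}");
    }
}

// Methods only available when the marker allows writing
// (like new() and the writer registration functions in this patch).
impl<T: CanWrite> Env<T> {
    fn register_writer(&self) {
        self.log("writer registered");
    }
}

fn main() {
    let rw: Env<Write> = Env { _marker: PhantomData };
    rw.register_writer(); // ok: Write implements CanWrite

    let ro: Env<Read> = Env { _marker: PhantomData };
    ro.log("read-only environment"); // ok: unbounded impl block
    // ro.register_writer(); // compile error: Read does not implement CanWrite
}

Misuse of a read-only handle thus becomes a compile error instead of a runtime
check.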
 src/api2/backup/environment.rs | 337 +++++++++++++++++----------------
 1 file changed, 174 insertions(+), 163 deletions(-)

diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 3d541b46..a1620fb9 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -13,6 +13,7 @@ use proxmox_sys::fs::{replace_file, CreateOptions};
 
 use pbs_api_types::Authid;
 use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::dynamic_index::DynamicIndexWriter;
 use pbs_datastore::fixed_index::FixedIndexWriter;
 use pbs_datastore::{DataBlob, DataStore};
@@ -54,17 +55,17 @@ impl std::ops::Add for UploadStatistic {
     }
 }
 
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
     name: String,
-    index: DynamicIndexWriter,
+    index: DynamicIndexWriter<T>,
     offset: u64,
     chunk_count: u64,
     upload_stat: UploadStatistic,
 }
 
-struct FixedWriterState {
+struct FixedWriterState<T> {
     name: String,
-    index: FixedIndexWriter,
+    index: FixedIndexWriter<T>,
     size: usize,
     chunk_size: u32,
     chunk_count: u64,
@@ -76,18 +77,18 @@ struct FixedWriterState {
 // key=digest, value=length
 type KnownChunksMap = HashMap<[u8; 32], u32>;
 
-struct SharedBackupState {
+struct SharedBackupState<T> {
     finished: bool,
     uid_counter: usize,
     file_counter: usize, // successfully uploaded files
-    dynamic_writers: HashMap<usize, DynamicWriterState>,
-    fixed_writers: HashMap<usize, FixedWriterState>,
+    dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+    fixed_writers: HashMap<usize, FixedWriterState<T>>,
     known_chunks: KnownChunksMap,
     backup_size: u64, // sums up size of all files
     backup_stat: UploadStatistic,
 }
 
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
     // Raise error if finished flag is set
     fn ensure_unfinished(&self) -> Result<(), Error> {
         if self.finished {
@@ -105,26 +106,32 @@ impl SharedBackupState {
 
 /// `RpcEnvironment` implementation for backup service
 #[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
-    pub last_backup: Option<BackupInfo>,
-    state: Arc<Mutex<SharedBackupState>>,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
+    pub last_backup: Option<BackupInfo<T>>,
+    state: Arc<Mutex<SharedBackupState<T>>>,
 }
 
-impl BackupEnvironment {
+impl<T> BackupEnvironment<T> {
+    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+        self.formatter.format_result(result, self)
+    }
+}
+
+impl<T: CanWrite> BackupEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
        auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         let state = SharedBackupState {
             finished: false,
@@ -260,10 +267,148 @@ impl BackupEnvironment {
         state.known_chunks.get(digest).copied()
     }
 
+    fn log_upload_stat(
+        &self,
+        archive_name: &str,
+        csum: &[u8; 32],
+        uuid: &[u8; 16],
+        size: u64,
+        chunk_count: u64,
+        upload_stat: &UploadStatistic,
+    ) {
+        self.log(format!("Upload statistics for '{}'", archive_name));
+        self.log(format!("UUID: {}", hex::encode(uuid)));
+        self.log(format!("Checksum: {}", hex::encode(csum)));
+        self.log(format!("Size: {}", size));
+        self.log(format!("Chunk count: {}", chunk_count));
+
+        if size == 0 || chunk_count == 0 {
+            return;
+        }
+
+        self.log(format!(
+            "Upload size: {} ({}%)",
+            upload_stat.size,
+            (upload_stat.size * 100) / size
+        ));
+
+        // account for zero chunk, which might be uploaded but never used
+        let client_side_duplicates = if chunk_count < upload_stat.count {
+            0
+        } else {
+            chunk_count - upload_stat.count
+        };
+
+        let server_side_duplicates = upload_stat.duplicates;
+
+        if (client_side_duplicates + server_side_duplicates) > 0 {
+            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
+            self.log(format!(
+                "Duplicates: {}+{} ({}%)",
+                client_side_duplicates, server_side_duplicates, per
+            ));
+        }
+
+        if upload_stat.size > 0 {
+            self.log(format!(
+                "Compression: {}%",
+                (upload_stat.compressed_size * 100) / upload_stat.size
+            ));
+        }
+    }
+
+    pub fn log<S: AsRef<str>>(&self, msg: S) {
+        info!("{}", msg.as_ref());
+    }
+
+    pub fn debug<S: AsRef<str>>(&self, msg: S) {
+        if self.debug {
+            // This is kinda weird, we would like to use tracing::debug! here and automatically
+            // filter it, but self.debug is set from the client-side and the logs are printed on
+            // client and server side. This means that if the client sets the log level to debug,
+            // both server and client need to have 'debug' logs printed.
+            self.log(msg);
+        }
+    }
+
+    /// Raise error if finished flag is not set
+    pub fn ensure_finished(&self) -> Result<(), Error> {
+        let state = self.state.lock().unwrap();
+        if !state.finished {
+            bail!("backup ended but finished flag is not set.");
+        }
+        Ok(())
+    }
+
+    /// Return true if the finished flag is set
+    pub fn finished(&self) -> bool {
+        let state = self.state.lock().unwrap();
+        state.finished
+    }
+}
+
+impl<T: CanWrite + Send + Sync + 'static> BackupEnvironment<T> {
+    /// If verify-new is set on the datastore, this will run a new verify task
+    /// for the backup. If not, this will return and also drop the passed lock
+    /// immediately.
+    pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
+        self.ensure_finished()?;
+
+        if !self.datastore.verify_new() {
+            // no verify requested, do nothing
+            return Ok(());
+        }
+
+        // Downgrade to shared lock, the backup itself is finished
+        drop(excl_snap_lock);
+        let snap_lock = self.backup_dir.lock_shared().with_context(|| {
+            format!(
+                "while trying to verify snapshot '{:?}' after completion",
+                self.backup_dir
+            )
+        })?;
+        let worker_id = format!(
+            "{}:{}/{}/{:08X}",
+            self.datastore.name(),
+            self.backup_dir.backup_type(),
+            self.backup_dir.backup_id(),
+            self.backup_dir.backup_time()
+        );
+
+        let datastore = self.datastore.clone();
+        let backup_dir = self.backup_dir.clone();
+
+        WorkerTask::new_thread(
+            "verify",
+            Some(worker_id),
+            self.auth_id.to_string(),
+            false,
+            move |worker| {
+                worker.log_message("Automatically verifying newly added snapshot");
+
+                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+                if !verify_backup_dir_with_lock(
+                    &verify_worker,
+                    &backup_dir,
+                    worker.upid().clone(),
+                    None,
+                    snap_lock,
+                )? {
+                    bail!("verification failed - please check the log for details");
+                }
+
+                Ok(())
+            },
+        )
+        .map(|_| ())
+    }
+}
+
+impl<T: CanWrite> BackupEnvironment<T> {
     /// Store the writer with an unique ID
     pub fn register_dynamic_writer(
         &self,
-        index: DynamicIndexWriter,
+        index: DynamicIndexWriter<T>,
         name: String,
     ) -> Result<usize, Error> {
         let mut state = self.state.lock().unwrap();
@@ -289,7 +434,7 @@ impl BackupEnvironment {
     /// Store the writer with an unique ID
     pub fn register_fixed_writer(
         &self,
-        index: FixedIndexWriter,
+        index: FixedIndexWriter<T>,
         name: String,
         size: usize,
         chunk_size: u32,
@@ -379,56 +524,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    fn log_upload_stat(
-        &self,
-        archive_name: &str,
-        csum: &[u8; 32],
-        uuid: &[u8; 16],
-        size: u64,
-        chunk_count: u64,
-        upload_stat: &UploadStatistic,
-    ) {
-        self.log(format!("Upload statistics for '{}'", archive_name));
-        self.log(format!("UUID: {}", hex::encode(uuid)));
-        self.log(format!("Checksum: {}", hex::encode(csum)));
-        self.log(format!("Size: {}", size));
-        self.log(format!("Chunk count: {}", chunk_count));
-
-        if size == 0 || chunk_count == 0 {
-            return;
-        }
-
-        self.log(format!(
-            "Upload size: {} ({}%)",
-            upload_stat.size,
-            (upload_stat.size * 100) / size
-        ));
-
-        // account for zero chunk, which might be uploaded but never used
-        let client_side_duplicates = if chunk_count < upload_stat.count {
-            0
-        } else {
-            chunk_count - upload_stat.count
-        };
-
-        let server_side_duplicates = upload_stat.duplicates;
-
-        if (client_side_duplicates + server_side_duplicates) > 0 {
-            let per = (client_side_duplicates + server_side_duplicates) * 100 / chunk_count;
-            self.log(format!(
-                "Duplicates: {}+{} ({}%)",
-                client_side_duplicates, server_side_duplicates, per
-            ));
-        }
-
-        if upload_stat.size > 0 {
-            self.log(format!(
-                "Compression: {}%",
-                (upload_stat.compressed_size * 100) / upload_stat.size
-            ));
-        }
-    }
-
     /// Close dynamic writer
     pub fn dynamic_writer_close(
         &self,
@@ -633,94 +728,6 @@ impl BackupEnvironment {
         Ok(())
     }
 
-    /// If verify-new is set on the datastore, this will run a new verify task
-    /// for the backup. If not, this will return and also drop the passed lock
-    /// immediately.
-    pub fn verify_after_complete(&self, excl_snap_lock: BackupLockGuard) -> Result<(), Error> {
-        self.ensure_finished()?;
-
-        if !self.datastore.verify_new() {
-            // no verify requested, do nothing
-            return Ok(());
-        }
-
-        // Downgrade to shared lock, the backup itself is finished
-        drop(excl_snap_lock);
-        let snap_lock = self.backup_dir.lock_shared().with_context(|| {
-            format!(
-                "while trying to verify snapshot '{:?}' after completion",
-                self.backup_dir
-            )
-        })?;
-        let worker_id = format!(
-            "{}:{}/{}/{:08X}",
-            self.datastore.name(),
-            self.backup_dir.backup_type(),
-            self.backup_dir.backup_id(),
-            self.backup_dir.backup_time()
-        );
-
-        let datastore = self.datastore.clone();
-        let backup_dir = self.backup_dir.clone();
-
-        WorkerTask::new_thread(
-            "verify",
-            Some(worker_id),
-            self.auth_id.to_string(),
-            false,
-            move |worker| {
-                worker.log_message("Automatically verifying newly added snapshot");
-
-                let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
-                if !verify_backup_dir_with_lock(
-                    &verify_worker,
-                    &backup_dir,
-                    worker.upid().clone(),
-                    None,
-                    snap_lock,
-                )? {
-                    bail!("verification failed - please check the log for details");
-                }
-
-                Ok(())
-            },
-        )
-        .map(|_| ())
-    }
-
-    pub fn log<S: AsRef<str>>(&self, msg: S) {
-        info!("{}", msg.as_ref());
-    }
-
-    pub fn debug<S: AsRef<str>>(&self, msg: S) {
-        if self.debug {
-            // This is kinda weird, we would like to use tracing::debug! here and automatically
-            // filter it, but self.debug is set from the client-side and the logs are printed on
-            // client and server side. This means that if the client sets the log level to debug,
-            // both server and client need to have 'debug' logs printed.
-            self.log(msg);
-        }
-    }
-
-    pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
-        self.formatter.format_result(result, self)
-    }
-
-    /// Raise error if finished flag is not set
-    pub fn ensure_finished(&self) -> Result<(), Error> {
-        let state = self.state.lock().unwrap();
-        if !state.finished {
-            bail!("backup ended but finished flag is not set.");
-        }
-        Ok(())
-    }
-
-    /// Return true if the finished flag is set
-    pub fn finished(&self) -> bool {
-        let state = self.state.lock().unwrap();
-        state.finished
-    }
-
     /// Remove complete backup
     pub fn remove_backup(&self) -> Result<(), Error> {
         let mut state = self.state.lock().unwrap();
@@ -736,7 +743,7 @@ impl BackupEnvironment {
     }
 }
 
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -758,14 +765,18 @@ impl RpcEnvironment for BackupEnvironment {
     }
 }
 
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: Send + Sync + 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &BackupEnvironment {
-        self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: Send + Sync + 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &BackupEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<BackupEnvironment<T>>()
+            .unwrap()
    }
 }
-- 
2.39.5
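Aside (illustration, not part of the commit): the last two hunks generalize a
downcast pattern, recovering a concrete generic environment from a trait
object via `Any`. That is why the `AsRef` impls need the type parameter to be
`'static`. `RpcEnv` and `BackupEnv` below are hypothetical stand-ins, not the
proxmox-rest-server API:

use std::any::Any;

// Hypothetical stand-in for an RpcEnvironment-style trait with an Any accessor.
trait RpcEnv: Any {
    fn as_any(&self) -> &dyn Any;
}

// Hypothetical stand-in for BackupEnvironment<T>.
struct BackupEnv<T> {
    label: String,
    _marker: std::marker::PhantomData<T>,
}

impl<T: 'static> RpcEnv for BackupEnv<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Any is only implemented for 'static types, so the downcast target
// (and therefore T) must be 'static -- mirroring the bounds on the
// AsRef impls above.
impl<T: 'static> AsRef<BackupEnv<T>> for dyn RpcEnv {
    fn as_ref(&self) -> &BackupEnv<T> {
        self.as_any().downcast_ref::<BackupEnv<T>>().unwrap()
    }
}

struct Write; // capability marker, as elsewhere in the series

fn main() {
    let env = BackupEnv::<Write> {
        label: "backup env".to_string(),
        _marker: std::marker::PhantomData,
    };

    // API handlers see only the trait object...
    let dyn_env: &dyn RpcEnv = &env;

    // ...and recover the concrete generic type through the AsRef impl.
    let concrete: &BackupEnv<Write> = dyn_env.as_ref();
    println!("{}", concrete.label);
}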