From mboxrd@z Thu Jan  1 00:00:00 1970
From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Mon, 26 May 2025 16:14:41 +0200
Message-Id: <20250526141445.228717-9-h.laimer@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250526141445.228717-1-h.laimer@proxmox.com>
References: <20250526141445.228717-1-h.laimer@proxmox.com>
MIME-Version: 1.0
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Subject: [pbs-devel] [PATCH proxmox-backup v2 08/12] api/backup/bin/server/tape: add missing generics

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
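Note for reviewers: the bounds added below all follow the marker-type
("typestate") scheme that the earlier patches of this series introduce for
ChunkStore/DataStore. The following is only an illustrative sketch of that
pattern -- the names are taken from the imports used in this patch
(CanRead/CanWrite and the Read/Write/Lookup markers), but the real
definitions live in pbs-datastore's chunk_store module and may differ in
detail:

    use std::marker::PhantomData;

    // Marker types describing how a (chunk) store handle may be used.
    pub struct Read;
    pub struct Write;
    pub struct Lookup;

    // Capability traits implemented by the markers.
    pub trait CanRead {}
    pub trait CanWrite {}

    impl CanRead for Read {}
    impl CanRead for Write {}
    impl CanWrite for Write {}

    // A datastore handle carries its access mode as a type parameter.
    pub struct DataStore<T> {
        _mode: PhantomData<T>,
    }

    impl<T: CanRead> DataStore<T> {
        // Callable on any readable handle (Read or Write).
        pub fn list_snapshots(&self) -> Vec<String> {
            Vec::new()
        }
    }

    impl<T: CanWrite> DataStore<T> {
        // Callable on writable handles only; DataStore<Read> and
        // DataStore<Lookup> simply do not have this method.
        pub fn insert_chunk(&self, _chunk: &[u8]) {}
    }

Read-only code paths can then be generic over any T: CanRead, while code
that inserts or renames chunks needs T: CanWrite -- which is exactly the
kind of bound this patch threads through the signatures below.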

 src/api2/admin/datastore.rs                 | 27 ++++----
 src/api2/backup/mod.rs                      | 21 +++---
 src/api2/backup/upload_chunk.rs             | 19 +++---
 src/api2/config/datastore.rs                |  5 +-
 src/api2/reader/environment.rs              | 30 +++++----
 src/api2/reader/mod.rs                      |  5 +-
 src/api2/tape/backup.rs                     | 11 ++--
 src/api2/tape/drive.rs                      |  3 +-
 src/api2/tape/restore.rs                    | 71 +++++++++++----------
 src/backup/hierarchy.rs                     | 23 +++----
 src/backup/verify.rs                        | 53 +++++++--------
 src/bin/proxmox-backup-proxy.rs             |  7 +-
 src/server/gc_job.rs                        |  7 +-
 src/server/prune_job.rs                     |  5 +-
 src/server/pull.rs                          | 23 +++----
 src/server/push.rs                          |  3 +-
 src/server/sync.rs                          | 13 ++--
 src/tape/file_formats/snapshot_archive.rs   |  5 +-
 src/tape/pool_writer/mod.rs                 | 11 ++--
 src/tape/pool_writer/new_chunks_iterator.rs |  7 +-
 20 files changed, 189 insertions(+), 160 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 39249448..e3f93cdd 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -54,6 +54,7 @@ use pbs_config::CachedUserInfo;
 use pbs_datastore::backup_info::BackupInfo;
 use pbs_datastore::cached_chunk_reader::CachedChunkReader;
 use pbs_datastore::catalog::{ArchiveEntry, CatalogReader};
+use pbs_datastore::chunk_store::{CanRead, Read as R};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::data_blob_reader::DataBlobReader;
 use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
@@ -79,8 +80,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
 
 const GROUP_NOTES_FILE_NAME: &str = "notes";
 
-fn get_group_note_path(
-    store: &DataStore,
+fn get_group_note_path<T: CanRead>(
+    store: &DataStore<T>,
     ns: &BackupNamespace,
     group: &pbs_api_types::BackupGroup,
 ) -> PathBuf {
@@ -114,8 +115,8 @@ fn check_privs_and_load_store(
     Ok(datastore)
 }
 
-fn read_backup_index(
-    backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+    backup_dir: &BackupDir<T>,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
     let (manifest, index_size) = backup_dir.load_manifest()?;
@@ -140,8 +141,8 @@ fn read_backup_index(
     Ok((manifest, result))
 }
 
-fn get_all_snapshot_files(
-    info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+    info: &BackupInfo<T>,
 ) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
     let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
@@ -529,7 +530,7 @@ unsafe fn list_snapshots_blocking(
         (None, None) => datastore.list_backup_groups(ns.clone())?,
     };
 
-    let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+    let info_to_snapshot_list_item = |group: &BackupGroup<R>, owner, info: BackupInfo<R>| {
         let backup = pbs_api_types::BackupDir {
             group: group.into(),
             time: info.backup_dir.backup_time(),
@@ -629,8 +630,8 @@ unsafe fn list_snapshots_blocking(
     })
 }
 
-async fn get_snapshots_count(
-    store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+    store: &Arc<DataStore<T>>,
     owner: Option<&Authid>,
 ) -> Result<Counts, Error> {
     let store = Arc::clone(store);
@@ -1796,12 +1797,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
     &Permission::Anybody,
 );
 
-fn get_local_pxar_reader(
-    datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+    datastore: Arc<DataStore<T>>,
     manifest: &BackupManifest,
-    backup_dir: &BackupDir,
+    backup_dir: &BackupDir<T>,
     pxar_name: &BackupArchiveName,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
     let mut path = datastore.base_path();
     path.push(backup_dir.relative_path());
     path.push(pxar_name.as_ref());
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index 629df933..79354dbf 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
     BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
 };
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
 use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
@@ -279,7 +280,7 @@ fn upgrade_to_backup_protocol(
                 return Ok(());
             }
 
-            let verify = |env: BackupEnvironment| {
+            let verify = |env: BackupEnvironment<W>| {
                 if let Err(err) = env.verify_after_complete(snap_guard) {
                     env.log(format!(
                         "backup finished, but starting the requested verify task failed: {}",
@@ -400,7 +401,7 @@ fn create_dynamic_index(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let name = required_string_param(&param, "archive-name")?.to_owned();
@@ -450,7 +451,7 @@ fn create_fixed_index(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let name = required_string_param(&param, "archive-name")?.to_owned();
    let size = required_integer_param(&param, "size")? as usize;
@@ -565,7 +566,7 @@ fn dynamic_append(
         );
     }
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.debug(format!("dynamic_append {} chunks", digest_list.len()));
@@ -639,7 +640,7 @@ fn fixed_append(
         );
     }
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.debug(format!("fixed_append {} chunks", digest_list.len()));
@@ -714,7 +715,7 @@ fn close_dynamic_index(
     let csum_str = required_string_param(&param, "csum")?;
     let csum = <[u8; 32]>::from_hex(csum_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.dynamic_writer_close(wid, chunk_count, size, csum)?;
@@ -767,7 +768,7 @@ fn close_fixed_index(
     let csum_str = required_string_param(&param, "csum")?;
     let csum = <[u8; 32]>::from_hex(csum_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.fixed_writer_close(wid, chunk_count, size, csum)?;
@@ -781,7 +782,7 @@ fn finish_backup(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     env.finish_backup()?;
     env.log("successfully finished backup");
@@ -800,7 +801,7 @@ fn get_previous_backup_time(
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<Value, Error> {
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let backup_time = env
         .last_backup
@@ -827,7 +828,7 @@ fn download_previous(
     rpcenv: Box<dyn RpcEnvironment>,
 ) -> ApiResponseFuture {
     async move {
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
         let archive_name = required_string_param(&param, "archive-name")?.to_owned();
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..bb1566ae 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -14,25 +14,26 @@ use proxmox_schema::*;
 use proxmox_sortable_macro::sortable;
 
 use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
+use pbs_datastore::chunk_store::{CanWrite, Lookup as L, Write as W};
 use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
 use pbs_datastore::{DataBlob, DataStore};
 use pbs_tools::json::{required_integer_param, required_string_param};
 
 use super::environment::*;
 
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
     stream: Body,
-    store: Arc<DataStore>,
+    store: Arc<DataStore<T>>,
     digest: [u8; 32],
     size: u32,
     encoded_size: u32,
     raw_data: Option<Vec<u8>>,
 }
 
-impl UploadChunk {
+impl<T: CanWrite> UploadChunk<T> {
     pub fn new(
         stream: Body,
-        store: Arc<DataStore>,
+        store: Arc<DataStore<T>>,
         digest: [u8; 32],
         size: u32,
         encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
     }
 }
 
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
     type Output = Result<([u8; 32], u32, u32, bool), Error>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
     let digest_str = required_string_param(&param, "digest")?;
     let digest = <[u8; 32]>::from_hex(digest_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let (digest, size, compressed_size, is_duplicate) =
         UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
     let digest_str = required_string_param(&param, "digest")?;
     let digest = <[u8; 32]>::from_hex(digest_str)?;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     let (digest, size, compressed_size, is_duplicate) =
         UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
                 println!("Upload error: {}", err);
             }
         }
-        let env: &BackupEnvironment = rpcenv.as_ref();
+        let env: &BackupEnvironment<W> = rpcenv.as_ref();
         Ok(env.format_response(Ok(Value::Null)))
     }
     .boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
     let file_name = required_string_param(&param, "file-name")?.to_owned();
     let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
 
-    let env: &BackupEnvironment = rpcenv.as_ref();
+    let env: &BackupEnvironment<W> = rpcenv.as_ref();
 
     if !file_name.ends_with(".blob") {
         bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index b133be70..52fa6db1 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -30,6 +30,7 @@ use crate::api2::config::tape_backup_job::{delete_tape_backup_job, list_tape_bac
 use crate::api2::config::verify::delete_verification_job;
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
 use pbs_datastore::get_datastore_mount_status;
 use proxmox_rest_server::WorkerTask;
 
@@ -124,7 +125,7 @@ pub(crate) fn do_create_datastore(
     };
 
     let chunk_store = if reuse_datastore {
-        ChunkStore::verify_chunkstore(&path).and_then(|_| {
+        ChunkStore::<R>::verify_chunkstore(&path).and_then(|_| {
             // Must be the only instance accessing and locking the chunk store,
             // dropping will close all other locks from this process on the lockfile as well.
             ChunkStore::open(
@@ -666,7 +667,7 @@ pub async fn delete_datastore(
         auth_id.to_string(),
         to_stdout,
         move |_worker| {
-            pbs_datastore::DataStore::destroy(&name, destroy_data)?;
+            pbs_datastore::DataStore::<W>::destroy(&name, destroy_data)?;
 
             // ignore errors
             let _ = jobstate::remove_state_file("prune", &name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..26f5bec6 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -14,25 +14,25 @@ use tracing::info;
 
 /// `RpcEnvironment` implementation for backup reader service
 #[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
     env_type: RpcEnvironmentType,
     result_attributes: Value,
     auth_id: Authid,
     pub debug: bool,
     pub formatter: &'static dyn OutputFormatter,
     pub worker: Arc<WorkerTask>,
-    pub datastore: Arc<DataStore>,
-    pub backup_dir: BackupDir,
+    pub datastore: Arc<DataStore<T>>,
+    pub backup_dir: BackupDir<T>,
     allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
 }
 
-impl ReaderEnvironment {
+impl<T: CanRead> ReaderEnvironment<T> {
     pub fn new(
         env_type: RpcEnvironmentType,
         auth_id: Authid,
         worker: Arc<WorkerTask>,
-        datastore: Arc<DataStore>,
-        backup_dir: BackupDir,
+        datastore: Arc<DataStore<T>>,
+        backup_dir: BackupDir<T>,
     ) -> Self {
         Self {
             result_attributes: json!({}),
@@ -71,7 +71,7 @@ impl ReaderEnvironment {
     }
 }
 
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: CanRead + Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
     fn result_attrib_mut(&mut self) -> &mut Value {
         &mut self.result_attributes
     }
@@ -93,14 +93,18 @@ impl RpcEnvironment for ReaderEnvironment {
     }
 }
 
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
-    fn as_ref(&self) -> &ReaderEnvironment {
-        self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: CanRead + Send + Sync + 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+    fn as_ref(&self) -> &ReaderEnvironment<T> {
+        self.as_any()
+            .downcast_ref::<ReaderEnvironment<T>>()
+            .unwrap()
     }
 }
 
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
-    fn as_ref(&self) -> &ReaderEnvironment {
-        self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: CanRead + Send + Sync + 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+    fn as_ref(&self) -> &ReaderEnvironment<T> {
+        self.as_any()
.downcast_ref::>() + .unwrap() } } diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs index cc791299..52f0953a 100644 --- a/src/api2/reader/mod.rs +++ b/src/api2/reader/mod.rs @@ -23,6 +23,7 @@ use pbs_api_types::{ DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, }; use pbs_config::CachedUserInfo; +use pbs_datastore::chunk_store::Read as R; use pbs_datastore::index::IndexFile; use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1}; use pbs_tools::json::required_string_param; @@ -247,7 +248,7 @@ fn download_file( rpcenv: Box, ) -> ApiResponseFuture { async move { - let env: &ReaderEnvironment = rpcenv.as_ref(); + let env: &ReaderEnvironment = rpcenv.as_ref(); let file_name = required_string_param(¶m, "file-name")?.to_owned(); @@ -303,7 +304,7 @@ fn download_chunk( rpcenv: Box, ) -> ApiResponseFuture { async move { - let env: &ReaderEnvironment = rpcenv.as_ref(); + let env: &ReaderEnvironment = rpcenv.as_ref(); let digest_str = required_string_param(¶m, "digest")?; let digest = <[u8; 32]>::from_hex(digest_str)?; diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs index 31293a9a..306d5936 100644 --- a/src/api2/tape/backup.rs +++ b/src/api2/tape/backup.rs @@ -18,6 +18,7 @@ use pbs_api_types::{ use pbs_config::CachedUserInfo; use pbs_datastore::backup_info::{BackupDir, BackupInfo}; +use pbs_datastore::chunk_store::CanRead; use pbs_datastore::{DataStore, StoreProgress}; use crate::tape::TapeNotificationMode; @@ -360,9 +361,9 @@ enum SnapshotBackupResult { Ignored, } -fn backup_worker( +fn backup_worker( worker: &WorkerTask, - datastore: Arc, + datastore: Arc>, pool_config: &MediaPoolConfig, setup: &TapeBackupJobSetup, summary: &mut TapeBackupJobSummary, @@ -564,11 +565,11 @@ fn update_media_online_status(drive: &str) -> Result, Error> { } } -fn backup_snapshot( +fn backup_snapshot( worker: &WorkerTask, pool_writer: &mut PoolWriter, - datastore: Arc, - snapshot: BackupDir, + datastore: Arc>, + snapshot: BackupDir, ) -> Result { let snapshot_path = snapshot.relative_path(); info!("backup snapshot {snapshot_path:?}"); diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs index ba9051de..47fa06dc 100644 --- a/src/api2/tape/drive.rs +++ b/src/api2/tape/drive.rs @@ -24,6 +24,7 @@ use pbs_api_types::{ use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE}; use pbs_config::CachedUserInfo; +use pbs_datastore::chunk_store::Write as W; use pbs_tape::{ linux_list_drives::{lookup_device_identification, lto_tape_device_list, open_lto_tape_device}, sg_tape::tape_alert_flags_critical, @@ -1342,7 +1343,7 @@ pub fn catalog_media( drive.read_label()?; // skip over labels - we already read them above let mut checked_chunks = HashMap::new(); - restore_media( + restore_media::( worker, &mut drive, &media_id, diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs index 2cc1baab..8f089c20 100644 --- a/src/api2/tape/restore.rs +++ b/src/api2/tape/restore.rs @@ -27,6 +27,7 @@ use pbs_api_types::{ }; use pbs_client::pxar::tools::handle_root_with_optional_format_version_prelude; use pbs_config::CachedUserInfo; +use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as W}; use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::fixed_index::FixedIndexReader; use pbs_datastore::index::IndexFile; @@ -120,13 +121,13 @@ impl NamespaceMap { } } -pub struct DataStoreMap { - map: HashMap>, - default: Option>, +pub struct DataStoreMap { + map: HashMap>>, + default: Option>>, ns_map: Option, } -impl TryFrom for DataStoreMap { +impl 
-impl TryFrom<String> for DataStoreMap {
+impl<T: CanWrite + Send + Sync + 'static> TryFrom<String> for DataStoreMap<T> {
     type Error = Error;
 
     fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
     }
 }
 
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
     fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
         let count = mappings.len();
         let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,10 @@ impl DataStoreMap {
         Ok(count > 0)
     }
 
-    fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+    #[allow(clippy::type_complexity)]
+    fn used_datastores(
+        &self,
+    ) -> HashMap<&str, (Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
         let mut map = HashMap::new();
         for (source, target) in self.map.iter() {
             let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,18 +193,19 @@ impl DataStoreMap {
         .map(|mapping| mapping.get_namespaces(datastore, ns))
     }
 
-    fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+    fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
         self.map
             .get(source_datastore)
             .or(self.default.as_ref())
             .map(Arc::clone)
     }
 
+    #[allow(clippy::type_complexity)]
     fn get_targets(
         &self,
         source_datastore: &str,
         source_ns: &BackupNamespace,
-    ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+    ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
         self.target_store(source_datastore)
             .map(|store| (store, self.target_ns(source_datastore, source_ns)))
     }
@@ -237,9 +242,9 @@ fn check_datastore_privs(
     Ok(())
 }
 
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
     user_info: &CachedUserInfo,
-    store: &Arc<DataStore>,
+    store: &Arc<DataStore<T>>,
     ns: &BackupNamespace,
     auth_id: &Authid,
     owner: Option<&Authid>,
@@ -449,13 +454,13 @@ pub fn restore(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     inventory: Inventory,
     media_set_uuid: Uuid,
     drive_config: SectionConfigData,
     drive_name: &str,
-    store_map: DataStoreMap,
+    store_map: DataStoreMap<T>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
     auth_id: &Authid,
@@ -529,8 +534,8 @@ fn restore_full_worker(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
-    store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+    store_map: &DataStoreMap<T>,
     store: &str,
     snapshot: &str,
     ns: &BackupNamespace,
@@ -618,14 +623,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     snapshots: Vec<String>,
     inventory: Inventory,
     media_set_uuid: Uuid,
     drive_config: SectionConfigData,
     drive_name: &str,
-    store_map: DataStoreMap,
+    store_map: DataStoreMap<T>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
     user_info: Arc<CachedUserInfo>,
@@ -955,16 +960,16 @@ fn get_media_set_catalog(
     Ok(catalog)
 }
 
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
     let mut path = datastore.base_path();
     path.push(".tmp");
     path.push(media_set_uuid.to_string());
     path
 }
 
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
     source_datastore: &str,
-    datastore: &DataStore,
+    datastore: &DataStore<T>,
     snapshot: &str,
     media_set_uuid: &Uuid,
 ) -> PathBuf {
@@ -974,9 +979,9 @@ fn snapshot_tmpdir(
     path
 }
 
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
     worker: Arc<WorkerTask>,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     file_list: &[u64],
     mut drive: Box<dyn TapeDriver>,
     media_id: &MediaId,
@@ -1083,10 +1088,10 @@ fn restore_snapshots_to_tmpdir(
     Ok(tmp_paths)
 }
 
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
 ) -> Result<(), Error> {
     for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1138,10 @@ fn restore_file_chunk_map(
     Ok(())
 }
 
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     chunk_list: &mut HashSet<[u8; 32]>,
 ) -> Result<usize, Error> {
     let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1200,12 @@ fn restore_partial_chunk_archive<'a>(
 
 /// Request and restore complete media without using existing catalog (create catalog instead)
 #[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     media_id: &MediaId,
     drive_config: &SectionConfigData,
     drive_name: &str,
-    store_map: &DataStoreMap,
+    store_map: &DataStoreMap<T>,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     restore_owner: &Authid,
     notification_mode: &TapeNotificationMode,
@@ -1253,11 +1258,11 @@ pub fn request_and_restore_media(
 /// Restore complete media content and catalog
 ///
 /// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     drive: &mut Box<dyn TapeDriver>,
     media_id: &MediaId,
-    target: Option<(&DataStoreMap, &Authid)>,
+    target: Option<(&DataStoreMap<T>, &Authid)>,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     verbose: bool,
     auth_id: &Authid,
@@ -1301,11 +1306,11 @@ pub fn restore_media(
 }
 
 #[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     mut reader: Box<dyn 'a + TapeRead>,
     current_file_number: u64,
-    target: Option<(&DataStoreMap, &Authid)>,
+    target: Option<(&DataStoreMap<T>, &Authid)>,
     catalog: &mut MediaCatalog,
     checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
     verbose: bool,
@@ -1525,10 +1530,10 @@ fn scan_chunk_archive<'a>(
     Ok(Some(chunks))
 }
 
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
     worker: Arc<WorkerTask>,
     reader: Box<dyn 'a + TapeRead>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     checked_chunks: &mut HashSet<[u8; 32]>,
     verbose: bool,
 ) -> Result<Option<Vec<[u8; 32]>>, Error> {
diff --git a/src/backup/hierarchy.rs b/src/backup/hierarchy.rs
index 8dd71fcf..039e32a6 100644
--- a/src/backup/hierarchy.rs
+++ b/src/backup/hierarchy.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ,
 };
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{backup_info::BackupGroup, DataStore, ListGroups, ListNamespacesRecursive};
 
 /// Asserts that `privs` are fulfilled on datastore + (optional) namespace.
@@ -68,8 +69,8 @@ pub fn check_ns_privs_full(
     );
 }
 
-pub fn can_access_any_namespace(
-    store: Arc<DataStore>,
+pub fn can_access_any_namespace<T: CanRead>(
+    store: Arc<DataStore<T>>,
     auth_id: &Authid,
     user_info: &CachedUserInfo,
 ) -> bool {
@@ -95,8 +96,8 @@ pub fn can_access_any_namespace(
 ///
 /// Is basically just a filter-iter for pbs_datastore::ListNamespacesRecursive including access and
 /// optional owner checks.
-pub struct ListAccessibleBackupGroups<'a> {
-    store: &'a Arc<DataStore>,
+pub struct ListAccessibleBackupGroups<'a, T> {
+    store: &'a Arc<DataStore<T>>,
     auth_id: Option<&'a Authid>,
     user_info: Arc<CachedUserInfo>,
     /// The priv on NS level that allows auth_id trump the owner check
@@ -104,15 +105,15 @@ pub struct ListAccessibleBackupGroups<'a> {
     /// The priv that auth_id is required to have on NS level additionally to being owner
     owner_and_priv: u64,
     /// Contains the intertnal state, group iter and a bool flag for override_owner_priv
-    state: Option<(ListGroups, bool)>,
-    ns_iter: ListNamespacesRecursive,
+    state: Option<(ListGroups<T>, bool)>,
+    ns_iter: ListNamespacesRecursive<T>,
 }
 
-impl<'a> ListAccessibleBackupGroups<'a> {
+impl<'a, T: CanRead> ListAccessibleBackupGroups<'a, T> {
     // TODO: builder pattern
 
     pub fn new_owned(
-        store: &'a Arc<DataStore>,
+        store: &'a Arc<DataStore<T>>,
         ns: BackupNamespace,
         max_depth: usize,
         auth_id: Option<&'a Authid>,
@@ -122,7 +123,7 @@ impl<'a> ListAccessibleBackupGroups<'a> {
     }
 
     pub fn new_with_privs(
-        store: &'a Arc<DataStore>,
+        store: &'a Arc<DataStore<T>>,
         ns: BackupNamespace,
         max_depth: usize,
         override_owner_priv: Option<u64>,
@@ -145,8 +146,8 @@ impl<'a> ListAccessibleBackupGroups<'a> {
 pub static NS_PRIVS_OK: u64 =
     PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT;
 
-impl Iterator for ListAccessibleBackupGroups<'_> {
-    type Item = Result<BackupGroup, Error>;
+impl<T: CanRead> Iterator for ListAccessibleBackupGroups<'_, T> {
+    type Item = Result<BackupGroup<T>, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
         loop {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 3d2cba8a..15c2e9e4 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -15,6 +15,7 @@ use pbs_api_types::{
     UPID,
 };
 use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
+use pbs_datastore::chunk_store::{CanRead, CanWrite};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::{DataBlob, DataStore, StoreProgress};
@@ -25,16 +26,16 @@ use crate::backup::hierarchy::ListAccessibleBackupGroups;
 
 /// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
 /// already been verified or detected as corrupt.
-pub struct VerifyWorker {
+pub struct VerifyWorker<T> {
     worker: Arc<dyn WorkerTaskContext>,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 }
 
-impl VerifyWorker {
+impl<T: CanWrite + Send + Sync + 'static> VerifyWorker<T> {
     /// Creates a new VerifyWorker for a given task worker and datastore.
-    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+    pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore<T>>) -> Self {
         Self {
             worker,
             datastore,
@@ -46,7 +47,7 @@ impl VerifyWorker {
     }
 }
 
-fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob<T: CanRead>(backup_dir: &BackupDir<T>, info: &FileInfo) -> Result<(), Error> {
     let blob = backup_dir.load_blob(&info.filename)?;
 
     let raw_size = blob.raw_size();
@@ -70,7 +71,7 @@ fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
     }
 }
 
-fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
+fn rename_corrupted_chunk<T: CanWrite>(datastore: Arc<DataStore<T>>, digest: &[u8; 32]) {
     let (path, digest_str) = datastore.chunk_path(digest);
 
     let mut counter = 0;
@@ -97,8 +98,8 @@ fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
     };
 }
 
-fn verify_index_chunks(
-    verify_worker: &VerifyWorker,
+fn verify_index_chunks<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
     index: Box<dyn IndexFile + Send>,
     crypt_mode: CryptMode,
 ) -> Result<(), Error> {
@@ -238,9 +239,9 @@ fn verify_index_chunks(
     Ok(())
 }
 
-fn verify_fixed_index(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+fn verify_fixed_index<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     info: &FileInfo,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -260,9 +261,9 @@ fn verify_fixed_index(
     verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
 }
 
-fn verify_dynamic_index(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+fn verify_dynamic_index<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     info: &FileInfo,
 ) -> Result<(), Error> {
     let mut path = backup_dir.relative_path();
@@ -291,9 +292,9 @@ fn verify_dynamic_index(
 /// - Ok(true) if verify is successful
 /// - Ok(false) if there were verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_dir(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+pub fn verify_backup_dir<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
 ) -> Result<bool, Error> {
@@ -325,9 +326,9 @@ pub fn verify_backup_dir(
 }
 
 /// See verify_backup_dir
-pub fn verify_backup_dir_with_lock(
-    verify_worker: &VerifyWorker,
-    backup_dir: &BackupDir,
+pub fn verify_backup_dir_with_lock<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    backup_dir: &BackupDir<T>,
     upid: UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
     _snap_lock: BackupLockGuard,
@@ -403,9 +404,9 @@ pub fn verify_backup_dir_with_lock(
 /// Returns
 /// - Ok((count, failed_dirs)) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_backup_group(
-    verify_worker: &VerifyWorker,
-    group: &BackupGroup,
+pub fn verify_backup_group<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
+    group: &BackupGroup<T>,
     progress: &mut StoreProgress,
     upid: &UPID,
     filter: Option<&dyn Fn(&BackupManifest) -> bool>,
@@ -455,8 +456,8 @@ pub fn verify_backup_group(
 /// Returns
 /// - Ok(failed_dirs) where failed_dirs had verification errors
 /// - Err(_) if task was aborted
-pub fn verify_all_backups(
-    verify_worker: &VerifyWorker,
+pub fn verify_all_backups<T: CanWrite + Send + Sync + 'static>(
+    verify_worker: &VerifyWorker<T>,
     upid: &UPID,
     ns: BackupNamespace,
     max_depth: Option<usize>,
@@ -504,7 +505,7 @@ pub fn verify_all_backups(
             .filter(|group| {
                 !(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
             })
-            .collect::<Vec<BackupGroup>>(),
+            .collect::<Vec<BackupGroup<T>>>(),
         Err(err) => {
             info!("unable to list backups: {err}");
             return Ok(errors);
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1d4cf37c..bda2f17b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -20,7 +20,8 @@ use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
 use proxmox_sys::fs::CreateOptions;
 use proxmox_sys::logrotate::LogRotate;
 
-use pbs_datastore::DataStore;
+use pbs_datastore::chunk_store::Lookup as L;
+use pbs_datastore::{is_garbage_collection_running, DataStore};
 
 use proxmox_rest_server::{
     cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
@@ -265,7 +266,7 @@ async fn run() -> Result<(), Error> {
     // to remove references for not configured datastores
     command_sock.register_command("datastore-removed".to_string(), |_value| {
-        if let Err(err) = DataStore::remove_unused_datastores() {
+        if let Err(err) = DataStore::<L>::remove_unused_datastores() {
             log::error!("could not refresh datastores: {err}");
         }
         Ok(Value::Null)
@@ -274,7 +275,7 @@ async fn run() -> Result<(), Error> {
     // clear cache entry for datastore that is in a specific maintenance mode
     command_sock.register_command("update-datastore-cache".to_string(), |value| {
         if let Some(name) = value.and_then(Value::as_str) {
-            if let Err(err) = DataStore::update_datastore_cache(name) {
+            if let Err(err) = DataStore::<L>::update_datastore_cache(name) {
                 log::error!("could not trigger update datastore cache: {err}");
             }
         }
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 64835028..c2af6c67 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -4,15 +4,18 @@ use std::sync::Arc;
 use tracing::info;
 
 use pbs_api_types::Authid;
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::DataStore;
 use proxmox_rest_server::WorkerTask;
 
 use crate::server::{jobstate::Job, send_gc_status};
 
 /// Runs a garbage collection job.
-pub fn do_garbage_collection_job(
+pub fn do_garbage_collection_job<
+    T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static,
+>(
     mut job: Job,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     auth_id: &Authid,
     schedule: Option<String>,
     to_stdout: bool,
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 1c86647a..395aaee4 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
     print_store_and_ns, Authid, KeepOptions, Operation, PruneJobOptions, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
 };
+use pbs_datastore::chunk_store::CanWrite;
 use pbs_datastore::prune::compute_prune_info;
 use pbs_datastore::DataStore;
 use proxmox_rest_server::WorkerTask;
@@ -14,10 +15,10 @@ use proxmox_rest_server::WorkerTask;
 use crate::backup::ListAccessibleBackupGroups;
 use crate::server::jobstate::Job;
 
-pub fn prune_datastore(
+pub fn prune_datastore<T: CanWrite + Send + Sync + 'static>(
     auth_id: Authid,
     prune_options: PruneJobOptions,
-    datastore: Arc<DataStore>,
+    datastore: Arc<DataStore<T>>,
     dry_run: bool,
 ) -> Result<(), Error> {
     let store = &datastore.name();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c14..573aa805 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
 };
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanWrite, Write as W};
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -34,8 +35,8 @@ use super::sync::{
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
 use crate::tools::parallel_handler::ParallelHandler;
 
-pub(crate) struct PullTarget {
-    store: Arc<DataStore>,
+pub(crate) struct PullTarget<T> {
+    store: Arc<DataStore<T>>,
     ns: BackupNamespace,
 }
 
@@ -44,7 +45,7 @@ pub(crate) struct PullParameters {
     /// Where data is pulled from
     source: Arc<dyn SyncSource>,
     /// Where data should be pulled into
-    target: PullTarget,
+    target: PullTarget<W>,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -135,9 +136,9 @@ impl PullParameters {
     }
 }
 
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<T: CanWrite + Send + Sync + 'static, I: IndexFile>(
     chunk_reader: Arc<dyn AsyncReadChunk>,
-    target: Arc<DataStore>,
+    target: Arc<DataStore<T>>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<PullStats, Error> {
@@ -260,9 +261,9 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// -- Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive<'a>(
+async fn pull_single_archive<'a, T: CanWrite + Send + Sync + 'static>(
     reader: Arc<dyn SyncSourceReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<PullStats, Error> {
@@ -343,10 +344,10 @@ async fn pull_single_archive<'a>(
 /// -- if file already exists, verify contents
 /// -- if not, pull it from the remote
 /// - Download log if not already existing
-async fn pull_snapshot<'a>(
+async fn pull_snapshot<'a, T: CanWrite + Send + Sync + 'static>(
     params: &PullParameters,
     reader: Arc<dyn SyncSourceReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt: bool,
     is_new: bool,
@@ -482,10 +483,10 @@ async fn pull_snapshot<'a>(
 ///
 /// The `reader` is configured to read from the source backup directory, while the
 /// `snapshot` is pointing to the local datastore and target namespace.
-async fn pull_snapshot_from<'a>(
+async fn pull_snapshot_from<'a, T: CanWrite + Send + Sync + 'static>(
     params: &PullParameters,
     reader: Arc<dyn SyncSourceReader + 'a>,
-    snapshot: &'a pbs_datastore::BackupDir,
+    snapshot: &'a pbs_datastore::BackupDir<T>,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
     corrupt: bool,
 ) -> Result<SyncStats, Error> {
diff --git a/src/server/push.rs b/src/server/push.rs
index e71012ed..ff9d9358 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
 };
 use pbs_client::{BackupRepository, BackupWriter, HttpClient, MergedChunkInfo, UploadOptions};
 use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
 use pbs_datastore::data_blob::ChunkInfo;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -61,7 +62,7 @@ impl PushTarget {
 /// Parameters for a push operation
 pub(crate) struct PushParameters {
     /// Source of backups to be pushed to remote
-    source: Arc<DataStore>,
+    source: Arc<DataStore<R>>,
     /// Target for backups to be pushed to
     target: PushTarget,
     /// User used for permission checks on the source side, including potentially filtering visible
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 09814ef0..96a73503 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{BackupManifest, DataStore, ListNamespacesRecursive, LocalChunkReader};
@@ -105,10 +106,10 @@ pub(crate) struct RemoteSourceReader {
     pub(crate) dir: BackupDir,
 }
 
-pub(crate) struct LocalSourceReader {
+pub(crate) struct LocalSourceReader<T> {
     pub(crate) _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
     pub(crate) path: PathBuf,
-    pub(crate) datastore: Arc<DataStore>,
+    pub(crate) datastore: Arc<DataStore<T>>,
 }
 
@@ -189,7 +190,7 @@ impl SyncSourceReader for RemoteSourceReader {
 }
 
 #[async_trait::async_trait]
-impl SyncSourceReader for LocalSourceReader {
+impl<T: CanRead + Send + Sync + 'static> SyncSourceReader for LocalSourceReader<T> {
     fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
         Arc::new(LocalChunkReader::new(
             self.datastore.clone(),
@@ -266,8 +267,8 @@ pub(crate) struct RemoteSource {
     pub(crate) client: HttpClient,
 }
 
-pub(crate) struct LocalSource {
-    pub(crate) store: Arc<DataStore>,
+pub(crate) struct LocalSource<T> {
+    pub(crate) store: Arc<DataStore<T>>,
     pub(crate) ns: BackupNamespace,
 }
 
@@ -415,7 +416,7 @@ impl SyncSource for RemoteSource {
 }
 
 #[async_trait::async_trait]
-impl SyncSource for LocalSource {
+impl<T: CanRead + Send + Sync + 'static> SyncSource for LocalSource<T> {
     async fn list_namespaces(
         &self,
         max_depth: &mut Option<usize>,
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index 9d11c04b..7f4ef01f 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -5,6 +5,7 @@ use std::task::{Context, Poll};
 use proxmox_sys::error::SysError;
 use proxmox_uuid::Uuid;
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::SnapshotReader;
 use pbs_tape::{MediaContentHeader, TapeWrite, PROXMOX_TAPE_BLOCK_SIZE};
 
@@ -21,9 +22,9 @@ use crate::tape::file_formats::{
 /// `LEOM` was detected before all data was written. The stream is
 /// marked inclomplete in that case and does not contain all data (The
 /// backup task must rewrite the whole file on the next media).
-pub fn tape_write_snapshot_archive<'a>(
+pub fn tape_write_snapshot_archive<'a, T: CanRead>(
     writer: &mut (dyn TapeWrite + 'a),
-    snapshot_reader: &SnapshotReader,
+    snapshot_reader: &SnapshotReader<T>,
 ) -> Result<Option<Uuid>, std::io::Error> {
     let backup_dir = snapshot_reader.snapshot();
     let snapshot =
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 54084421..17c20add 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -15,6 +15,7 @@ use tracing::{info, warn};
 
 use proxmox_uuid::Uuid;
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{DataStore, SnapshotReader};
 use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
 use proxmox_rest_server::WorkerTask;
 
@@ -452,9 +453,9 @@ impl PoolWriter {
     /// archive is marked incomplete, and we do not use it. The caller
     /// should mark the media as full and try again using another
     /// media.
-    pub fn append_snapshot_archive(
+    pub fn append_snapshot_archive<T: CanRead>(
         &mut self,
-        snapshot_reader: &SnapshotReader,
+        snapshot_reader: &SnapshotReader<T>,
     ) -> Result<(bool, usize), Error> {
         let status = match self.status {
             Some(ref mut status) => status,
@@ -543,10 +544,10 @@ impl PoolWriter {
         Ok((leom, bytes_written))
     }
 
-    pub fn spawn_chunk_reader_thread(
+    pub fn spawn_chunk_reader_thread<T: CanRead + Send + Sync + 'static>(
         &self,
-        datastore: Arc<DataStore>,
-        snapshot_reader: Arc<Mutex<SnapshotReader>>,
+        datastore: Arc<DataStore<T>>,
+        snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
     ) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
         NewChunksIterator::spawn(
             datastore,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index e6f418df..e1a0da20 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
 
 use anyhow::{format_err, Error};
 
+use pbs_datastore::chunk_store::CanRead;
 use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
 
 use crate::tape::CatalogSet;
@@ -21,9 +22,9 @@ impl NewChunksIterator {
     /// Creates the iterator, spawning a new thread
     ///
     /// Make sure to join() the returned thread handle.
-    pub fn spawn(
-        datastore: Arc<DataStore>,
-        snapshot_reader: Arc<Mutex<SnapshotReader>>,
+    pub fn spawn<T: CanRead + Send + Sync + 'static>(
+        datastore: Arc<DataStore<T>>,
+        snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
         catalog_set: Arc<Mutex<CatalogSet>>,
         read_threads: usize,
     ) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
-- 
2.39.5


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel