From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id D2F301FF16C for ; Tue, 3 Sep 2024 14:34:08 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CCE4096C1; Tue, 3 Sep 2024 14:34:40 +0200 (CEST) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Tue, 3 Sep 2024 14:34:00 +0200 Message-Id: <20240903123401.91513-10-h.laimer@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240903123401.91513-1-h.laimer@proxmox.com> References: <20240903123401.91513-1-h.laimer@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.018 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Signed-off-by: Hannes Laimer --- src/api2/admin/datastore.rs | 27 ++--- src/api2/backup/environment.rs | 176 +++++++++++++++++--------------- src/api2/backup/mod.rs | 21 ++-- src/api2/backup/upload_chunk.rs | 19 ++-- src/api2/reader/environment.rs | 31 +++--- src/api2/reader/mod.rs | 5 +- src/api2/tape/backup.rs | 21 ++-- src/api2/tape/drive.rs | 2 +- src/api2/tape/restore.rs | 69 +++++++------ 9 files changed, 196 insertions(+), 175 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index f1eed9fc..e1124284 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Error}; use futures::*; use hyper::http::request::Parts; use hyper::{header, Body, Response, StatusCode}; +use pbs_datastore::chunk_store::{CanRead, Read}; use serde::Deserialize; use serde_json::{json, Value}; use tokio_stream::wrappers::ReceiverStream; @@ -73,8 +74,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState}; const GROUP_NOTES_FILE_NAME: &str = "notes"; -fn get_group_note_path( - store: &DataStore, +fn get_group_note_path( + store: &DataStore, ns: &BackupNamespace, group: &pbs_api_types::BackupGroup, ) -> PathBuf { @@ -111,8 +112,8 @@ fn check_privs( Ok(()) } -fn read_backup_index( - backup_dir: &BackupDir, +fn read_backup_index( + backup_dir: &BackupDir, ) -> Result<(BackupManifest, Vec), Error> { let (manifest, index_size) = backup_dir.load_manifest()?; @@ -137,8 +138,8 @@ fn read_backup_index( Ok((manifest, result)) } -fn get_all_snapshot_files( - info: &BackupInfo, +fn get_all_snapshot_files( + info: &BackupInfo, ) -> Result<(BackupManifest, Vec), Error> { let (manifest, mut files) = read_backup_index(&info.backup_dir)?; @@ -502,7 +503,7 @@ unsafe fn list_snapshots_blocking( (None, None) => datastore.list_backup_groups(ns.clone())?, }; - let info_to_snapshot_list_item = |group: 
&BackupGroup, owner, info: BackupInfo| { + let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| { let backup = pbs_api_types::BackupDir { group: group.into(), time: info.backup_dir.backup_time(), @@ -604,8 +605,8 @@ unsafe fn list_snapshots_blocking( }) } -async fn get_snapshots_count( - store: &Arc, +async fn get_snapshots_count( + store: &Arc>, owner: Option<&Authid>, ) -> Result { let store = Arc::clone(store); @@ -1771,12 +1772,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new( &Permission::Anybody, ); -fn get_local_pxar_reader( - datastore: Arc, +fn get_local_pxar_reader( + datastore: Arc>, manifest: &BackupManifest, - backup_dir: &BackupDir, + backup_dir: &BackupDir, pxar_name: &str, -) -> Result<(LocalDynamicReadAt, u64), Error> { +) -> Result<(LocalDynamicReadAt>, u64), Error> { let mut path = datastore.base_path(); path.push(backup_dir.relative_path()); path.push(pxar_name); diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs index 99d885e2..f222da76 100644 --- a/src/api2/backup/environment.rs +++ b/src/api2/backup/environment.rs @@ -1,5 +1,6 @@ use anyhow::{bail, format_err, Error}; use nix::dir::Dir; +use pbs_datastore::chunk_store::CanWrite; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tracing::info; @@ -53,17 +54,17 @@ impl std::ops::Add for UploadStatistic { } } -struct DynamicWriterState { +struct DynamicWriterState { name: String, - index: DynamicIndexWriter, + index: DynamicIndexWriter, offset: u64, chunk_count: u64, upload_stat: UploadStatistic, } -struct FixedWriterState { +struct FixedWriterState { name: String, - index: FixedIndexWriter, + index: FixedIndexWriter, size: usize, chunk_size: u32, chunk_count: u64, @@ -75,18 +76,18 @@ struct FixedWriterState { // key=digest, value=length type KnownChunksMap = HashMap<[u8; 32], u32>; -struct SharedBackupState { +struct SharedBackupState { finished: bool, uid_counter: usize, file_counter: usize, // successfully uploaded files - dynamic_writers: HashMap, - fixed_writers: HashMap, + dynamic_writers: HashMap>, + fixed_writers: HashMap>, known_chunks: KnownChunksMap, backup_size: u64, // sums up size of all files backup_stat: UploadStatistic, } -impl SharedBackupState { +impl SharedBackupState { // Raise error if finished flag is set fn ensure_unfinished(&self) -> Result<(), Error> { if self.finished { @@ -104,26 +105,26 @@ impl SharedBackupState { /// `RpcEnvironment` implementation for backup service #[derive(Clone)] -pub struct BackupEnvironment { +pub struct BackupEnvironment { env_type: RpcEnvironmentType, result_attributes: Value, auth_id: Authid, pub debug: bool, pub formatter: &'static dyn OutputFormatter, pub worker: Arc, - pub datastore: Arc, - pub backup_dir: BackupDir, - pub last_backup: Option, - state: Arc>, + pub datastore: Arc>, + pub backup_dir: BackupDir, + pub last_backup: Option>, + state: Arc>>, } -impl BackupEnvironment { +impl BackupEnvironment { pub fn new( env_type: RpcEnvironmentType, auth_id: Authid, worker: Arc, - datastore: Arc, - backup_dir: BackupDir, + datastore: Arc>, + backup_dir: BackupDir, ) -> Self { let state = SharedBackupState { finished: false, @@ -149,7 +150,69 @@ impl BackupEnvironment { state: Arc::new(Mutex::new(state)), } } +} + +impl BackupEnvironment { + pub fn format_response(&self, result: Result) -> Response { + self.formatter.format_result(result, self) + } + + /// If verify-new is set on the datastore, this will run a new verify task + /// for the backup. 
If not, this will return and also drop the passed lock + /// immediately. + pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> { + self.ensure_finished()?; + + if !self.datastore.verify_new() { + // no verify requested, do nothing + return Ok(()); + } + + // Downgrade to shared lock, the backup itself is finished + drop(excl_snap_lock); + let snap_lock = lock_dir_noblock_shared( + &self.backup_dir.full_path(), + "snapshot", + "snapshot is already locked by another operation", + )?; + + let worker_id = format!( + "{}:{}/{}/{:08X}", + self.datastore.name(), + self.backup_dir.backup_type(), + self.backup_dir.backup_id(), + self.backup_dir.backup_time() + ); + + let datastore = self.datastore.clone(); + let backup_dir = self.backup_dir.clone(); + + WorkerTask::new_thread( + "verify", + Some(worker_id), + self.auth_id.to_string(), + false, + move |worker| { + worker.log_message("Automatically verifying newly added snapshot"); + + let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore); + if !verify_backup_dir_with_lock( + &verify_worker, + &backup_dir, + worker.upid().clone(), + None, + snap_lock, + )? { + bail!("verification failed - please check the log for details"); + } + Ok(()) + }, + ) + .map(|_| ()) + } +} +impl BackupEnvironment { /// Register a Chunk with associated length. /// /// We do not fully trust clients, so a client may only use registered @@ -262,7 +325,7 @@ impl BackupEnvironment { /// Store the writer with an unique ID pub fn register_dynamic_writer( &self, - index: DynamicIndexWriter, + index: DynamicIndexWriter, name: String, ) -> Result { let mut state = self.state.lock().unwrap(); @@ -288,7 +351,7 @@ impl BackupEnvironment { /// Store the writer with an unique ID pub fn register_fixed_writer( &self, - index: FixedIndexWriter, + index: FixedIndexWriter, name: String, size: usize, chunk_size: u32, @@ -632,61 +695,6 @@ impl BackupEnvironment { Ok(()) } - /// If verify-new is set on the datastore, this will run a new verify task - /// for the backup. If not, this will return and also drop the passed lock - /// immediately. - pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> { - self.ensure_finished()?; - - if !self.datastore.verify_new() { - // no verify requested, do nothing - return Ok(()); - } - - // Downgrade to shared lock, the backup itself is finished - drop(excl_snap_lock); - let snap_lock = lock_dir_noblock_shared( - &self.backup_dir.full_path(), - "snapshot", - "snapshot is already locked by another operation", - )?; - - let worker_id = format!( - "{}:{}/{}/{:08X}", - self.datastore.name(), - self.backup_dir.backup_type(), - self.backup_dir.backup_id(), - self.backup_dir.backup_time() - ); - - let datastore = self.datastore.clone(); - let backup_dir = self.backup_dir.clone(); - - WorkerTask::new_thread( - "verify", - Some(worker_id), - self.auth_id.to_string(), - false, - move |worker| { - worker.log_message("Automatically verifying newly added snapshot"); - - let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore); - if !verify_backup_dir_with_lock( - &verify_worker, - &backup_dir, - worker.upid().clone(), - None, - snap_lock, - )? 
{ - bail!("verification failed - please check the log for details"); - } - - Ok(()) - }, - ) - .map(|_| ()) - } - pub fn log>(&self, msg: S) { info!("{}", msg.as_ref()); } @@ -701,10 +709,6 @@ impl BackupEnvironment { } } - pub fn format_response(&self, result: Result) -> Response { - self.formatter.format_result(result, self) - } - /// Raise error if finished flag is not set pub fn ensure_finished(&self) -> Result<(), Error> { let state = self.state.lock().unwrap(); @@ -735,7 +739,7 @@ impl BackupEnvironment { } } -impl RpcEnvironment for BackupEnvironment { +impl RpcEnvironment for BackupEnvironment { fn result_attrib_mut(&mut self) -> &mut Value { &mut self.result_attributes } @@ -757,14 +761,18 @@ impl RpcEnvironment for BackupEnvironment { } } -impl AsRef for dyn RpcEnvironment { - fn as_ref(&self) -> &BackupEnvironment { - self.as_any().downcast_ref::().unwrap() +impl AsRef> for dyn RpcEnvironment { + fn as_ref(&self) -> &BackupEnvironment { + self.as_any() + .downcast_ref::>() + .unwrap() } } -impl AsRef for Box { - fn as_ref(&self) -> &BackupEnvironment { - self.as_any().downcast_ref::().unwrap() +impl AsRef> for Box { + fn as_ref(&self) -> &BackupEnvironment { + self.as_any() + .downcast_ref::>() + .unwrap() } } diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs index e6a92117..fda7f41d 100644 --- a/src/api2/backup/mod.rs +++ b/src/api2/backup/mod.rs @@ -6,6 +6,7 @@ use hex::FromHex; use hyper::header::{HeaderValue, CONNECTION, UPGRADE}; use hyper::http::request::Parts; use hyper::{Body, Request, Response, StatusCode}; +use pbs_datastore::chunk_store::{Lookup, Write}; use serde::Deserialize; use serde_json::{json, Value}; @@ -281,7 +282,7 @@ fn upgrade_to_backup_protocol( return Ok(()); } - let verify = |env: BackupEnvironment| { + let verify = |env: BackupEnvironment| { if let Err(err) = env.verify_after_complete(snap_guard) { env.log(format!( "backup finished, but starting the requested verify task failed: {}", @@ -402,7 +403,7 @@ fn create_dynamic_index( _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, ) -> Result { - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); let name = required_string_param(¶m, "archive-name")?.to_owned(); @@ -452,7 +453,7 @@ fn create_fixed_index( _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, ) -> Result { - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); let name = required_string_param(¶m, "archive-name")?.to_owned(); let size = required_integer_param(¶m, "size")? 
as usize; @@ -567,7 +568,7 @@ fn dynamic_append( ); } - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); env.debug(format!("dynamic_append {} chunks", digest_list.len())); @@ -641,7 +642,7 @@ fn fixed_append( ); } - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); env.debug(format!("fixed_append {} chunks", digest_list.len())); @@ -716,7 +717,7 @@ fn close_dynamic_index( let csum_str = required_string_param(¶m, "csum")?; let csum = <[u8; 32]>::from_hex(csum_str)?; - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); env.dynamic_writer_close(wid, chunk_count, size, csum)?; @@ -769,7 +770,7 @@ fn close_fixed_index( let csum_str = required_string_param(¶m, "csum")?; let csum = <[u8; 32]>::from_hex(csum_str)?; - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); env.fixed_writer_close(wid, chunk_count, size, csum)?; @@ -783,7 +784,7 @@ fn finish_backup( _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, ) -> Result { - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); env.finish_backup()?; env.log("successfully finished backup"); @@ -802,7 +803,7 @@ fn get_previous_backup_time( _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, ) -> Result { - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); let backup_time = env .last_backup @@ -829,7 +830,7 @@ fn download_previous( rpcenv: Box, ) -> ApiResponseFuture { async move { - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); let archive_name = required_string_param(¶m, "archive-name")?.to_owned(); diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs index 20259660..57b30dec 100644 --- a/src/api2/backup/upload_chunk.rs +++ b/src/api2/backup/upload_chunk.rs @@ -7,6 +7,7 @@ use futures::*; use hex::FromHex; use hyper::http::request::Parts; use hyper::Body; +use pbs_datastore::chunk_store::{CanWrite, Write as StoreWrite}; use serde_json::{json, Value}; use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment}; @@ -20,19 +21,19 @@ use pbs_tools::json::{required_integer_param, required_string_param}; use super::environment::*; -pub struct UploadChunk { +pub struct UploadChunk { stream: Body, - store: Arc, + store: Arc>, digest: [u8; 32], size: u32, encoded_size: u32, raw_data: Option>, } -impl UploadChunk { +impl UploadChunk { pub fn new( stream: Body, - store: Arc, + store: Arc>, digest: [u8; 32], size: u32, encoded_size: u32, @@ -48,7 +49,7 @@ impl UploadChunk { } } -impl Future for UploadChunk { +impl Future for UploadChunk { type Output = Result<([u8; 32], u32, u32, bool), Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { @@ -159,7 +160,7 @@ fn upload_fixed_chunk( let digest_str = required_string_param(¶m, "digest")?; let digest = <[u8; 32]>::from_hex(digest_str)?; - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); let (digest, size, compressed_size, is_duplicate) = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?; @@ -228,7 +229,7 @@ fn upload_dynamic_chunk( let digest_str = required_string_param(¶m, "digest")?; let digest = <[u8; 32]>::from_hex(digest_str)?; - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); let (digest, size, 
compressed_size, is_duplicate) = UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?; @@ -273,7 +274,7 @@ fn upload_speedtest( println!("Upload error: {}", err); } } - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); Ok(env.format_response(Ok(Value::Null))) } .boxed() @@ -312,7 +313,7 @@ fn upload_blob( let file_name = required_string_param(¶m, "file-name")?.to_owned(); let encoded_size = required_integer_param(¶m, "encoded-size")? as usize; - let env: &BackupEnvironment = rpcenv.as_ref(); + let env: &BackupEnvironment = rpcenv.as_ref(); if !file_name.ends_with(".blob") { bail!("wrong blob file extension: '{}'", file_name); diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs index 3b2f06f4..13d346eb 100644 --- a/src/api2/reader/environment.rs +++ b/src/api2/reader/environment.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock}; +use pbs_datastore::chunk_store::CanRead; use serde_json::{json, Value}; use proxmox_router::{RpcEnvironment, RpcEnvironmentType}; @@ -14,25 +15,25 @@ use tracing::info; /// `RpcEnvironment` implementation for backup reader service #[derive(Clone)] -pub struct ReaderEnvironment { +pub struct ReaderEnvironment { env_type: RpcEnvironmentType, result_attributes: Value, auth_id: Authid, pub debug: bool, pub formatter: &'static dyn OutputFormatter, pub worker: Arc, - pub datastore: Arc, - pub backup_dir: BackupDir, + pub datastore: Arc>, + pub backup_dir: BackupDir, allowed_chunks: Arc>>, } -impl ReaderEnvironment { +impl ReaderEnvironment { pub fn new( env_type: RpcEnvironmentType, auth_id: Authid, worker: Arc, - datastore: Arc, - backup_dir: BackupDir, + datastore: Arc>, + backup_dir: BackupDir, ) -> Self { Self { result_attributes: json!({}), @@ -71,7 +72,7 @@ impl ReaderEnvironment { } } -impl RpcEnvironment for ReaderEnvironment { +impl RpcEnvironment for ReaderEnvironment { fn result_attrib_mut(&mut self) -> &mut Value { &mut self.result_attributes } @@ -93,14 +94,18 @@ impl RpcEnvironment for ReaderEnvironment { } } -impl AsRef for dyn RpcEnvironment { - fn as_ref(&self) -> &ReaderEnvironment { - self.as_any().downcast_ref::().unwrap() +impl AsRef> for dyn RpcEnvironment { + fn as_ref(&self) -> &ReaderEnvironment { + self.as_any() + .downcast_ref::>() + .unwrap() } } -impl AsRef for Box { - fn as_ref(&self) -> &ReaderEnvironment { - self.as_any().downcast_ref::().unwrap() +impl AsRef> for Box { + fn as_ref(&self) -> &ReaderEnvironment { + self.as_any() + .downcast_ref::>() + .unwrap() } } diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs index 48a8c5fc..cf20addc 100644 --- a/src/api2/reader/mod.rs +++ b/src/api2/reader/mod.rs @@ -6,6 +6,7 @@ use hex::FromHex; use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE}; use hyper::http::request::Parts; use hyper::{Body, Request, Response, StatusCode}; +use pbs_datastore::chunk_store::Read; use serde::Deserialize; use serde_json::Value; @@ -253,7 +254,7 @@ fn download_file( rpcenv: Box, ) -> ApiResponseFuture { async move { - let env: &ReaderEnvironment = rpcenv.as_ref(); + let env: &ReaderEnvironment = rpcenv.as_ref(); let file_name = required_string_param(¶m, "file-name")?.to_owned(); @@ -309,7 +310,7 @@ fn download_chunk( rpcenv: Box, ) -> ApiResponseFuture { async move { - let env: &ReaderEnvironment = rpcenv.as_ref(); + let env: &ReaderEnvironment = rpcenv.as_ref(); let digest_str = required_string_param(¶m, "digest")?; let digest = <[u8; 32]>::from_hex(digest_str)?; 
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs index cf5a0189..662b953e 100644 --- a/src/api2/tape/backup.rs +++ b/src/api2/tape/backup.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; +use pbs_datastore::chunk_store::CanWrite; use serde_json::Value; use tracing::{info, warn}; @@ -11,9 +12,9 @@ use proxmox_schema::api; use proxmox_worker_task::WorkerTaskContext; use pbs_api_types::{ - print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, Operation, - TapeBackupJobConfig, TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA, - PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE, UPID_SCHEMA, + print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, TapeBackupJobConfig, + TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, + PRIV_TAPE_WRITE, UPID_SCHEMA, }; use pbs_config::CachedUserInfo; @@ -150,7 +151,7 @@ pub fn do_tape_backup_job( let worker_type = job.jobtype().to_string(); - let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?; + let datastore = DataStore::lookup_datastore_write(&setup.store)?; let (config, _digest) = pbs_config::media_pool::config()?; let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?; @@ -306,7 +307,7 @@ pub fn backup( check_backup_permission(&auth_id, &setup.store, &setup.pool, &setup.drive)?; - let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?; + let datastore = DataStore::lookup_datastore_write(&setup.store)?; let (config, _digest) = pbs_config::media_pool::config()?; let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?; @@ -360,9 +361,9 @@ enum SnapshotBackupResult { Ignored, } -fn backup_worker( +fn backup_worker( worker: &WorkerTask, - datastore: Arc, + datastore: Arc>, pool_config: &MediaPoolConfig, setup: &TapeBackupJobSetup, summary: &mut TapeBackupJobSummary, @@ -560,11 +561,11 @@ fn update_media_online_status(drive: &str) -> Result, Error> { } } -fn backup_snapshot( +fn backup_snapshot( worker: &WorkerTask, pool_writer: &mut PoolWriter, - datastore: Arc, - snapshot: BackupDir, + datastore: Arc>, + snapshot: BackupDir, ) -> Result { let snapshot_path = snapshot.relative_path(); info!("backup snapshot {snapshot_path:?}"); diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs index ba9051de..342406c6 100644 --- a/src/api2/tape/drive.rs +++ b/src/api2/tape/drive.rs @@ -1342,7 +1342,7 @@ pub fn catalog_media( drive.read_label()?; // skip over labels - we already read them above let mut checked_chunks = HashMap::new(); - restore_media( + restore_media::( worker, &mut drive, &media_id, diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs index 95ce13c7..20558670 100644 --- a/src/api2/tape/restore.rs +++ b/src/api2/tape/restore.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::{bail, format_err, Error}; +use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as StoreWrite}; use serde_json::Value; use tracing::{info, warn}; @@ -120,13 +121,13 @@ impl NamespaceMap { } } -pub struct DataStoreMap { - map: HashMap>, - default: Option>, +pub struct DataStoreMap { + map: HashMap>>, + default: Option>>, ns_map: Option, } -impl TryFrom for DataStoreMap { +impl TryFrom for DataStoreMap { type Error = Error; fn try_from(value: String) -> Result { @@ -161,7 +162,7 @@ impl TryFrom for DataStoreMap { } } -impl DataStoreMap { +impl DataStoreMap { fn add_namespaces_maps(&mut self, mappings: Vec) -> Result 
{ let count = mappings.len(); let ns_map = NamespaceMap::try_from(mappings)?; @@ -169,7 +170,9 @@ impl DataStoreMap { Ok(count > 0) } - fn used_datastores(&self) -> HashMap<&str, (Arc, Option>)> { + fn used_datastores( + &self, + ) -> HashMap<&str, (Arc>, Option>)> { let mut map = HashMap::new(); for (source, target) in self.map.iter() { let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source)); @@ -189,7 +192,7 @@ impl DataStoreMap { .map(|mapping| mapping.get_namespaces(datastore, ns)) } - fn target_store(&self, source_datastore: &str) -> Option> { + fn target_store(&self, source_datastore: &str) -> Option>> { self.map .get(source_datastore) .or(self.default.as_ref()) @@ -200,7 +203,7 @@ impl DataStoreMap { &self, source_datastore: &str, source_ns: &BackupNamespace, - ) -> Option<(Arc, Option>)> { + ) -> Option<(Arc>, Option>)> { self.target_store(source_datastore) .map(|store| (store, self.target_ns(source_datastore, source_ns))) } @@ -237,9 +240,9 @@ fn check_datastore_privs( Ok(()) } -fn check_and_create_namespaces( +fn check_and_create_namespaces( user_info: &CachedUserInfo, - store: &Arc, + store: &Arc>, ns: &BackupNamespace, auth_id: &Authid, owner: Option<&Authid>, @@ -449,13 +452,13 @@ pub fn restore( } #[allow(clippy::too_many_arguments)] -fn restore_full_worker( +fn restore_full_worker( worker: Arc, inventory: Inventory, media_set_uuid: Uuid, drive_config: SectionConfigData, drive_name: &str, - store_map: DataStoreMap, + store_map: DataStoreMap, restore_owner: &Authid, notification_mode: &TapeNotificationMode, auth_id: &Authid, @@ -529,8 +532,8 @@ fn restore_full_worker( } #[allow(clippy::too_many_arguments)] -fn check_snapshot_restorable( - store_map: &DataStoreMap, +fn check_snapshot_restorable( + store_map: &DataStoreMap, store: &str, snapshot: &str, ns: &BackupNamespace, @@ -618,14 +621,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator( worker: Arc, snapshots: Vec, inventory: Inventory, media_set_uuid: Uuid, drive_config: SectionConfigData, drive_name: &str, - store_map: DataStoreMap, + store_map: DataStoreMap, restore_owner: &Authid, notification_mode: &TapeNotificationMode, user_info: Arc, @@ -955,16 +958,16 @@ fn get_media_set_catalog( Ok(catalog) } -fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf { +fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf { let mut path = datastore.base_path(); path.push(".tmp"); path.push(media_set_uuid.to_string()); path } -fn snapshot_tmpdir( +fn snapshot_tmpdir( source_datastore: &str, - datastore: &DataStore, + datastore: &DataStore, snapshot: &str, media_set_uuid: &Uuid, ) -> PathBuf { @@ -974,9 +977,9 @@ fn snapshot_tmpdir( path } -fn restore_snapshots_to_tmpdir( +fn restore_snapshots_to_tmpdir( worker: Arc, - store_map: &DataStoreMap, + store_map: &DataStoreMap, file_list: &[u64], mut drive: Box, media_id: &MediaId, @@ -1083,10 +1086,10 @@ fn restore_snapshots_to_tmpdir( Ok(tmp_paths) } -fn restore_file_chunk_map( +fn restore_file_chunk_map( worker: Arc, drive: &mut Box, - store_map: &DataStoreMap, + store_map: &DataStoreMap, file_chunk_map: &mut BTreeMap>, ) -> Result<(), Error> { for (nr, chunk_map) in file_chunk_map.iter_mut() { @@ -1133,10 +1136,10 @@ fn restore_file_chunk_map( Ok(()) } -fn restore_partial_chunk_archive<'a>( +fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>( worker: Arc, reader: Box, - datastore: Arc, + datastore: Arc>, chunk_list: &mut HashSet<[u8; 32]>, ) -> Result { let mut decoder = 
ChunkArchiveDecoder::new(reader); @@ -1195,12 +1198,12 @@ fn restore_partial_chunk_archive<'a>( /// Request and restore complete media without using existing catalog (create catalog instead) #[allow(clippy::too_many_arguments)] -pub fn request_and_restore_media( +pub fn request_and_restore_media( worker: Arc, media_id: &MediaId, drive_config: &SectionConfigData, drive_name: &str, - store_map: &DataStoreMap, + store_map: &DataStoreMap, checked_chunks_map: &mut HashMap>, restore_owner: &Authid, notification_mode: &TapeNotificationMode, @@ -1253,11 +1256,11 @@ pub fn request_and_restore_media( /// Restore complete media content and catalog /// /// Only create the catalog if target is None. -pub fn restore_media( +pub fn restore_media( worker: Arc, drive: &mut Box, media_id: &MediaId, - target: Option<(&DataStoreMap, &Authid)>, + target: Option<(&DataStoreMap, &Authid)>, checked_chunks_map: &mut HashMap>, verbose: bool, auth_id: &Authid, @@ -1301,11 +1304,11 @@ pub fn restore_media( } #[allow(clippy::too_many_arguments)] -fn restore_archive<'a>( +fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>( worker: Arc, mut reader: Box, current_file_number: u64, - target: Option<(&DataStoreMap, &Authid)>, + target: Option<(&DataStoreMap, &Authid)>, catalog: &mut MediaCatalog, checked_chunks_map: &mut HashMap>, verbose: bool, @@ -1525,10 +1528,10 @@ fn scan_chunk_archive<'a>( Ok(Some(chunks)) } -fn restore_chunk_archive<'a>( +fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>( worker: Arc, reader: Box, - datastore: Arc, + datastore: Arc>, checked_chunks: &mut HashSet<[u8; 32]>, verbose: bool, ) -> Result>, Error> { -- 2.39.2
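
Illustrative note on the pattern this patch applies: datastore-facing types gain a type parameter bounded by the chunk-store marker traits (CanRead/CanWrite, imported from pbs_datastore::chunk_store in the hunks above), and their methods are split into impl blocks gated on those bounds, so a read-only handle can never reach a write path. The sketch below is a minimal, self-contained model of that idea under stated assumptions, not the real proxmox-backup API: the trait and mode names mirror the patch, while DataStore::open, load_manifest and insert_chunk are simplified stand-ins (the real code obtains handles via lookup_datastore*()).

use std::marker::PhantomData;
use std::sync::Arc;

// Marker traits standing in for pbs_datastore::chunk_store::{CanRead, CanWrite}
// (simplified: the real traits carry behaviour, these are empty markers).
pub trait CanRead {}
pub trait CanWrite {}

pub struct Read;
pub struct Write;
impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}

// A datastore handle generic over its access mode, modelling DataStore<T>.
pub struct DataStore<T> {
    name: String,
    _mode: PhantomData<T>,
}

impl<T> DataStore<T> {
    // Hypothetical constructor for the sketch only.
    pub fn open(name: &str) -> Arc<Self> {
        Arc::new(Self { name: name.to_string(), _mode: PhantomData })
    }

    // Available on every handle regardless of access mode.
    pub fn name(&self) -> &str {
        &self.name
    }
}

// Read-only operations live in an impl block bounded by CanRead ...
impl<T: CanRead> DataStore<T> {
    pub fn load_manifest(&self) -> Result<Vec<u8>, std::io::Error> {
        Ok(Vec::new()) // placeholder for the actual read path
    }
}

// ... while mutating operations require CanWrite, so handing a
// DataStore<Read> to write-side API code is a compile-time error.
impl<T: CanWrite> DataStore<T> {
    pub fn insert_chunk(&self, _digest: [u8; 32], _data: &[u8]) -> Result<(), std::io::Error> {
        Ok(()) // placeholder for the actual write path
    }
}

fn main() -> Result<(), std::io::Error> {
    let reader: Arc<DataStore<Read>> = DataStore::open("store1");
    let writer: Arc<DataStore<Write>> = DataStore::open("store1");

    println!("reading from {}", reader.name());
    reader.load_manifest()?;
    writer.load_manifest()?; // write handles can also read in this sketch
    writer.insert_chunk([0u8; 32], b"data")?;
    // reader.insert_chunk(..) would not compile: Read does not implement CanWrite.
    Ok(())
}

With the methods separated into trait-bounded impl blocks like this, passing the wrong kind of handle into an API function is rejected by the compiler instead of surfacing as a runtime error, which is what the generic BackupEnvironment, ReaderEnvironment and tape restore changes above rely on.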