From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks
Date: Tue, 3 Sep 2024 14:34:00 +0200 [thread overview]
Message-ID: <20240903123401.91513-10-h.laimer@proxmox.com> (raw)
In-Reply-To: <20240903123401.91513-1-h.laimer@proxmox.com>
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/api2/admin/datastore.rs | 27 ++---
src/api2/backup/environment.rs | 176 +++++++++++++++++---------------
src/api2/backup/mod.rs | 21 ++--
src/api2/backup/upload_chunk.rs | 19 ++--
src/api2/reader/environment.rs | 31 +++---
src/api2/reader/mod.rs | 5 +-
src/api2/tape/backup.rs | 21 ++--
src/api2/tape/drive.rs | 2 +-
src/api2/tape/restore.rs | 69 +++++++------
9 files changed, 196 insertions(+), 175 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index f1eed9fc..e1124284 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Error};
use futures::*;
use hyper::http::request::Parts;
use hyper::{header, Body, Response, StatusCode};
+use pbs_datastore::chunk_store::{CanRead, Read};
use serde::Deserialize;
use serde_json::{json, Value};
use tokio_stream::wrappers::ReceiverStream;
@@ -73,8 +74,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
const GROUP_NOTES_FILE_NAME: &str = "notes";
-fn get_group_note_path(
- store: &DataStore,
+fn get_group_note_path<T>(
+ store: &DataStore<T>,
ns: &BackupNamespace,
group: &pbs_api_types::BackupGroup,
) -> PathBuf {
@@ -111,8 +112,8 @@ fn check_privs<T: CanRead>(
Ok(())
}
-fn read_backup_index(
- backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+ backup_dir: &BackupDir<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, index_size) = backup_dir.load_manifest()?;
@@ -137,8 +138,8 @@ fn read_backup_index(
Ok((manifest, result))
}
-fn get_all_snapshot_files(
- info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+ info: &BackupInfo<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
@@ -502,7 +503,7 @@ unsafe fn list_snapshots_blocking(
(None, None) => datastore.list_backup_groups(ns.clone())?,
};
- let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+ let info_to_snapshot_list_item = |group: &BackupGroup<Read>, owner, info: BackupInfo<Read>| {
let backup = pbs_api_types::BackupDir {
group: group.into(),
time: info.backup_dir.backup_time(),
@@ -604,8 +605,8 @@ unsafe fn list_snapshots_blocking(
})
}
-async fn get_snapshots_count(
- store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+ store: &Arc<DataStore<T>>,
owner: Option<&Authid>,
) -> Result<Counts, Error> {
let store = Arc::clone(store);
@@ -1771,12 +1772,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&Permission::Anybody,
);
-fn get_local_pxar_reader(
- datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+ datastore: Arc<DataStore<T>>,
manifest: &BackupManifest,
- backup_dir: &BackupDir,
+ backup_dir: &BackupDir<T>,
pxar_name: &str,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
let mut path = datastore.base_path();
path.push(backup_dir.relative_path());
path.push(pxar_name);
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2..f222da76 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,5 +1,6 @@
use anyhow::{bail, format_err, Error};
use nix::dir::Dir;
+use pbs_datastore::chunk_store::CanWrite;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::info;
@@ -53,17 +54,17 @@ impl std::ops::Add for UploadStatistic {
}
}
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
name: String,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
offset: u64,
chunk_count: u64,
upload_stat: UploadStatistic,
}
-struct FixedWriterState {
+struct FixedWriterState<T> {
name: String,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
size: usize,
chunk_size: u32,
chunk_count: u64,
@@ -75,18 +76,18 @@ struct FixedWriterState {
// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;
-struct SharedBackupState {
+struct SharedBackupState<T> {
finished: bool,
uid_counter: usize,
file_counter: usize, // successfully uploaded files
- dynamic_writers: HashMap<usize, DynamicWriterState>,
- fixed_writers: HashMap<usize, FixedWriterState>,
+ dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+ fixed_writers: HashMap<usize, FixedWriterState<T>>,
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
}
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
@@ -104,26 +105,26 @@ impl SharedBackupState {
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
- pub last_backup: Option<BackupInfo>,
- state: Arc<Mutex<SharedBackupState>>,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
+ pub last_backup: Option<BackupInfo<T>>,
+ state: Arc<Mutex<SharedBackupState<T>>>,
}
-impl BackupEnvironment {
+impl<T> BackupEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
let state = SharedBackupState {
finished: false,
@@ -149,7 +150,69 @@ impl BackupEnvironment {
state: Arc::new(Mutex::new(state)),
}
}
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+ pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+ self.formatter.format_result(result, self)
+ }
+
+ /// If verify-new is set on the datastore, this will run a new verify task
+ /// for the backup. If not, this will return and also drop the passed lock
+ /// immediately.
+ pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
+ self.ensure_finished()?;
+
+ if !self.datastore.verify_new() {
+ // no verify requested, do nothing
+ return Ok(());
+ }
+
+ // Downgrade to shared lock, the backup itself is finished
+ drop(excl_snap_lock);
+ let snap_lock = lock_dir_noblock_shared(
+ &self.backup_dir.full_path(),
+ "snapshot",
+ "snapshot is already locked by another operation",
+ )?;
+
+ let worker_id = format!(
+ "{}:{}/{}/{:08X}",
+ self.datastore.name(),
+ self.backup_dir.backup_type(),
+ self.backup_dir.backup_id(),
+ self.backup_dir.backup_time()
+ );
+
+ let datastore = self.datastore.clone();
+ let backup_dir = self.backup_dir.clone();
+
+ WorkerTask::new_thread(
+ "verify",
+ Some(worker_id),
+ self.auth_id.to_string(),
+ false,
+ move |worker| {
+ worker.log_message("Automatically verifying newly added snapshot");
+
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+ if !verify_backup_dir_with_lock(
+ &verify_worker,
+ &backup_dir,
+ worker.upid().clone(),
+ None,
+ snap_lock,
+ )? {
+ bail!("verification failed - please check the log for details");
+ }
+ Ok(())
+ },
+ )
+ .map(|_| ())
+ }
+}
+impl<T: CanWrite> BackupEnvironment<T> {
/// Register a Chunk with associated length.
///
/// We do not fully trust clients, so a client may only use registered
@@ -262,7 +325,7 @@ impl BackupEnvironment {
/// Store the writer with an unique ID
pub fn register_dynamic_writer(
&self,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
name: String,
) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
@@ -288,7 +351,7 @@ impl BackupEnvironment {
/// Store the writer with an unique ID
pub fn register_fixed_writer(
&self,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
name: String,
size: usize,
chunk_size: u32,
@@ -632,61 +695,6 @@ impl BackupEnvironment {
Ok(())
}
- /// If verify-new is set on the datastore, this will run a new verify task
- /// for the backup. If not, this will return and also drop the passed lock
- /// immediately.
- pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
- self.ensure_finished()?;
-
- if !self.datastore.verify_new() {
- // no verify requested, do nothing
- return Ok(());
- }
-
- // Downgrade to shared lock, the backup itself is finished
- drop(excl_snap_lock);
- let snap_lock = lock_dir_noblock_shared(
- &self.backup_dir.full_path(),
- "snapshot",
- "snapshot is already locked by another operation",
- )?;
-
- let worker_id = format!(
- "{}:{}/{}/{:08X}",
- self.datastore.name(),
- self.backup_dir.backup_type(),
- self.backup_dir.backup_id(),
- self.backup_dir.backup_time()
- );
-
- let datastore = self.datastore.clone();
- let backup_dir = self.backup_dir.clone();
-
- WorkerTask::new_thread(
- "verify",
- Some(worker_id),
- self.auth_id.to_string(),
- false,
- move |worker| {
- worker.log_message("Automatically verifying newly added snapshot");
-
- let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
- if !verify_backup_dir_with_lock(
- &verify_worker,
- &backup_dir,
- worker.upid().clone(),
- None,
- snap_lock,
- )? {
- bail!("verification failed - please check the log for details");
- }
-
- Ok(())
- },
- )
- .map(|_| ())
- }
-
pub fn log<S: AsRef<str>>(&self, msg: S) {
info!("{}", msg.as_ref());
}
@@ -701,10 +709,6 @@ impl BackupEnvironment {
}
}
- pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
- self.formatter.format_result(result, self)
- }
-
/// Raise error if finished flag is not set
pub fn ensure_finished(&self) -> Result<(), Error> {
let state = self.state.lock().unwrap();
@@ -735,7 +739,7 @@ impl BackupEnvironment {
}
}
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -757,14 +761,18 @@ impl RpcEnvironment for BackupEnvironment {
}
}
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index e6a92117..fda7f41d 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::{Lookup, Write};
use serde::Deserialize;
use serde_json::{json, Value};
@@ -281,7 +282,7 @@ fn upgrade_to_backup_protocol(
return Ok(());
}
- let verify = |env: BackupEnvironment| {
+ let verify = |env: BackupEnvironment<pbs_datastore::chunk_store::Write>| {
if let Err(err) = env.verify_after_complete(snap_guard) {
env.log(format!(
"backup finished, but starting the requested verify task failed: {}",
@@ -402,7 +403,7 @@ fn create_dynamic_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let name = required_string_param(&param, "archive-name")?.to_owned();
@@ -452,7 +453,7 @@ fn create_fixed_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let name = required_string_param(&param, "archive-name")?.to_owned();
let size = required_integer_param(&param, "size")? as usize;
@@ -567,7 +568,7 @@ fn dynamic_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.debug(format!("dynamic_append {} chunks", digest_list.len()));
@@ -641,7 +642,7 @@ fn fixed_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.debug(format!("fixed_append {} chunks", digest_list.len()));
@@ -716,7 +717,7 @@ fn close_dynamic_index(
let csum_str = required_string_param(&param, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.dynamic_writer_close(wid, chunk_count, size, csum)?;
@@ -769,7 +770,7 @@ fn close_fixed_index(
let csum_str = required_string_param(&param, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.fixed_writer_close(wid, chunk_count, size, csum)?;
@@ -783,7 +784,7 @@ fn finish_backup(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<Write> = rpcenv.as_ref();
env.finish_backup()?;
env.log("successfully finished backup");
@@ -802,7 +803,7 @@ fn get_previous_backup_time(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<Lookup> = rpcenv.as_ref();
let backup_time = env
.last_backup
@@ -829,7 +830,7 @@ fn download_previous(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let archive_name = required_string_param(&param, "archive-name")?.to_owned();
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..57b30dec 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -7,6 +7,7 @@ use futures::*;
use hex::FromHex;
use hyper::http::request::Parts;
use hyper::Body;
+use pbs_datastore::chunk_store::{CanWrite, Write as StoreWrite};
use serde_json::{json, Value};
use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -20,19 +21,19 @@ use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
raw_data: Option<Vec<u8>>,
}
-impl UploadChunk {
+impl<T: CanWrite> UploadChunk<T> {
pub fn new(
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
}
}
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
type Output = Result<([u8; 32], u32, u32, bool), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
println!("Upload error: {}", err);
}
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
Ok(env.format_response(Ok(Value::Null)))
}
.boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
let file_name = required_string_param(&param, "file-name")?.to_owned();
let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
if !file_name.ends_with(".blob") {
bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..13d346eb 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
+use pbs_datastore::chunk_store::CanRead;
use serde_json::{json, Value};
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
@@ -14,25 +15,25 @@ use tracing::info;
/// `RpcEnvironment` implementation for backup reader service
#[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
}
-impl ReaderEnvironment {
+impl<T: CanRead> ReaderEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
Self {
result_attributes: json!({}),
@@ -71,7 +72,7 @@ impl ReaderEnvironment {
}
}
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -93,14 +94,18 @@ impl RpcEnvironment for ReaderEnvironment {
}
}
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 48a8c5fc..cf20addc 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::Read;
use serde::Deserialize;
use serde_json::Value;
@@ -253,7 +254,7 @@ fn download_file(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
let file_name = required_string_param(&param, "file-name")?.to_owned();
@@ -309,7 +310,7 @@ fn download_chunk(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index cf5a0189..662b953e 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -1,6 +1,7 @@
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::CanWrite;
use serde_json::Value;
use tracing::{info, warn};
@@ -11,9 +12,9 @@ use proxmox_schema::api;
use proxmox_worker_task::WorkerTaskContext;
use pbs_api_types::{
- print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, Operation,
- TapeBackupJobConfig, TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA,
- PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE, UPID_SCHEMA,
+ print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, TapeBackupJobConfig,
+ TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT,
+ PRIV_TAPE_WRITE, UPID_SCHEMA,
};
use pbs_config::CachedUserInfo;
@@ -150,7 +151,7 @@ pub fn do_tape_backup_job(
let worker_type = job.jobtype().to_string();
- let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&setup.store)?;
let (config, _digest) = pbs_config::media_pool::config()?;
let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -306,7 +307,7 @@ pub fn backup(
check_backup_permission(&auth_id, &setup.store, &setup.pool, &setup.drive)?;
- let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&setup.store)?;
let (config, _digest) = pbs_config::media_pool::config()?;
let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
Ignored,
}
-fn backup_worker(
+fn backup_worker<T: CanWrite + Send + Sync + 'static>(
worker: &WorkerTask,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
pool_config: &MediaPoolConfig,
setup: &TapeBackupJobSetup,
summary: &mut TapeBackupJobSummary,
@@ -560,11 +561,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
}
}
-fn backup_snapshot(
+fn backup_snapshot<T: CanWrite + Send + Sync + 'static>(
worker: &WorkerTask,
pool_writer: &mut PoolWriter,
- datastore: Arc<DataStore>,
- snapshot: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ snapshot: BackupDir<T>,
) -> Result<SnapshotBackupResult, Error> {
let snapshot_path = snapshot.relative_path();
info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..342406c6 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -1342,7 +1342,7 @@ pub fn catalog_media(
drive.read_label()?; // skip over labels - we already read them above
let mut checked_chunks = HashMap::new();
- restore_media(
+ restore_media::<pbs_datastore::chunk_store::Write>(
worker,
&mut drive,
&media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 95ce13c7..20558670 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as StoreWrite};
use serde_json::Value;
use tracing::{info, warn};
@@ -120,13 +121,13 @@ impl NamespaceMap {
}
}
-pub struct DataStoreMap {
- map: HashMap<String, Arc<DataStore>>,
- default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+ map: HashMap<String, Arc<DataStore<T>>>,
+ default: Option<Arc<DataStore<T>>>,
ns_map: Option<NamespaceMap>,
}
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<StoreWrite> {
type Error = Error;
fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
}
}
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
let count = mappings.len();
let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,9 @@ impl DataStoreMap {
Ok(count > 0)
}
- fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+ fn used_datastores(
+ &self,
+ ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
let mut map = HashMap::new();
for (source, target) in self.map.iter() {
let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,7 +192,7 @@ impl DataStoreMap {
.map(|mapping| mapping.get_namespaces(datastore, ns))
}
- fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+ fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
self.map
.get(source_datastore)
.or(self.default.as_ref())
@@ -200,7 +203,7 @@ impl DataStoreMap {
&self,
source_datastore: &str,
source_ns: &BackupNamespace,
- ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+ ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
self.target_store(source_datastore)
.map(|store| (store, self.target_ns(source_datastore, source_ns)))
}
@@ -237,9 +240,9 @@ fn check_datastore_privs(
Ok(())
}
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
user_info: &CachedUserInfo,
- store: &Arc<DataStore>,
+ store: &Arc<DataStore<T>>,
ns: &BackupNamespace,
auth_id: &Authid,
owner: Option<&Authid>,
@@ -449,13 +452,13 @@ pub fn restore(
}
#[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
auth_id: &Authid,
@@ -529,8 +532,8 @@ fn restore_full_worker(
}
#[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
- store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+ store_map: &DataStoreMap<T>,
store: &str,
snapshot: &str,
ns: &BackupNamespace,
@@ -618,14 +621,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
}
#[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
snapshots: Vec<String>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
user_info: Arc<CachedUserInfo>,
@@ -955,16 +958,16 @@ fn get_media_set_catalog(
Ok(catalog)
}
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
let mut path = datastore.base_path();
path.push(".tmp");
path.push(media_set_uuid.to_string());
path
}
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
source_datastore: &str,
- datastore: &DataStore,
+ datastore: &DataStore<T>,
snapshot: &str,
media_set_uuid: &Uuid,
) -> PathBuf {
@@ -974,9 +977,9 @@ fn snapshot_tmpdir(
path
}
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
worker: Arc<WorkerTask>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_list: &[u64],
mut drive: Box<dyn TapeDriver>,
media_id: &MediaId,
@@ -1083,10 +1086,10 @@ fn restore_snapshots_to_tmpdir(
Ok(tmp_paths)
}
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1136,10 @@ fn restore_file_chunk_map(
Ok(())
}
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1198,12 @@ fn restore_partial_chunk_archive<'a>(
/// Request and restore complete media without using existing catalog (create catalog instead)
#[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
media_id: &MediaId,
drive_config: &SectionConfigData,
drive_name: &str,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
@@ -1253,11 +1256,11 @@ pub fn request_and_restore_media(
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
media_id: &MediaId,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
auth_id: &Authid,
@@ -1301,11 +1304,11 @@ pub fn restore_media(
}
#[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
mut reader: Box<dyn 'a + TapeRead>,
current_file_number: u64,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
catalog: &mut MediaCatalog,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
@@ -1525,10 +1528,10 @@ fn scan_chunk_archive<'a>(
Ok(Some(chunks))
}
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
checked_chunks: &mut HashSet<[u8; 32]>,
verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-09-03 12:34 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 01/10] chunkstore: add CanRead and CanWrite trait Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 02/10] chunkstore: separate functions into impl block Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 03/10] datastore: add generics and new lookup functions Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 04/10] datastore: separate functions into impl block Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 05/10] backup_info: add generics and separate functions into impl blocks Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 06/10] pbs-datastore: " Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 07/10] api: replace datastore_lookup with new, state-typed datastore returning functions Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 08/10] server/bin: " Hannes Laimer
2024-09-03 12:34 ` Hannes Laimer [this message]
2024-09-03 12:34 ` [pbs-devel] [PATCH proxmox-backup RFC 10/10] backup/server/tape: add generics and separate functions into impl blocks Hannes Laimer
2024-09-04 7:34 ` [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Wolfgang Bumiller
2025-05-26 14:18 ` [pbs-devel] superseded: " Hannes Laimer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240903123401.91513-10-h.laimer@proxmox.com \
--to=h.laimer@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.