From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v2 08/12] api/backup/bin/server/tape: add missing generics
Date: Mon, 26 May 2025 16:14:41 +0200
Message-ID: <20250526141445.228717-9-h.laimer@proxmox.com>
In-Reply-To: <20250526141445.228717-1-h.laimer@proxmox.com>
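With DataStore, ChunkStore, BackupDir and the related iterator/reader
types now generic over the chunk store access level, propagate the
missing type parameters through the api2, backup, server and tape
code. Functions that only read data are bounded by `CanRead`,
functions that insert chunks or modify snapshots by `CanWrite`, and
call sites where the access mode is statically known use the concrete
`Read`/`Write`/`Lookup` markers (imported as `R`/`W`/`L`).

For context, a minimal sketch of the typestate pattern this series
builds on (illustrative only: the real traits and markers are defined
in the earlier chunk_store patches of this series, and the trait impls
shown here are assumptions made for the example):

    use std::marker::PhantomData;

    // Capability traits gating which operations a handle offers.
    pub trait CanRead {}
    pub trait CanWrite {}

    // Marker types describing how a datastore was opened.
    pub struct Lookup; // config lookup only
    pub struct Read;   // read access
    pub struct Write;  // read and write access

    impl CanRead for Read {}
    impl CanRead for Write {} // assumption: writable handles may read
    impl CanWrite for Write {}

    // A datastore handle tagged with its access level.
    pub struct DataStore<T> {
        _access: PhantomData<T>,
    }

    impl<T: CanRead> DataStore<T> {
        pub fn load_blob(&self) { /* read-only operations */ }
    }

    impl<T: CanWrite> DataStore<T> {
        pub fn insert_chunk(&self) { /* only for DataStore<Write> */ }
    }

With such markers in place, a bound like `T: CanWrite + Send + Sync +
'static` turns passing a read-only handle to, say,
restore_chunk_archive() into a compile-time error instead of a
runtime failure.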
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/api2/admin/datastore.rs                 | 27 ++++----
src/api2/backup/mod.rs                      | 21 +++---
src/api2/backup/upload_chunk.rs             | 19 +++---
src/api2/config/datastore.rs                |  5 +-
src/api2/reader/environment.rs              | 30 +++++----
src/api2/reader/mod.rs                      |  5 +-
src/api2/tape/backup.rs                     | 11 ++--
src/api2/tape/drive.rs                      |  3 +-
src/api2/tape/restore.rs                    | 71 +++++++++++----------
src/backup/hierarchy.rs                     | 23 +++----
src/backup/verify.rs                        | 53 +++++++--------
src/bin/proxmox-backup-proxy.rs             |  7 +-
src/server/gc_job.rs                        |  7 +-
src/server/prune_job.rs                     |  5 +-
src/server/pull.rs                          | 23 +++----
src/server/push.rs                          |  3 +-
src/server/sync.rs                          | 13 ++--
src/tape/file_formats/snapshot_archive.rs   |  5 +-
src/tape/pool_writer/mod.rs                 | 11 ++--
src/tape/pool_writer/new_chunks_iterator.rs |  7 +-
20 files changed, 189 insertions(+), 160 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 39249448..e3f93cdd 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -54,6 +54,7 @@ use pbs_config::CachedUserInfo;
use pbs_datastore::backup_info::BackupInfo;
use pbs_datastore::cached_chunk_reader::CachedChunkReader;
use pbs_datastore::catalog::{ArchiveEntry, CatalogReader};
+use pbs_datastore::chunk_store::{CanRead, Read as R};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::data_blob_reader::DataBlobReader;
use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
@@ -79,8 +80,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
const GROUP_NOTES_FILE_NAME: &str = "notes";
-fn get_group_note_path(
- store: &DataStore,
+fn get_group_note_path<T>(
+ store: &DataStore<T>,
ns: &BackupNamespace,
group: &pbs_api_types::BackupGroup,
) -> PathBuf {
@@ -114,8 +115,8 @@ fn check_privs_and_load_store(
Ok(datastore)
}
-fn read_backup_index(
- backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+ backup_dir: &BackupDir<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, index_size) = backup_dir.load_manifest()?;
@@ -140,8 +141,8 @@ fn read_backup_index(
Ok((manifest, result))
}
-fn get_all_snapshot_files(
- info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+ info: &BackupInfo<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
@@ -529,7 +530,7 @@ unsafe fn list_snapshots_blocking(
(None, None) => datastore.list_backup_groups(ns.clone())?,
};
- let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+ let info_to_snapshot_list_item = |group: &BackupGroup<R>, owner, info: BackupInfo<R>| {
let backup = pbs_api_types::BackupDir {
group: group.into(),
time: info.backup_dir.backup_time(),
@@ -629,8 +630,8 @@ unsafe fn list_snapshots_blocking(
})
}
-async fn get_snapshots_count(
- store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+ store: &Arc<DataStore<T>>,
owner: Option<&Authid>,
) -> Result<Counts, Error> {
let store = Arc::clone(store);
@@ -1796,12 +1797,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&Permission::Anybody,
);
-fn get_local_pxar_reader(
- datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+ datastore: Arc<DataStore<T>>,
manifest: &BackupManifest,
- backup_dir: &BackupDir,
+ backup_dir: &BackupDir<T>,
pxar_name: &BackupArchiveName,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
let mut path = datastore.base_path();
path.push(backup_dir.relative_path());
path.push(pxar_name.as_ref());
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index 629df933..79354dbf 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
use pbs_datastore::index::IndexFile;
use pbs_datastore::{DataStore, PROXMOX_BACKUP_PROTOCOL_ID_V1};
use pbs_tools::json::{required_array_param, required_integer_param, required_string_param};
@@ -279,7 +280,7 @@ fn upgrade_to_backup_protocol(
return Ok(());
}
- let verify = |env: BackupEnvironment| {
+ let verify = |env: BackupEnvironment<W>| {
if let Err(err) = env.verify_after_complete(snap_guard) {
env.log(format!(
"backup finished, but starting the requested verify task failed: {}",
@@ -400,7 +401,7 @@ fn create_dynamic_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let name = required_string_param(&param, "archive-name")?.to_owned();
@@ -450,7 +451,7 @@ fn create_fixed_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let name = required_string_param(&param, "archive-name")?.to_owned();
let size = required_integer_param(&param, "size")? as usize;
@@ -565,7 +566,7 @@ fn dynamic_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.debug(format!("dynamic_append {} chunks", digest_list.len()));
@@ -639,7 +640,7 @@ fn fixed_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.debug(format!("fixed_append {} chunks", digest_list.len()));
@@ -714,7 +715,7 @@ fn close_dynamic_index(
let csum_str = required_string_param(&param, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.dynamic_writer_close(wid, chunk_count, size, csum)?;
@@ -767,7 +768,7 @@ fn close_fixed_index(
let csum_str = required_string_param(&param, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.fixed_writer_close(wid, chunk_count, size, csum)?;
@@ -781,7 +782,7 @@ fn finish_backup(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
env.finish_backup()?;
env.log("successfully finished backup");
@@ -800,7 +801,7 @@ fn get_previous_backup_time(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<R> = rpcenv.as_ref();
let backup_time = env
.last_backup
@@ -827,7 +828,7 @@ fn download_previous(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<R> = rpcenv.as_ref();
let archive_name = required_string_param(&param, "archive-name")?.to_owned();
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..bb1566ae 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -14,25 +14,26 @@ use proxmox_schema::*;
use proxmox_sortable_macro::sortable;
use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
+use pbs_datastore::chunk_store::{CanWrite, Lookup as L, Write as W};
use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
use pbs_datastore::{DataBlob, DataStore};
use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
raw_data: Option<Vec<u8>>,
}
-impl UploadChunk {
+impl<T> UploadChunk<T> {
pub fn new(
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
}
}
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
type Output = Result<([u8; 32], u32, u32, bool), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
println!("Upload error: {}", err);
}
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<L> = rpcenv.as_ref();
Ok(env.format_response(Ok(Value::Null)))
}
.boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
let file_name = required_string_param(&param, "file-name")?.to_owned();
let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<W> = rpcenv.as_ref();
if !file_name.ends_with(".blob") {
bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
index b133be70..52fa6db1 100644
--- a/src/api2/config/datastore.rs
+++ b/src/api2/config/datastore.rs
@@ -30,6 +30,7 @@ use crate::api2::config::tape_backup_job::{delete_tape_backup_job, list_tape_bac
use crate::api2::config::verify::delete_verification_job;
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{Read as R, Write as W};
use pbs_datastore::get_datastore_mount_status;
use proxmox_rest_server::WorkerTask;
@@ -124,7 +125,7 @@ pub(crate) fn do_create_datastore(
};
let chunk_store = if reuse_datastore {
- ChunkStore::verify_chunkstore(&path).and_then(|_| {
+ ChunkStore::<R>::verify_chunkstore(&path).and_then(|_| {
// Must be the only instance accessing and locking the chunk store,
// dropping will close all other locks from this process on the lockfile as well.
ChunkStore::open(
@@ -666,7 +667,7 @@ pub async fn delete_datastore(
auth_id.to_string(),
to_stdout,
move |_worker| {
- pbs_datastore::DataStore::destroy(&name, destroy_data)?;
+ pbs_datastore::DataStore::<W>::destroy(&name, destroy_data)?;
// ignore errors
let _ = jobstate::remove_state_file("prune", &name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..26f5bec6 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -14,25 +14,25 @@ use tracing::info;
/// `RpcEnvironment` implementation for backup reader service
#[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
}
-impl ReaderEnvironment {
+impl<T> ReaderEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
Self {
result_attributes: json!({}),
@@ -71,7 +71,7 @@ impl ReaderEnvironment {
}
}
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -93,14 +93,18 @@ impl RpcEnvironment for ReaderEnvironment {
}
}
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index cc791299..52f0953a 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -23,6 +23,7 @@ use pbs_api_types::{
DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
use pbs_datastore::index::IndexFile;
use pbs_datastore::{DataStore, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1};
use pbs_tools::json::required_string_param;
@@ -247,7 +248,7 @@ fn download_file(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<R> = rpcenv.as_ref();
let file_name = required_string_param(&param, "file-name")?.to_owned();
@@ -303,7 +304,7 @@ fn download_chunk(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<R> = rpcenv.as_ref();
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index 31293a9a..306d5936 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
use pbs_config::CachedUserInfo;
use pbs_datastore::backup_info::{BackupDir, BackupInfo};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataStore, StoreProgress};
use crate::tape::TapeNotificationMode;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
Ignored,
}
-fn backup_worker(
+fn backup_worker<T: CanRead + Send + Sync + 'static>(
worker: &WorkerTask,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
pool_config: &MediaPoolConfig,
setup: &TapeBackupJobSetup,
summary: &mut TapeBackupJobSummary,
@@ -564,11 +565,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
}
}
-fn backup_snapshot(
+fn backup_snapshot<T: CanRead + Send + Sync + 'static>(
worker: &WorkerTask,
pool_writer: &mut PoolWriter,
- datastore: Arc<DataStore>,
- snapshot: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ snapshot: BackupDir<T>,
) -> Result<SnapshotBackupResult, Error> {
let snapshot_path = snapshot.relative_path();
info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..47fa06dc 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
use pbs_api_types::{PRIV_TAPE_AUDIT, PRIV_TAPE_READ, PRIV_TAPE_WRITE};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Write as W;
use pbs_tape::{
linux_list_drives::{lookup_device_identification, lto_tape_device_list, open_lto_tape_device},
sg_tape::tape_alert_flags_critical,
@@ -1342,7 +1343,7 @@ pub fn catalog_media(
drive.read_label()?; // skip over labels - we already read them above
let mut checked_chunks = HashMap::new();
- restore_media(
+ restore_media::<W>(
worker,
&mut drive,
&media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 2cc1baab..8f089c20 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -27,6 +27,7 @@ use pbs_api_types::{
};
use pbs_client::pxar::tools::handle_root_with_optional_format_version_prelude;
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as W};
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
@@ -120,13 +121,13 @@ impl NamespaceMap {
}
}
-pub struct DataStoreMap {
- map: HashMap<String, Arc<DataStore>>,
- default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+ map: HashMap<String, Arc<DataStore<T>>>,
+ default: Option<Arc<DataStore<T>>>,
ns_map: Option<NamespaceMap>,
}
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<W> {
type Error = Error;
fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
}
}
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
let count = mappings.len();
let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,10 @@ impl DataStoreMap {
Ok(count > 0)
}
- fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+ #[allow(clippy::type_complexity)]
+ fn used_datastores(
+ &self,
+ ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
let mut map = HashMap::new();
for (source, target) in self.map.iter() {
let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,18 +193,19 @@ impl DataStoreMap {
.map(|mapping| mapping.get_namespaces(datastore, ns))
}
- fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+ fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
self.map
.get(source_datastore)
.or(self.default.as_ref())
.map(Arc::clone)
}
+ #[allow(clippy::type_complexity)]
fn get_targets(
&self,
source_datastore: &str,
source_ns: &BackupNamespace,
- ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+ ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
self.target_store(source_datastore)
.map(|store| (store, self.target_ns(source_datastore, source_ns)))
}
@@ -237,9 +242,9 @@ fn check_datastore_privs(
Ok(())
}
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
user_info: &CachedUserInfo,
- store: &Arc<DataStore>,
+ store: &Arc<DataStore<T>>,
ns: &BackupNamespace,
auth_id: &Authid,
owner: Option<&Authid>,
@@ -449,13 +454,13 @@ pub fn restore(
}
#[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
auth_id: &Authid,
@@ -529,8 +534,8 @@ fn restore_full_worker(
}
#[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
- store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+ store_map: &DataStoreMap<T>,
store: &str,
snapshot: &str,
ns: &BackupNamespace,
@@ -618,14 +623,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
}
#[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
snapshots: Vec<String>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
user_info: Arc<CachedUserInfo>,
@@ -955,16 +960,16 @@ fn get_media_set_catalog(
Ok(catalog)
}
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
let mut path = datastore.base_path();
path.push(".tmp");
path.push(media_set_uuid.to_string());
path
}
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
source_datastore: &str,
- datastore: &DataStore,
+ datastore: &DataStore<T>,
snapshot: &str,
media_set_uuid: &Uuid,
) -> PathBuf {
@@ -974,9 +979,9 @@ fn snapshot_tmpdir(
path
}
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
worker: Arc<WorkerTask>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_list: &[u64],
mut drive: Box<dyn TapeDriver>,
media_id: &MediaId,
@@ -1083,10 +1088,10 @@ fn restore_snapshots_to_tmpdir(
Ok(tmp_paths)
}
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1138,10 @@ fn restore_file_chunk_map(
Ok(())
}
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1200,12 @@ fn restore_partial_chunk_archive<'a>(
/// Request and restore complete media without using existing catalog (create catalog instead)
#[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
media_id: &MediaId,
drive_config: &SectionConfigData,
drive_name: &str,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
@@ -1253,11 +1258,11 @@ pub fn request_and_restore_media(
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
media_id: &MediaId,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
auth_id: &Authid,
@@ -1301,11 +1306,11 @@ pub fn restore_media(
}
#[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
mut reader: Box<dyn 'a + TapeRead>,
current_file_number: u64,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
catalog: &mut MediaCatalog,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
@@ -1525,10 +1530,10 @@ fn scan_chunk_archive<'a>(
Ok(Some(chunks))
}
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
checked_chunks: &mut HashSet<[u8; 32]>,
verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
diff --git a/src/backup/hierarchy.rs b/src/backup/hierarchy.rs
index 8dd71fcf..039e32a6 100644
--- a/src/backup/hierarchy.rs
+++ b/src/backup/hierarchy.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ,
};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{backup_info::BackupGroup, DataStore, ListGroups, ListNamespacesRecursive};
/// Asserts that `privs` are fulfilled on datastore + (optional) namespace.
@@ -68,8 +69,8 @@ pub fn check_ns_privs_full(
);
}
-pub fn can_access_any_namespace(
- store: Arc<DataStore>,
+pub fn can_access_any_namespace<T: CanRead + 'static>(
+ store: Arc<DataStore<T>>,
auth_id: &Authid,
user_info: &CachedUserInfo,
) -> bool {
@@ -95,8 +96,8 @@ pub fn can_access_any_namespace(
///
/// Is basically just a filter-iter for pbs_datastore::ListNamespacesRecursive including access and
/// optional owner checks.
-pub struct ListAccessibleBackupGroups<'a> {
- store: &'a Arc<DataStore>,
+pub struct ListAccessibleBackupGroups<'a, T> {
+ store: &'a Arc<DataStore<T>>,
auth_id: Option<&'a Authid>,
user_info: Arc<CachedUserInfo>,
/// The priv on NS level that allows auth_id to trump the owner check
@@ -104,15 +105,15 @@ pub struct ListAccessibleBackupGroups<'a> {
/// The priv that auth_id is required to have on NS level additionally to being owner
owner_and_priv: u64,
/// Contains the internal state, group iter and a bool flag for override_owner_priv
- state: Option<(ListGroups, bool)>,
- ns_iter: ListNamespacesRecursive,
+ state: Option<(ListGroups<T>, bool)>,
+ ns_iter: ListNamespacesRecursive<T>,
}
-impl<'a> ListAccessibleBackupGroups<'a> {
+impl<'a, T: CanRead> ListAccessibleBackupGroups<'a, T> {
// TODO: builder pattern
pub fn new_owned(
- store: &'a Arc<DataStore>,
+ store: &'a Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
auth_id: Option<&'a Authid>,
@@ -122,7 +123,7 @@ impl<'a> ListAccessibleBackupGroups<'a> {
}
pub fn new_with_privs(
- store: &'a Arc<DataStore>,
+ store: &'a Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
override_owner_priv: Option<u64>,
@@ -145,8 +146,8 @@ impl<'a> ListAccessibleBackupGroups<'a> {
pub static NS_PRIVS_OK: u64 =
PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT;
-impl Iterator for ListAccessibleBackupGroups<'_> {
- type Item = Result<BackupGroup, Error>;
+impl<T: CanRead> Iterator for ListAccessibleBackupGroups<'_, T> {
+ type Item = Result<BackupGroup<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 3d2cba8a..15c2e9e4 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -15,6 +15,7 @@ use pbs_api_types::{
UPID,
};
use pbs_datastore::backup_info::{BackupDir, BackupGroup, BackupInfo};
+use pbs_datastore::chunk_store::{CanRead, CanWrite};
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{BackupManifest, FileInfo};
use pbs_datastore::{DataBlob, DataStore, StoreProgress};
@@ -25,16 +26,16 @@ use crate::backup::hierarchy::ListAccessibleBackupGroups;
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
-pub struct VerifyWorker {
+pub struct VerifyWorker<T> {
worker: Arc<dyn WorkerTaskContext>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}
-impl VerifyWorker {
+impl<T> VerifyWorker<T> {
/// Creates a new VerifyWorker for a given task worker and datastore.
- pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+ pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore<T>>) -> Self {
Self {
worker,
datastore,
@@ -46,7 +47,7 @@ impl VerifyWorker {
}
}
-fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob<T: CanRead>(backup_dir: &BackupDir<T>, info: &FileInfo) -> Result<(), Error> {
let blob = backup_dir.load_blob(&info.filename)?;
let raw_size = blob.raw_size();
@@ -70,7 +71,7 @@ fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
}
}
-fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
+fn rename_corrupted_chunk<T: CanWrite>(datastore: Arc<DataStore<T>>, digest: &[u8; 32]) {
let (path, digest_str) = datastore.chunk_path(digest);
let mut counter = 0;
@@ -97,8 +98,8 @@ fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
};
}
-fn verify_index_chunks(
- verify_worker: &VerifyWorker,
+fn verify_index_chunks<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
index: Box<dyn IndexFile + Send>,
crypt_mode: CryptMode,
) -> Result<(), Error> {
@@ -238,9 +239,9 @@ fn verify_index_chunks(
Ok(())
}
-fn verify_fixed_index(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+fn verify_fixed_index<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
info: &FileInfo,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
@@ -260,9 +261,9 @@ fn verify_fixed_index(
verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
-fn verify_dynamic_index(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+fn verify_dynamic_index<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
info: &FileInfo,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
@@ -291,9 +292,9 @@ fn verify_dynamic_index(
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
-pub fn verify_backup_dir(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+pub fn verify_backup_dir<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
@@ -325,9 +326,9 @@ pub fn verify_backup_dir(
}
/// See verify_backup_dir
-pub fn verify_backup_dir_with_lock(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+pub fn verify_backup_dir_with_lock<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
_snap_lock: BackupLockGuard,
@@ -403,9 +404,9 @@ pub fn verify_backup_dir_with_lock(
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_backup_group(
- verify_worker: &VerifyWorker,
- group: &BackupGroup,
+pub fn verify_backup_group<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ group: &BackupGroup<T>,
progress: &mut StoreProgress,
upid: &UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
@@ -455,8 +456,8 @@ pub fn verify_backup_group(
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_all_backups(
- verify_worker: &VerifyWorker,
+pub fn verify_all_backups<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
upid: &UPID,
ns: BackupNamespace,
max_depth: Option<usize>,
@@ -504,7 +505,7 @@ pub fn verify_all_backups(
.filter(|group| {
!(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
})
- .collect::<Vec<BackupGroup>>(),
+ .collect::<Vec<BackupGroup<T>>>(),
Err(err) => {
info!("unable to list backups: {err}");
return Ok(errors);
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 1d4cf37c..bda2f17b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -20,7 +20,8 @@ use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
use proxmox_sys::fs::CreateOptions;
use proxmox_sys::logrotate::LogRotate;
-use pbs_datastore::DataStore;
+use pbs_datastore::chunk_store::Lookup as L;
+use pbs_datastore::{is_garbage_collection_running, DataStore};
use proxmox_rest_server::{
cleanup_old_tasks, cookie_from_header, rotate_task_log_archive, ApiConfig, Redirector,
@@ -265,7 +266,7 @@ async fn run() -> Result<(), Error> {
// to remove references for not configured datastores
command_sock.register_command("datastore-removed".to_string(), |_value| {
- if let Err(err) = DataStore::remove_unused_datastores() {
+ if let Err(err) = DataStore::<L>::remove_unused_datastores() {
log::error!("could not refresh datastores: {err}");
}
Ok(Value::Null)
@@ -274,7 +275,7 @@ async fn run() -> Result<(), Error> {
// clear cache entry for datastore that is in a specific maintenance mode
command_sock.register_command("update-datastore-cache".to_string(), |value| {
if let Some(name) = value.and_then(Value::as_str) {
- if let Err(err) = DataStore::update_datastore_cache(name) {
+ if let Err(err) = DataStore::<L>::update_datastore_cache(name) {
log::error!("could not trigger update datastore cache: {err}");
}
}
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 64835028..c2af6c67 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -4,15 +4,18 @@ use std::sync::Arc;
use tracing::info;
use pbs_api_types::Authid;
+use pbs_datastore::chunk_store::CanWrite;
use pbs_datastore::DataStore;
use proxmox_rest_server::WorkerTask;
use crate::server::{jobstate::Job, send_gc_status};
/// Runs a garbage collection job.
-pub fn do_garbage_collection_job(
+pub fn do_garbage_collection_job<
+ T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static,
+>(
mut job: Job,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
auth_id: &Authid,
schedule: Option<String>,
to_stdout: bool,
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 1c86647a..395aaee4 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -7,6 +7,7 @@ use pbs_api_types::{
print_store_and_ns, Authid, KeepOptions, Operation, PruneJobOptions, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
};
+use pbs_datastore::chunk_store::CanWrite;
use pbs_datastore::prune::compute_prune_info;
use pbs_datastore::DataStore;
use proxmox_rest_server::WorkerTask;
@@ -14,10 +15,10 @@ use proxmox_rest_server::WorkerTask;
use crate::backup::ListAccessibleBackupGroups;
use crate::server::jobstate::Job;
-pub fn prune_datastore(
+pub fn prune_datastore<T: CanWrite>(
auth_id: Authid,
prune_options: PruneJobOptions,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
dry_run: bool,
) -> Result<(), Error> {
let store = &datastore.name();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b1724c14..573aa805 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
};
use pbs_client::BackupRepository;
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::{CanWrite, Write as W};
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -34,8 +35,8 @@ use super::sync::{
use crate::backup::{check_ns_modification_privs, check_ns_privs};
use crate::tools::parallel_handler::ParallelHandler;
-pub(crate) struct PullTarget {
- store: Arc<DataStore>,
+pub(crate) struct PullTarget<T> {
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
}
@@ -44,7 +45,7 @@ pub(crate) struct PullParameters {
/// Where data is pulled from
source: Arc<dyn SyncSource>,
/// Where data should be pulled into
- target: PullTarget,
+ target: PullTarget<W>,
/// Owner of synced groups (needs to match local owner of pre-existing groups)
owner: Authid,
/// Whether to remove groups which exist locally, but not on the remote end
@@ -135,9 +136,9 @@ impl PullParameters {
}
}
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, T: CanWrite + Send + Sync + 'static>(
chunk_reader: Arc<dyn AsyncReadChunk>,
- target: Arc<DataStore>,
+ target: Arc<DataStore<T>>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -260,9 +261,9 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive<'a>(
+async fn pull_single_archive<'a, T: CanWrite + Send + Sync + 'static>(
reader: Arc<dyn SyncSourceReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<SyncStats, Error> {
@@ -343,10 +344,10 @@ async fn pull_single_archive<'a>(
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
-async fn pull_snapshot<'a>(
+async fn pull_snapshot<'a, T: CanWrite + Send + Sync + 'static>(
params: &PullParameters,
reader: Arc<dyn SyncSourceReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt: bool,
is_new: bool,
@@ -482,10 +483,10 @@ async fn pull_snapshot<'a>(
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
-async fn pull_snapshot_from<'a>(
+async fn pull_snapshot_from<'a, T: CanWrite + Send + Sync + 'static>(
params: &PullParameters,
reader: Arc<dyn SyncSourceReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt: bool,
) -> Result<SyncStats, Error> {
diff --git a/src/server/push.rs b/src/server/push.rs
index e71012ed..ff9d9358 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -18,6 +18,7 @@ use pbs_api_types::{
};
use pbs_client::{BackupRepository, BackupWriter, HttpClient, MergedChunkInfo, UploadOptions};
use pbs_config::CachedUserInfo;
+use pbs_datastore::chunk_store::Read as R;
use pbs_datastore::data_blob::ChunkInfo;
use pbs_datastore::dynamic_index::DynamicIndexReader;
use pbs_datastore::fixed_index::FixedIndexReader;
@@ -61,7 +62,7 @@ impl PushTarget {
/// Parameters for a push operation
pub(crate) struct PushParameters {
/// Source of backups to be pushed to remote
- source: Arc<LocalSource>,
+ source: Arc<LocalSource<R>>,
/// Target for backups to be pushed to
target: PushTarget,
/// User used for permission checks on the source side, including potentially filtering visible
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 09814ef0..96a73503 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -24,6 +24,7 @@ use pbs_api_types::{
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::data_blob::DataBlob;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{BackupManifest, DataStore, ListNamespacesRecursive, LocalChunkReader};
@@ -105,10 +106,10 @@ pub(crate) struct RemoteSourceReader {
pub(crate) dir: BackupDir,
}
-pub(crate) struct LocalSourceReader {
+pub(crate) struct LocalSourceReader<T> {
pub(crate) _dir_lock: Arc<Mutex<BackupLockGuard>>,
pub(crate) path: PathBuf,
- pub(crate) datastore: Arc<DataStore>,
+ pub(crate) datastore: Arc<DataStore<T>>,
}
#[async_trait::async_trait]
@@ -189,7 +190,7 @@ impl SyncSourceReader for RemoteSourceReader {
}
#[async_trait::async_trait]
-impl SyncSourceReader for LocalSourceReader {
+impl<T: CanRead + Send + Sync + 'static> SyncSourceReader for LocalSourceReader<T> {
fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
Arc::new(LocalChunkReader::new(
self.datastore.clone(),
@@ -266,8 +267,8 @@ pub(crate) struct RemoteSource {
pub(crate) client: HttpClient,
}
-pub(crate) struct LocalSource {
- pub(crate) store: Arc<DataStore>,
+pub(crate) struct LocalSource<T> {
+ pub(crate) store: Arc<DataStore<T>>,
pub(crate) ns: BackupNamespace,
}
@@ -415,7 +416,7 @@ impl SyncSource for RemoteSource {
}
#[async_trait::async_trait]
-impl SyncSource for LocalSource {
+impl<T: CanRead + Send + Sync + 'static> SyncSource for LocalSource<T> {
async fn list_namespaces(
&self,
max_depth: &mut Option<usize>,
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index 9d11c04b..7f4ef01f 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -5,6 +5,7 @@ use std::task::{Context, Poll};
use proxmox_sys::error::SysError;
use proxmox_uuid::Uuid;
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::SnapshotReader;
use pbs_tape::{MediaContentHeader, TapeWrite, PROXMOX_TAPE_BLOCK_SIZE};
@@ -21,9 +22,9 @@ use crate::tape::file_formats::{
/// `LEOM` was detected before all data was written. The stream is
/// marked incomplete in that case and does not contain all data (The
/// backup task must rewrite the whole file on the next media).
-pub fn tape_write_snapshot_archive<'a>(
+pub fn tape_write_snapshot_archive<'a, T: CanRead>(
writer: &mut (dyn TapeWrite + 'a),
- snapshot_reader: &SnapshotReader,
+ snapshot_reader: &SnapshotReader<T>,
) -> Result<Option<Uuid>, std::io::Error> {
let backup_dir = snapshot_reader.snapshot();
let snapshot =
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 54084421..17c20add 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -15,6 +15,7 @@ use tracing::{info, warn};
use proxmox_uuid::Uuid;
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataStore, SnapshotReader};
use pbs_tape::{sg_tape::tape_alert_flags_critical, TapeWrite};
use proxmox_rest_server::WorkerTask;
@@ -452,9 +453,9 @@ impl PoolWriter {
/// archive is marked incomplete, and we do not use it. The caller
/// should mark the media as full and try again using another
/// media.
- pub fn append_snapshot_archive(
+ pub fn append_snapshot_archive<T: CanRead>(
&mut self,
- snapshot_reader: &SnapshotReader,
+ snapshot_reader: &SnapshotReader<T>,
) -> Result<(bool, usize), Error> {
let status = match self.status {
Some(ref mut status) => status,
@@ -543,10 +544,10 @@ impl PoolWriter {
Ok((leom, bytes_written))
}
- pub fn spawn_chunk_reader_thread(
+ pub fn spawn_chunk_reader_thread<T: CanRead + Send + Sync + 'static>(
&self,
- datastore: Arc<DataStore>,
- snapshot_reader: Arc<Mutex<SnapshotReader>>,
+ datastore: Arc<DataStore<T>>,
+ snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
NewChunksIterator::spawn(
datastore,
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index e6f418df..e1a0da20 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
use crate::tape::CatalogSet;
@@ -21,9 +22,9 @@ impl NewChunksIterator {
/// Creates the iterator, spawning a new thread
///
/// Make sure to join() the returned thread handle.
- pub fn spawn(
- datastore: Arc<DataStore>,
- snapshot_reader: Arc<Mutex<SnapshotReader>>,
+ pub fn spawn<T: CanRead + Send + Sync + 'static>(
+ datastore: Arc<DataStore<T>>,
+ snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
catalog_set: Arc<Mutex<CatalogSet>>,
read_threads: usize,
) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
--
2.39.5