* [pbs-devel] [PATCH proxmox-backup RFC 01/10] chunkstore: add CanRead and CanWrite trait
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 02/10] chunkstore: separate functions into impl block Hannes Laimer
` (9 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-datastore/src/chunk_store.rs | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
index dd0061ea..2ffd8488 100644
--- a/pbs-datastore/src/chunk_store.rs
+++ b/pbs-datastore/src/chunk_store.rs
@@ -18,14 +18,29 @@ use crate::file_formats::{
};
use crate::DataBlob;
+pub trait CanRead: Clone {}
+pub trait CanWrite: CanRead {}
+
+#[derive(Clone, Copy)]
+pub struct Read;
+#[derive(Clone, Copy)]
+pub struct Write;
+#[derive(Clone, Copy)]
+pub struct Lookup;
+
+impl CanRead for Read {}
+impl CanRead for Write {}
+impl CanWrite for Write {}
+
/// File system based chunk store
-pub struct ChunkStore {
+pub struct ChunkStore<T> {
name: String, // used for error reporting
pub(crate) base: PathBuf,
chunk_dir: PathBuf,
mutex: Mutex<()>,
locker: Option<Arc<Mutex<ProcessLocker>>>,
sync_level: DatastoreFSyncLevel,
+ _marker: std::marker::PhantomData<T>,
}
// TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
--
2.39.2
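
The markers above are the core of the typestate approach: capabilities become trait bounds, and `CanWrite: CanRead` encodes that every writer may also read. Below is a minimal standalone sketch of how such markers gate methods at compile time (illustrative only, with simplified stand-in types; not code from the patch):

use std::marker::PhantomData;

pub trait CanRead: Clone {}
pub trait CanWrite: CanRead {}

#[derive(Clone, Copy)]
pub struct Read;
#[derive(Clone, Copy)]
pub struct Write;
#[derive(Clone, Copy)]
pub struct Lookup;

impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}

// stand-in for ChunkStore<T>: the marker is carried as PhantomData
pub struct Store<T>(PhantomData<T>);

impl<T: CanRead> Store<T> {
    pub fn read_op(&self) {} // only callable when T: CanRead
}

impl<T: CanWrite> Store<T> {
    pub fn write_op(&self) {} // only callable when T: CanWrite
}

fn main() {
    let rw = Store::<Write>(PhantomData);
    rw.read_op();  // ok: `Write` implements `CanRead`
    rw.write_op(); // ok: `Write` implements `CanWrite`

    let ro = Store::<Read>(PhantomData);
    ro.read_op();     // ok
    // ro.write_op(); // rejected at compile time: `Read: CanWrite` unsatisfied
}

With this in place, handing out a `Store<Read>` is a static guarantee that no write path can be reached through it.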
* [pbs-devel] [PATCH proxmox-backup RFC 02/10] chunkstore: separate functions into impl block
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 01/10] chunkstore: add CanRead and CanWrite trait Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 03/10] datastore: add generics and new lookup functions Hannes Laimer
` (8 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
... based on whether they are reading/writing.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-datastore/src/chunk_store.rs | 211 ++++++++++++++++++-------------
1 file changed, 120 insertions(+), 91 deletions(-)
diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
index 2ffd8488..c9f316ad 100644
--- a/pbs-datastore/src/chunk_store.rs
+++ b/pbs-datastore/src/chunk_store.rs
@@ -78,30 +78,29 @@ fn digest_to_prefix(digest: &[u8]) -> PathBuf {
path.into()
}
-impl ChunkStore {
- #[doc(hidden)]
- pub unsafe fn panic_store() -> Self {
- Self {
- name: String::new(),
- base: PathBuf::new(),
- chunk_dir: PathBuf::new(),
- mutex: Mutex::new(()),
- locker: None,
- sync_level: Default::default(),
- }
- }
+impl ChunkStore<Lookup> {
+ pub fn open_lookup<P: Into<PathBuf>>(name: &str, base: P) -> Result<Self, Error> {
+ let base: PathBuf = base.into();
- fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
- let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
- chunk_dir.push(".chunks");
+ if !base.is_absolute() {
+ bail!("expected absolute path - got {:?}", base);
+ }
- chunk_dir
- }
+ let chunk_dir = Self::chunk_dir(&base);
- pub fn base(&self) -> &Path {
- &self.base
+ Ok(Self {
+ name: name.to_owned(),
+ base,
+ chunk_dir,
+ locker: None,
+ mutex: Mutex::new(()),
+ sync_level: DatastoreFSyncLevel::None,
+ _marker: std::marker::PhantomData,
+ })
}
+}
+impl ChunkStore<Write> {
pub fn create<P>(
name: &str,
path: P,
@@ -164,13 +163,9 @@ impl ChunkStore {
Self::open(name, base, sync_level)
}
+}
- fn lockfile_path<P: Into<PathBuf>>(base: P) -> PathBuf {
- let mut lockfile_path: PathBuf = base.into();
- lockfile_path.push(".lock");
- lockfile_path
- }
-
+impl<T: CanRead> ChunkStore<T> {
/// Opens the chunk store with a new process locker.
///
/// Note that this must be used with care, as it's dangerous to create two instances on the
@@ -204,62 +199,10 @@ impl ChunkStore {
locker: Some(locker),
mutex: Mutex::new(()),
sync_level,
+ _marker: std::marker::PhantomData,
})
}
- pub fn touch_chunk(&self, digest: &[u8; 32]) -> Result<(), Error> {
- // unwrap: only `None` in unit tests
- assert!(self.locker.is_some());
-
- self.cond_touch_chunk(digest, true)?;
- Ok(())
- }
-
- pub fn cond_touch_chunk(&self, digest: &[u8; 32], assert_exists: bool) -> Result<bool, Error> {
- // unwrap: only `None` in unit tests
- assert!(self.locker.is_some());
-
- let (chunk_path, _digest_str) = self.chunk_path(digest);
- self.cond_touch_path(&chunk_path, assert_exists)
- }
-
- pub fn cond_touch_path(&self, path: &Path, assert_exists: bool) -> Result<bool, Error> {
- // unwrap: only `None` in unit tests
- assert!(self.locker.is_some());
-
- const UTIME_NOW: i64 = (1 << 30) - 1;
- const UTIME_OMIT: i64 = (1 << 30) - 2;
-
- let times: [libc::timespec; 2] = [
- // access time -> update to now
- libc::timespec {
- tv_sec: 0,
- tv_nsec: UTIME_NOW,
- },
- // modification time -> keep as is
- libc::timespec {
- tv_sec: 0,
- tv_nsec: UTIME_OMIT,
- },
- ];
-
- use nix::NixPath;
-
- let res = path.with_nix_path(|cstr| unsafe {
- let tmp = libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW);
- nix::errno::Errno::result(tmp)
- })?;
-
- if let Err(err) = res {
- if !assert_exists && err == nix::errno::Errno::ENOENT {
- return Ok(false);
- }
- bail!("update atime failed for chunk/file {path:?} - {err}");
- }
-
- Ok(true)
- }
-
pub fn get_chunk_iterator(
&self,
) -> Result<
@@ -354,11 +297,74 @@ impl ChunkStore {
})
.fuse())
}
+ pub fn try_shared_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
+ // unwrap: only `None` in unit tests
+ ProcessLocker::try_shared_lock(self.locker.clone().unwrap())
+ }
+ pub fn try_exclusive_lock(&self) -> Result<ProcessLockExclusiveGuard, Error> {
+ // unwrap: only `None` in unit tests
+ ProcessLocker::try_exclusive_lock(self.locker.clone().unwrap())
+ }
pub fn oldest_writer(&self) -> Option<i64> {
// unwrap: only `None` in unit tests
ProcessLocker::oldest_shared_lock(self.locker.clone().unwrap())
}
+}
+
+impl<T: CanWrite> ChunkStore<T> {
+ pub fn touch_chunk(&self, digest: &[u8; 32]) -> Result<(), Error> {
+ // unwrap: only `None` in unit tests
+ assert!(self.locker.is_some());
+
+ self.cond_touch_chunk(digest, true)?;
+ Ok(())
+ }
+
+ pub fn cond_touch_chunk(&self, digest: &[u8; 32], assert_exists: bool) -> Result<bool, Error> {
+ // unwrap: only `None` in unit tests
+ assert!(self.locker.is_some());
+
+ let (chunk_path, _digest_str) = self.chunk_path(digest);
+ self.cond_touch_path(&chunk_path, assert_exists)
+ }
+
+ pub fn cond_touch_path(&self, path: &Path, assert_exists: bool) -> Result<bool, Error> {
+ // unwrap: only `None` in unit tests
+ assert!(self.locker.is_some());
+
+ const UTIME_NOW: i64 = (1 << 30) - 1;
+ const UTIME_OMIT: i64 = (1 << 30) - 2;
+
+ let times: [libc::timespec; 2] = [
+ // access time -> update to now
+ libc::timespec {
+ tv_sec: 0,
+ tv_nsec: UTIME_NOW,
+ },
+ // modification time -> keep as is
+ libc::timespec {
+ tv_sec: 0,
+ tv_nsec: UTIME_OMIT,
+ },
+ ];
+
+ use nix::NixPath;
+
+ let res = path.with_nix_path(|cstr| unsafe {
+ let tmp = libc::utimensat(-1, cstr.as_ptr(), &times[0], libc::AT_SYMLINK_NOFOLLOW);
+ nix::errno::Errno::result(tmp)
+ })?;
+
+ if let Err(err) = res {
+ if !assert_exists && err == nix::errno::Errno::ENOENT {
+ return Ok(false);
+ }
+ bail!("update atime failed for chunk/file {path:?} - {err}");
+ }
+
+ Ok(true)
+ }
pub fn sweep_unused_chunks(
&self,
@@ -534,6 +540,38 @@ impl ChunkStore {
Ok((false, encoded_size))
}
+}
+
+impl<T> ChunkStore<T> {
+ #[doc(hidden)]
+ pub fn dummy_store() -> Self {
+ Self {
+ name: String::new(),
+ base: PathBuf::new(),
+ chunk_dir: PathBuf::new(),
+ mutex: Mutex::new(()),
+ locker: None,
+ sync_level: Default::default(),
+ _marker: std::marker::PhantomData,
+ }
+ }
+
+ fn chunk_dir<P: AsRef<Path>>(path: P) -> PathBuf {
+ let mut chunk_dir: PathBuf = PathBuf::from(path.as_ref());
+ chunk_dir.push(".chunks");
+
+ chunk_dir
+ }
+
+ pub fn base(&self) -> &Path {
+ &self.base
+ }
+
+ fn lockfile_path<P: Into<PathBuf>>(base: P) -> PathBuf {
+ let mut lockfile_path: PathBuf = base.into();
+ lockfile_path.push(".lock");
+ lockfile_path
+ }
pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
// unwrap: only `None` in unit tests
@@ -566,16 +604,6 @@ impl ChunkStore {
self.base.clone()
}
-
- pub fn try_shared_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
- // unwrap: only `None` in unit tests
- ProcessLocker::try_shared_lock(self.locker.clone().unwrap())
- }
-
- pub fn try_exclusive_lock(&self) -> Result<ProcessLockExclusiveGuard, Error> {
- // unwrap: only `None` in unit tests
- ProcessLocker::try_exclusive_lock(self.locker.clone().unwrap())
- }
}
#[test]
@@ -585,13 +613,14 @@ fn test_chunk_store1() {
if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
- let chunk_store = ChunkStore::open("test", &path, DatastoreFSyncLevel::None);
+ let chunk_store: Result<ChunkStore<Read>, _> =
+ ChunkStore::open("test", &path, DatastoreFSyncLevel::None);
assert!(chunk_store.is_err());
let user = nix::unistd::User::from_uid(nix::unistd::Uid::current())
.unwrap()
.unwrap();
- let chunk_store =
+ let chunk_store: ChunkStore<Write> =
ChunkStore::create("test", &path, user.uid, user.gid, DatastoreFSyncLevel::None).unwrap();
let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8])
@@ -604,7 +633,7 @@ fn test_chunk_store1() {
let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
assert!(exists);
- let chunk_store =
+ let chunk_store: Result<ChunkStore<Write>, _> =
ChunkStore::create("test", &path, user.uid, user.gid, DatastoreFSyncLevel::None);
assert!(chunk_store.is_err());
--
2.39.2
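
The effect of the split is that the constructor chooses the marker, and the marker chooses the API surface: `open_lookup` yields a `ChunkStore<Lookup>` (no process locker, config-level operations only), `create` is only available for `ChunkStore<Write>`, and `open` is generic over `T: CanRead`. A rough caller-side sketch under those assumptions (stand-in types, hypothetical method bodies; not code from the tree):

use std::marker::PhantomData;

trait CanRead {}
trait CanWrite: CanRead {}

struct Read;
struct Write;
struct Lookup;

impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}

struct ChunkStore<T> {
    name: String,
    _marker: PhantomData<T>,
}

impl ChunkStore<Lookup> {
    // cheap constructor: no process locker, path checks only
    fn open_lookup(name: &str) -> Self {
        Self { name: name.into(), _marker: PhantomData }
    }
}

// available for every marker, like the generic `impl<T>` block in the diff
impl<T> ChunkStore<T> {
    fn base(&self) -> &str { &self.name }
}

// mirrors `impl<T: CanRead> ChunkStore<T>`
impl<T: CanRead> ChunkStore<T> {
    fn get_chunk_iterator(&self) {}
}

// mirrors `impl<T: CanWrite> ChunkStore<T>`
impl<T: CanWrite> ChunkStore<T> {
    fn touch_chunk(&self) {}
}

fn main() {
    let store = ChunkStore::open_lookup("store1");
    let _ = store.base();          // ok: no capability required
    // store.get_chunk_iterator(); // compile error: `Lookup: CanRead` unsatisfied
    // store.touch_chunk();        // compile error: `Lookup: CanWrite` unsatisfied
}

Since `Lookup` implements neither capability trait, a lookup store only ever sees the unbounded `impl<T>` methods; the read and write method sets are unreachable by construction.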
* [pbs-devel] [PATCH proxmox-backup RFC 03/10] datastore: add generics and new lookup functions
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 01/10] chunkstore: add CanRead and CanWrite trait Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 02/10] chunkstore: separate functions into impl block Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 04/10] datastore: separate functions into impl block Hannes Laimer
` (7 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-datastore/src/datastore.rs | 83 +++++++++++++++++++++++++++++-----
1 file changed, 71 insertions(+), 12 deletions(-)
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index d0f3c53a..be7767ff 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -6,6 +6,7 @@ use std::sync::{Arc, LazyLock, Mutex};
use anyhow::{bail, format_err, Error};
use nix::unistd::{unlinkat, UnlinkatFlags};
+use pbs_config::BackupLockGuard;
use tracing::{info, warn};
use proxmox_human_byte::HumanByte;
@@ -23,7 +24,9 @@ use pbs_api_types::{
};
use crate::backup_info::{BackupDir, BackupGroup, BackupGroupDeleteStats};
-use crate::chunk_store::ChunkStore;
+use crate::chunk_store::{
+ CanRead, CanWrite, ChunkStore, Lookup, Read as ReadStore, Write as WriteStore,
+};
use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
use crate::fixed_index::{FixedIndexReader, FixedIndexWriter};
use crate::hierarchy::{ListGroups, ListGroupsType, ListNamespaces, ListNamespacesRecursive};
@@ -32,7 +35,12 @@ use crate::manifest::ArchiveType;
use crate::task_tracking::{self, update_active_operations};
use crate::DataBlob;
-static DATASTORE_MAP: LazyLock<Mutex<HashMap<String, Arc<DataStoreImpl>>>> =
+type DataStoreCache<T> = HashMap<String, Arc<DataStoreImpl<T>>>;
+
+static DATASTORE_MAP_READ: LazyLock<Mutex<DataStoreCache<ReadStore>>> =
+ LazyLock::new(|| Mutex::new(HashMap::new()));
+
+static DATASTORE_MAP_WRITE: LazyLock<Mutex<DataStoreCache<WriteStore>>> =
LazyLock::new(|| Mutex::new(HashMap::new()));
/// checks if auth_id is owner, or, if owner is a token, if
@@ -50,8 +58,8 @@ pub fn check_backup_owner(owner: &Authid, auth_id: &Authid) -> Result<(), Error>
///
/// A Datastore can store severals backups, and provides the
/// management interface for backup.
-pub struct DataStoreImpl {
- chunk_store: Arc<ChunkStore>,
+pub struct DataStoreImpl<T> {
+ chunk_store: Arc<ChunkStore<T>>,
gc_mutex: Mutex<()>,
last_gc_status: Mutex<GarbageCollectionStatus>,
verify_new: bool,
@@ -60,12 +68,12 @@ pub struct DataStoreImpl {
sync_level: DatastoreFSyncLevel,
}
-impl DataStoreImpl {
+impl<T> DataStoreImpl<T> {
// This one just panics on everything
#[doc(hidden)]
- pub(crate) unsafe fn new_test() -> Arc<Self> {
+ pub(crate) fn new_test() -> Arc<Self> {
Arc::new(Self {
- chunk_store: Arc::new(unsafe { ChunkStore::panic_store() }),
+ chunk_store: Arc::new(ChunkStore::dummy_store()),
gc_mutex: Mutex::new(()),
last_gc_status: Mutex::new(GarbageCollectionStatus::default()),
verify_new: false,
@@ -76,12 +84,12 @@ impl DataStoreImpl {
}
}
-pub struct DataStore {
- inner: Arc<DataStoreImpl>,
+pub struct DataStore<T> {
+ inner: Arc<DataStoreImpl<T>>,
operation: Option<Operation>,
}
-impl Clone for DataStore {
+impl<T> Clone for DataStore<T> {
fn clone(&self) -> Self {
let mut new_operation = self.operation;
if let Some(operation) = self.operation {
@@ -98,7 +106,7 @@ impl Clone for DataStore {
}
}
-impl Drop for DataStore {
+impl<T> Drop for DataStore<T> {
fn drop(&mut self) {
if let Some(operation) = self.operation {
let mut last_task = false;
@@ -120,12 +128,63 @@ impl Drop for DataStore {
});
if remove_from_cache {
- DATASTORE_MAP.lock().unwrap().remove(self.name());
+ DATASTORE_MAP_READ.lock().unwrap().remove(self.name());
+ DATASTORE_MAP_WRITE.lock().unwrap().remove(self.name());
}
}
}
}
+impl DataStore<Lookup> {
+ pub fn lookup_datastore(name: &str) -> Result<Arc<Self>, Error> {
+ let (config, digest, _lock) = Self::read_config(name)?;
+ let chunk_store = Arc::new(ChunkStore::open_lookup(name, &config.path)?);
+ let tuning: DatastoreTuning = serde_json::from_value(
+ DatastoreTuning::API_SCHEMA
+ .parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
+ )?;
+ let store = DataStoreImpl {
+ chunk_store,
+ gc_mutex: Mutex::new(()),
+ last_gc_status: Mutex::new(GarbageCollectionStatus::default()),
+ verify_new: config.verify_new.unwrap_or(false),
+ chunk_order: tuning.chunk_order.unwrap_or_default(),
+ last_digest: Some(digest),
+ sync_level: tuning.sync_level.unwrap_or_default(),
+ };
+
+ Ok(Arc::new(Self {
+ inner: Arc::new(store),
+ operation: Some(Operation::Lookup),
+ }))
+ }
+}
+impl DataStore<ReadStore> {
+ pub fn lookup_datastore_read(name: &str) -> Result<Arc<Self>, Error> {
+ let mut datastore_cache = DATASTORE_MAP_READ.lock().unwrap();
+ let cache_entry = datastore_cache.get(name);
+ let store = Self::open_datastore(name, Some(Operation::Read), cache_entry.cloned())?;
+ if cache_entry.is_none() {
+ datastore_cache.insert(name.to_string(), store.inner.clone());
+ }
+ Ok(store)
+ }
+}
+impl DataStore<WriteStore> {
+ pub fn lookup_datastore_write(name: &str) -> Result<Arc<Self>, Error> {
+ let mut datastore_cache = DATASTORE_MAP_WRITE.lock().unwrap();
+ let cache_entry = datastore_cache.get(name);
+ let store = Self::open_datastore(name, Some(Operation::Write), cache_entry.cloned())?;
+ if cache_entry.is_none() {
+ datastore_cache.insert(name.to_string(), store.inner.clone());
+ }
+ Ok(store)
+ }
+}
+
impl DataStore {
// This one just panics on everything
#[doc(hidden)]
--
2.39.2
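
Note that the cache is duplicated per capability: `DATASTORE_MAP_READ` and `DATASTORE_MAP_WRITE` are independent maps, and the `Drop` handler now evicts a name from both. A compilable sketch of that layout, with the types reduced to the essentials (stand-in structs, not the real `DataStoreImpl`):

use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::{Arc, LazyLock, Mutex};

struct ReadStore;
struct WriteStore;

// stand-in for the real DataStoreImpl<T>
struct DataStoreImpl<T>(PhantomData<T>);

type DataStoreCache<T> = HashMap<String, Arc<DataStoreImpl<T>>>;

// one static cache per capability marker
static DATASTORE_MAP_READ: LazyLock<Mutex<DataStoreCache<ReadStore>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
static DATASTORE_MAP_WRITE: LazyLock<Mutex<DataStoreCache<WriteStore>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

fn main() {
    // a read lookup populates only the read-side cache ...
    DATASTORE_MAP_READ
        .lock()
        .unwrap()
        .insert("store1".to_string(), Arc::new(DataStoreImpl(PhantomData)));
    assert!(DATASTORE_MAP_WRITE.lock().unwrap().is_empty());

    // ... while eviction (as in the patch's Drop impl) removes the
    // name from both maps to keep them consistent
    DATASTORE_MAP_READ.lock().unwrap().remove("store1");
    DATASTORE_MAP_WRITE.lock().unwrap().remove("store1");
}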
* [pbs-devel] [PATCH proxmox-backup RFC 04/10] datastore: separate functions into impl block
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (2 preceding siblings ...)
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 03/10] datastore: add generics and new lookup functions Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 05/10] backup_info: add generics and separate functions into impl blocks Hannes Laimer
` (6 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
... based on whether they are reading/writing.
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-datastore/src/datastore.rs | 1633 ++++++++++++++++----------------
1 file changed, 813 insertions(+), 820 deletions(-)
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index be7767ff..70b31705 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -180,33 +180,178 @@ impl DataStore<WriteStore> {
}
Ok(store)
}
-}
+ /// Destroy a datastore. This requires that there are no active operations on the datastore.
+ ///
+ /// This is a synchronous operation and should be run in a worker-thread.
+ pub fn destroy(name: &str, destroy_data: bool) -> Result<(), Error> {
+ let config_lock = pbs_config::datastore::lock_config()?;
+
+ let (mut config, _digest) = pbs_config::datastore::config()?;
+ let mut datastore_config: DataStoreConfig = config.lookup("datastore", name)?;
+
+ datastore_config.set_maintenance_mode(Some(MaintenanceMode {
+ ty: MaintenanceType::Delete,
+ message: None,
+ }))?;
+
+ config.set_data(name, "datastore", &datastore_config)?;
+ pbs_config::datastore::save_config(&config)?;
+ drop(config_lock);
+
+ let (operations, _lock) = task_tracking::get_active_operations_locked(name)?;
+
+ if operations.read != 0 || operations.write != 0 {
+ bail!("datastore is currently in use");
+ }
+
+ let base = PathBuf::from(&datastore_config.path);
+
+ let mut ok = true;
+ if destroy_data {
+ let remove = |subdir, ok: &mut bool| {
+ if let Err(err) = std::fs::remove_dir_all(base.join(subdir)) {
+ if err.kind() != io::ErrorKind::NotFound {
+ warn!("failed to remove {subdir:?} subdirectory: {err}");
+ *ok = false;
+ }
+ }
+ };
+
+ info!("Deleting datastore data...");
+ remove("ns", &mut ok); // ns first
+ remove("ct", &mut ok);
+ remove("vm", &mut ok);
+ remove("host", &mut ok);
+
+ if ok {
+ if let Err(err) = std::fs::remove_file(base.join(".gc-status")) {
+ if err.kind() != io::ErrorKind::NotFound {
+ warn!("failed to remove .gc-status file: {err}");
+ ok = false;
+ }
+ }
+ }
+
+ // chunks get removed last and only if the backups were successfully deleted
+ if ok {
+ remove(".chunks", &mut ok);
+ }
+ }
+
+ // now the config
+ if ok {
+ info!("Removing datastore from config...");
+ let _lock = pbs_config::datastore::lock_config()?;
+ let _ = config.sections.remove(name);
+ pbs_config::datastore::save_config(&config)?;
+ }
+
+ // finally the lock & toplevel directory
+ if destroy_data {
+ if ok {
+ if let Err(err) = std::fs::remove_file(base.join(".lock")) {
+ if err.kind() != io::ErrorKind::NotFound {
+ warn!("failed to remove .lock file: {err}");
+ ok = false;
+ }
+ }
+ }
+
+ if ok {
+ info!("Finished deleting data.");
+
+ match std::fs::remove_dir(base) {
+ Ok(()) => info!("Removed empty datastore directory."),
+ Err(err) if err.kind() == io::ErrorKind::NotFound => {
+ // weird, but ok
+ }
+ Err(err) if err.is_errno(nix::errno::Errno::EBUSY) => {
+ warn!("Cannot delete datastore directory (is it a mount point?).")
+ }
+ Err(err) if err.is_errno(nix::errno::Errno::ENOTEMPTY) => {
+ warn!("Datastore directory not empty, not deleting.")
+ }
+ Err(err) => {
+ warn!("Failed to remove datastore directory: {err}");
+ }
+ }
+ } else {
+ info!("There were errors deleting data.");
+ }
+ }
+
+ Ok(())
+ }
+ /// Trigger clearing of a datastore's cache entry based on maintenance mode. The entry will
+ /// only be cleared if there is no other task running; if there is, the end of the
+ /// last running task will trigger the clearing of the cache entry.
+ pub fn update_datastore_cache(name: &str) -> Result<(), Error> {
+ let (config, _digest) = pbs_config::datastore::config()?;
+ let datastore: DataStoreConfig = config.lookup("datastore", name)?;
+ if datastore
+ .get_maintenance_mode()
+ .map_or(false, |m| m.is_offline())
+ {
+ // the datastore drop handler does the checking if tasks are running and clears the
+ // cache entry, so we just have to trigger it here
+ let _ = DataStore::lookup_datastore(name);
+ }
+
+ Ok(())
+ }
+ /// removes all datastores that are not configured anymore
+ pub fn remove_unused_datastores() -> Result<(), Error> {
+ let (config, _digest) = pbs_config::datastore::config()?;
+
+ let mut map_read = DATASTORE_MAP_READ.lock().unwrap();
+ // removes all elements that are not in the config
+ map_read.retain(|key, _| config.sections.contains_key(key));
+
+ let mut map_write = DATASTORE_MAP_WRITE.lock().unwrap();
+ // removes all elements that are not in the config
+ map_write.retain(|key, _| config.sections.contains_key(key));
+ Ok(())
}
}
-impl DataStore {
- // This one just panics on everything
- #[doc(hidden)]
- pub(crate) unsafe fn new_test() -> Arc<Self> {
- Arc::new(Self {
- inner: unsafe { DataStoreImpl::new_test() },
- operation: None,
- })
+impl<T: CanRead + 'static> DataStore<T> {
+ /// Get a streaming iter over top-level backup groups of a datastore of a particular type,
+ /// filtered by `Ok` results
+ ///
+ /// The iterated item's result is already unwrapped; if it contained an error it will be
+ /// logged. Can be useful in iterator chain commands.
+ pub fn iter_backup_type_ok(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ty: BackupType,
+ ) -> Result<impl Iterator<Item = BackupGroup<T>> + 'static, Error> {
+ Ok(self.iter_backup_type(ns, ty)?.ok())
+ }
+
+ /// Get a streaming iter over top-level backup groups of a datastore, filtered by Ok results
+ ///
+ /// The iterated item's result is already unwrapped; if it contained an error it will be
+ /// logged. Can be useful in iterator chain commands.
+ pub fn iter_backup_groups_ok(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ) -> Result<impl Iterator<Item = BackupGroup<T>> + 'static, Error> {
+ Ok(self.iter_backup_groups(ns)?.ok())
}
+}
- pub fn lookup_datastore(
+impl<T: CanRead> DataStore<T> {
+ fn open_datastore(
name: &str,
operation: Option<Operation>,
- ) -> Result<Arc<DataStore>, Error> {
+ cache_entry: Option<Arc<DataStoreImpl<T>>>,
+ ) -> Result<Arc<DataStore<T>>, Error> {
// Avoid TOCTOU between checking maintenance mode and updating active operation counter, as
// we use it to decide whether it is okay to delete the datastore.
- let _config_lock = pbs_config::datastore::lock_config()?;
-
// we could use the ConfigVersionCache's generation for staleness detection, but we load
// the config anyway -> just use digest, additional benefit: manual changes get detected
- let (config, digest) = pbs_config::datastore::config()?;
- let config: DataStoreConfig = config.lookup("datastore", name)?;
+ let (config, digest, _lock) = Self::read_config(name)?;
if let Some(maintenance_mode) = config.get_maintenance_mode() {
if let Err(error) = maintenance_mode.check(operation) {
@@ -214,11 +359,8 @@ impl DataStore {
}
}
- let mut datastore_cache = DATASTORE_MAP.lock().unwrap();
- let entry = datastore_cache.get(name);
-
// reuse chunk store so that we keep using the same process locker instance!
- let chunk_store = if let Some(datastore) = &entry {
+ let chunk_store = if let Some(datastore) = &cache_entry {
let last_digest = datastore.last_digest.as_ref();
if let Some(true) = last_digest.map(|last_digest| last_digest == &digest) {
if let Some(operation) = operation {
@@ -235,73 +377,59 @@ impl DataStore {
DatastoreTuning::API_SCHEMA
.parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
)?;
- Arc::new(ChunkStore::open(
+ Arc::new(ChunkStore::<T>::open(
name,
&config.path,
tuning.sync_level.unwrap_or_default(),
)?)
};
- let datastore = DataStore::with_store_and_config(chunk_store, config, Some(digest))?;
-
- let datastore = Arc::new(datastore);
- datastore_cache.insert(name.to_string(), datastore.clone());
+ let datastore = Self::with_store_and_config(chunk_store, config, Some(digest))?;
if let Some(operation) = operation {
update_active_operations(name, operation, 1)?;
}
Ok(Arc::new(Self {
- inner: datastore,
+ inner: datastore.into(),
operation,
}))
}
+ fn with_store_and_config(
+ chunk_store: Arc<ChunkStore<T>>,
+ config: DataStoreConfig,
+ last_digest: Option<[u8; 32]>,
+ ) -> Result<DataStoreImpl<T>, Error> {
+ let mut gc_status_path = chunk_store.base_path();
+ gc_status_path.push(".gc-status");
- /// removes all datastores that are not configured anymore
- pub fn remove_unused_datastores() -> Result<(), Error> {
- let (config, _digest) = pbs_config::datastore::config()?;
-
- let mut map = DATASTORE_MAP.lock().unwrap();
- // removes all elements that are not in the config
- map.retain(|key, _| config.sections.contains_key(key));
- Ok(())
- }
-
- /// trigger clearing cache entry based on maintenance mode. Entry will only
- /// be cleared iff there is no other task running, if there is, the end of the
- /// last running task will trigger the clearing of the cache entry.
- pub fn update_datastore_cache(name: &str) -> Result<(), Error> {
- let (config, _digest) = pbs_config::datastore::config()?;
- let datastore: DataStoreConfig = config.lookup("datastore", name)?;
- if datastore
- .get_maintenance_mode()
- .map_or(false, |m| m.is_offline())
- {
- // the datastore drop handler does the checking if tasks are running and clears the
- // cache entry, so we just have to trigger it here
- let _ = DataStore::lookup_datastore(name, Some(Operation::Lookup));
- }
+ let gc_status = if let Some(state) = file_read_optional_string(gc_status_path)? {
+ match serde_json::from_str(&state) {
+ Ok(state) => state,
+ Err(err) => {
+ log::error!("error reading gc-status: {}", err);
+ GarbageCollectionStatus::default()
+ }
+ }
+ } else {
+ GarbageCollectionStatus::default()
+ };
- Ok(())
- }
+ let tuning: DatastoreTuning = serde_json::from_value(
+ DatastoreTuning::API_SCHEMA
+ .parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
+ )?;
- /// Open a raw database given a name and a path.
- ///
- /// # Safety
- /// See the safety section in `open_from_config`
- pub unsafe fn open_path(
- name: &str,
- path: impl AsRef<Path>,
- operation: Option<Operation>,
- ) -> Result<Arc<Self>, Error> {
- let path = path
- .as_ref()
- .to_str()
- .ok_or_else(|| format_err!("non-utf8 paths not supported"))?
- .to_owned();
- unsafe { Self::open_from_config(DataStoreConfig::new(name.to_owned(), path), operation) }
+ Ok(DataStoreImpl {
+ chunk_store,
+ gc_mutex: Mutex::new(()),
+ last_gc_status: Mutex::new(gc_status),
+ verify_new: config.verify_new.unwrap_or(false),
+ chunk_order: tuning.chunk_order.unwrap_or_default(),
+ last_digest,
+ sync_level: tuning.sync_level.unwrap_or_default(),
+ })
}
-
/// Open a datastore given a raw configuration.
///
/// # Safety
@@ -322,7 +450,7 @@ impl DataStore {
.parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
)?;
let chunk_store =
- ChunkStore::open(&name, &config.path, tuning.sync_level.unwrap_or_default())?;
+ ChunkStore::<T>::open(&name, &config.path, tuning.sync_level.unwrap_or_default())?;
let inner = Arc::new(Self::with_store_and_config(
Arc::new(chunk_store),
config,
@@ -335,88 +463,24 @@ impl DataStore {
Ok(Arc::new(Self { inner, operation }))
}
+ pub fn get_chunk_iterator(
+ &self,
+ ) -> Result<
+ impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
+ Error,
+ > {
+ self.inner.chunk_store.get_chunk_iterator()
+ }
+ pub fn open_fixed_reader<P: AsRef<Path>>(
+ &self,
+ filename: P,
+ ) -> Result<FixedIndexReader, Error> {
+ let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
- fn with_store_and_config(
- chunk_store: Arc<ChunkStore>,
- config: DataStoreConfig,
- last_digest: Option<[u8; 32]>,
- ) -> Result<DataStoreImpl, Error> {
- let mut gc_status_path = chunk_store.base_path();
- gc_status_path.push(".gc-status");
-
- let gc_status = if let Some(state) = file_read_optional_string(gc_status_path)? {
- match serde_json::from_str(&state) {
- Ok(state) => state,
- Err(err) => {
- log::error!("error reading gc-status: {}", err);
- GarbageCollectionStatus::default()
- }
- }
- } else {
- GarbageCollectionStatus::default()
- };
-
- let tuning: DatastoreTuning = serde_json::from_value(
- DatastoreTuning::API_SCHEMA
- .parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
- )?;
-
- Ok(DataStoreImpl {
- chunk_store,
- gc_mutex: Mutex::new(()),
- last_gc_status: Mutex::new(gc_status),
- verify_new: config.verify_new.unwrap_or(false),
- chunk_order: tuning.chunk_order.unwrap_or_default(),
- last_digest,
- sync_level: tuning.sync_level.unwrap_or_default(),
- })
- }
-
- pub fn get_chunk_iterator(
- &self,
- ) -> Result<
- impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
- Error,
- > {
- self.inner.chunk_store.get_chunk_iterator()
- }
-
- pub fn create_fixed_writer<P: AsRef<Path>>(
- &self,
- filename: P,
- size: usize,
- chunk_size: usize,
- ) -> Result<FixedIndexWriter, Error> {
- let index = FixedIndexWriter::create(
- self.inner.chunk_store.clone(),
- filename.as_ref(),
- size,
- chunk_size,
- )?;
-
- Ok(index)
- }
-
- pub fn open_fixed_reader<P: AsRef<Path>>(
- &self,
- filename: P,
- ) -> Result<FixedIndexReader, Error> {
- let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
-
- let index = FixedIndexReader::open(&full_path)?;
+ let index = FixedIndexReader::open(&full_path)?;
Ok(index)
}
-
- pub fn create_dynamic_writer<P: AsRef<Path>>(
- &self,
- filename: P,
- ) -> Result<DynamicIndexWriter, Error> {
- let index = DynamicIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref())?;
-
- Ok(index)
- }
-
pub fn open_dynamic_reader<P: AsRef<Path>>(
&self,
filename: P,
@@ -427,6 +491,22 @@ impl DataStore {
Ok(index)
}
+ /// Open a raw database given a name and a path.
+ ///
+ /// # Safety
+ /// See the safety section in `open_from_config`
+ pub unsafe fn open_path(
+ name: &str,
+ path: impl AsRef<Path>,
+ operation: Option<Operation>,
+ ) -> Result<Arc<Self>, Error> {
+ let path = path
+ .as_ref()
+ .to_str()
+ .ok_or_else(|| format_err!("non-utf8 paths not supported"))?
+ .to_owned();
+ unsafe { Self::open_from_config(DataStoreConfig::new(name.to_owned(), path), operation) }
+ }
pub fn open_index<P>(&self, filename: P) -> Result<Box<dyn IndexFile + Send>, Error>
where
@@ -466,121 +546,443 @@ impl DataStore {
Ok(())
}
-
- pub fn name(&self) -> &str {
- self.inner.chunk_store.name()
+ /// Returns if the given namespace exists on the datastore
+ pub fn namespace_exists(&self, ns: &BackupNamespace) -> bool {
+ let mut path = self.base_path();
+ path.push(ns.path());
+ path.exists()
}
+ /// Returns the time of the last successful backup
+ ///
+ /// Or None if there is no backup in the group (or the group dir does not exist).
+ pub fn last_successful_backup(
+ self: &Arc<Self>,
+ ns: &BackupNamespace,
+ backup_group: &pbs_api_types::BackupGroup,
+ ) -> Result<Option<i64>, Error> {
+ let backup_group = self.backup_group(ns.clone(), backup_group.clone());
- pub fn base_path(&self) -> PathBuf {
- self.inner.chunk_store.base_path()
- }
+ let group_path = backup_group.full_group_path();
- /// Returns the absolute path for a backup namespace on this datastore
- pub fn namespace_path(&self, ns: &BackupNamespace) -> PathBuf {
- let mut path = self.base_path();
- path.reserve(ns.path_len());
- for part in ns.components() {
- path.push("ns");
- path.push(part);
+ if group_path.exists() {
+ backup_group.last_successful_backup()
+ } else {
+ Ok(None)
}
- path
- }
-
- /// Returns the absolute path for a backup_type
- pub fn type_path(&self, ns: &BackupNamespace, backup_type: BackupType) -> PathBuf {
- let mut full_path = self.namespace_path(ns);
- full_path.push(backup_type.to_string());
- full_path
}
-
- /// Returns the absolute path for a backup_group
- pub fn group_path(
+ /// Returns the backup owner.
+ ///
+ /// The backup owner is the entity who first created the backup group.
+ pub fn get_owner(
&self,
ns: &BackupNamespace,
backup_group: &pbs_api_types::BackupGroup,
- ) -> PathBuf {
- let mut full_path = self.namespace_path(ns);
- full_path.push(backup_group.to_string());
- full_path
+ ) -> Result<Authid, Error> {
+ let full_path = self.owner_path(ns, backup_group);
+ let owner = proxmox_sys::fs::file_read_firstline(full_path)?;
+ owner
+ .trim_end() // remove trailing newline
+ .parse()
+ .map_err(|err| format_err!("parsing owner for {backup_group} failed: {err}"))
}
- /// Returns the absolute path for backup_dir
- pub fn snapshot_path(
+ pub fn owns_backup(
&self,
ns: &BackupNamespace,
- backup_dir: &pbs_api_types::BackupDir,
- ) -> PathBuf {
- let mut full_path = self.namespace_path(ns);
- full_path.push(backup_dir.to_string());
- full_path
- }
-
- /// Create a backup namespace.
- pub fn create_namespace(
- self: &Arc<Self>,
- parent: &BackupNamespace,
- name: String,
- ) -> Result<BackupNamespace, Error> {
- if !self.namespace_exists(parent) {
- bail!("cannot create new namespace, parent {parent} doesn't already exists");
- }
-
- // construct ns before mkdir to enforce max-depth and name validity
- let ns = BackupNamespace::from_parent_ns(parent, name)?;
-
- let mut ns_full_path = self.base_path();
- ns_full_path.push(ns.path());
-
- std::fs::create_dir_all(ns_full_path)?;
+ backup_group: &pbs_api_types::BackupGroup,
+ auth_id: &Authid,
+ ) -> Result<bool, Error> {
+ let owner = self.get_owner(ns, backup_group)?;
- Ok(ns)
+ Ok(check_backup_owner(&owner, auth_id).is_ok())
}
-
- /// Returns if the given namespace exists on the datastore
- pub fn namespace_exists(&self, ns: &BackupNamespace) -> bool {
- let mut path = self.base_path();
- path.push(ns.path());
- path.exists()
+ /// Get a streaming iter over single-level backup namespaces of a datastore
+ ///
+ /// The iterated item is still a Result that can contain errors from rather unexpected FS or
+ /// parsing errors.
+ pub fn iter_backup_ns(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ) -> Result<ListNamespaces<T>, Error> {
+ ListNamespaces::new(Arc::clone(self), ns)
}
- /// Remove all backup groups of a single namespace level but not the namespace itself.
- ///
- /// Does *not* descends into child-namespaces and doesn't remoes the namespace itself either.
+ /// Get a streaming iter over single-level backup namespaces of a datastore, filtered by Ok
///
- /// Returns true if all the groups were removed, and false if some were protected.
- pub fn remove_namespace_groups(self: &Arc<Self>, ns: &BackupNamespace) -> Result<bool, Error> {
- // FIXME: locking? The single groups/snapshots are already protected, so may not be
- // necessary (depends on what we all allow to do with namespaces)
- log::info!("removing all groups in namespace {}:/{ns}", self.name());
+ /// The iterated item's result is already unwrapped; if it contained an error it will be
+ /// logged. Can be useful in iterator chain commands.
+ pub fn iter_backup_ns_ok(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ) -> Result<impl Iterator<Item = BackupNamespace>, Error> {
+ let this = Arc::clone(self);
+ Ok(
+ ListNamespaces::new(Arc::clone(self), ns)?.filter_map(move |ns| match ns {
+ Ok(ns) => Some(ns),
+ Err(err) => {
+ log::error!("list groups error on datastore {} - {}", this.name(), err);
+ None
+ }
+ }),
+ )
+ }
- let mut removed_all_groups = true;
+ /// Get a streaming iter over all backup namespaces of a datastore, recursively
+ ///
+ /// The iterated item is still a Result that can contain errors from rather unexpected FS or
+ /// parsing errors.
+ pub fn recursive_iter_backup_ns(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ) -> Result<ListNamespacesRecursive<T>, Error> {
+ ListNamespacesRecursive::new(Arc::clone(self), ns)
+ }
- for group in self.iter_backup_groups(ns.to_owned())? {
- let delete_stats = group?.destroy()?;
- removed_all_groups = removed_all_groups && delete_stats.all_removed();
+ /// Get a streaming iter over all backup namespaces of a datastore (recursively), filtered by Ok
+ ///
+ /// The iterated item's result is already unwrapped; if it contained an error it will be
+ /// logged. Can be useful in iterator chain commands.
+ pub fn recursive_iter_backup_ns_ok(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ max_depth: Option<usize>,
+ ) -> Result<impl Iterator<Item = BackupNamespace>, Error> {
+ let this = Arc::clone(self);
+ Ok(if let Some(depth) = max_depth {
+ ListNamespacesRecursive::<T>::new_max_depth(Arc::clone(self), ns, depth)?
+ } else {
+ ListNamespacesRecursive::<T>::new(Arc::clone(self), ns)?
}
-
- let base_file = std::fs::File::open(self.base_path())?;
- let base_fd = base_file.as_raw_fd();
- for ty in BackupType::iter() {
- let mut ty_dir = ns.path();
- ty_dir.push(ty.to_string());
- // best effort only, but we probably should log the error
- if let Err(err) = unlinkat(Some(base_fd), &ty_dir, UnlinkatFlags::RemoveDir) {
- if err != nix::errno::Errno::ENOENT {
- log::error!("failed to remove backup type {ty} in {ns} - {err}");
- }
+ .filter_map(move |ns| match ns {
+ Ok(ns) => Some(ns),
+ Err(err) => {
+ log::error!("list groups error on datastore {} - {}", this.name(), err);
+ None
}
- }
+ }))
+ }
- Ok(removed_all_groups)
+ /// Get a streaming iter over top-level backup groups of a datastore of a particular type.
+ ///
+ /// The iterated item is still a Result that can contain errors from rather unexpected FS or
+ /// parsing errors.
+ pub fn iter_backup_type(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ty: BackupType,
+ ) -> Result<ListGroupsType<T>, Error> {
+ ListGroupsType::new(Arc::clone(self), ns, ty)
}
- /// Remove a complete backup namespace optionally including all it's, and child namespaces',
- /// groups. If `removed_groups` is false this only prunes empty namespaces.
+ /// Get a streaming iter over top-level backup groups of a datastore
///
- /// Returns true if everything requested, and false if some groups were protected or if some
- /// namespaces weren't empty even though all groups were deleted (race with new backup)
+ /// The iterated item is still a Result that can contain errors from rather unexpected FS or
+ /// parsing errors.
+ pub fn iter_backup_groups(
+ self: &Arc<Self>,
+ ns: BackupNamespace,
+ ) -> Result<ListGroups<T>, Error> {
+ ListGroups::new(Arc::clone(self), ns)
+ }
+
+ /// Get an in-memory vector for all top-level backup groups of a datastore
+ ///
+ /// NOTE: using the iterator directly is most often more efficient w.r.t. memory usage
+ pub fn list_backup_groups(
+ self: &Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ) -> Result<Vec<BackupGroup<T>>, Error> {
+ ListGroups::new(Arc::clone(self), ns)?.collect()
+ }
+
+ pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
+ let base = self.base_path();
+
+ let mut list = vec![];
+
+ use walkdir::WalkDir;
+
+ let walker = WalkDir::new(base).into_iter();
+
+ // make sure we skip .chunks (and other hidden files to keep it simple)
+ fn is_hidden(entry: &walkdir::DirEntry) -> bool {
+ entry
+ .file_name()
+ .to_str()
+ .map(|s| s.starts_with('.'))
+ .unwrap_or(false)
+ }
+ let handle_entry_err = |err: walkdir::Error| {
+ // first, extract the actual IO error and the affected path
+ let (inner, path) = match (err.io_error(), err.path()) {
+ (None, _) => return Ok(()), // not an IO-error
+ (Some(inner), Some(path)) => (inner, path),
+ (Some(inner), None) => bail!("unexpected error on datastore traversal: {inner}"),
+ };
+ if inner.kind() == io::ErrorKind::PermissionDenied {
+ if err.depth() <= 1 && path.ends_with("lost+found") {
+ // allow skipping of (root-only) ext4 fsck-directory on EPERM ..
+ return Ok(());
+ }
+ // .. but do not ignore EPERM in general, otherwise we might prune too many chunks.
+ // E.g., if users messed up with owner/perms on a rsync
+ bail!("cannot continue garbage-collection safely, permission denied on: {path:?}");
+ } else if inner.kind() == io::ErrorKind::NotFound {
+ log::info!("ignoring vanished file: {path:?}");
+ return Ok(());
+ } else {
+ bail!("unexpected error on datastore traversal: {inner} - {path:?}");
+ }
+ };
+ for entry in walker.filter_entry(|e| !is_hidden(e)) {
+ let path = match entry {
+ Ok(entry) => entry.into_path(),
+ Err(err) => {
+ handle_entry_err(err)?;
+ continue;
+ }
+ };
+ if let Ok(archive_type) = ArchiveType::from_path(&path) {
+ if archive_type == ArchiveType::FixedIndex
+ || archive_type == ArchiveType::DynamicIndex
+ {
+ list.push(path);
+ }
+ }
+ }
+
+ Ok(list)
+ }
+
+ pub fn last_gc_status(&self) -> GarbageCollectionStatus {
+ self.inner.last_gc_status.lock().unwrap().clone()
+ }
+
+ pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
+ self.inner.chunk_store.try_shared_lock()
+ }
+ pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
+ let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
+ std::fs::metadata(chunk_path).map_err(Error::from)
+ }
+
+ pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
+ let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
+
+ proxmox_lang::try_block!({
+ let mut file = std::fs::File::open(&chunk_path)?;
+ DataBlob::load_from_reader(&mut file)
+ })
+ .map_err(|err| {
+ format_err!(
+ "store '{}', unable to load chunk '{}' - {}",
+ self.name(),
+ digest_str,
+ err,
+ )
+ })
+ }
+ /// Returns a list of chunks sorted by their inode number on disk; chunks that couldn't get
+ /// stat'ed are placed at the end of the list.
+ pub fn get_chunks_in_order<F, A>(
+ &self,
+ index: &(dyn IndexFile + Send),
+ skip_chunk: F,
+ check_abort: A,
+ ) -> Result<Vec<(usize, u64)>, Error>
+ where
+ F: Fn(&[u8; 32]) -> bool,
+ A: Fn(usize) -> Result<(), Error>,
+ {
+ let index_count = index.index_count();
+ let mut chunk_list = Vec::with_capacity(index_count);
+ use std::os::unix::fs::MetadataExt;
+ for pos in 0..index_count {
+ check_abort(pos)?;
+
+ let info = index.chunk_info(pos).unwrap();
+
+ if skip_chunk(&info.digest) {
+ continue;
+ }
+
+ let ino = match self.inner.chunk_order {
+ ChunkOrder::Inode => {
+ match self.stat_chunk(&info.digest) {
+ Err(_) => u64::MAX, // could not stat, move to end of list
+ Ok(metadata) => metadata.ino(),
+ }
+ }
+ ChunkOrder::None => 0,
+ };
+
+ chunk_list.push((pos, ino));
+ }
+
+ match self.inner.chunk_order {
+ // sorting by inode improves data locality, which makes it lots faster on spinners
+ ChunkOrder::Inode => {
+ chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| ino_a.cmp(ino_b))
+ }
+ ChunkOrder::None => {}
+ }
+
+ Ok(chunk_list)
+ }
+
+ /// Open a backup group from this datastore.
+ pub fn backup_group(
+ self: &Arc<Self>,
+ ns: BackupNamespace,
+ group: pbs_api_types::BackupGroup,
+ ) -> BackupGroup<T> {
+ BackupGroup::new(Arc::clone(self), ns, group)
+ }
+
+ /// Open a backup group from this datastore.
+ pub fn backup_group_from_parts<D>(
+ self: &Arc<Self>,
+ ns: BackupNamespace,
+ ty: BackupType,
+ id: D,
+ ) -> BackupGroup<T>
+ where
+ D: Into<String>,
+ {
+ self.backup_group(ns, (ty, id.into()).into())
+ }
+
+ /*
+ /// Open a backup group from this datastore by backup group path such as `vm/100`.
+ ///
+ /// Convenience method for `store.backup_group(path.parse()?)`
+ pub fn backup_group_from_path(self: &Arc<Self>, path: &str) -> Result<BackupGroup, Error> {
+ todo!("split out the namespace");
+ }
+ */
+
+ /// Open a snapshot (backup directory) from this datastore.
+ pub fn backup_dir(
+ self: &Arc<Self>,
+ ns: BackupNamespace,
+ dir: pbs_api_types::BackupDir,
+ ) -> Result<BackupDir<T>, Error> {
+ BackupDir::with_group(self.backup_group(ns, dir.group), dir.time)
+ }
+
+ /// Open a snapshot (backup directory) from this datastore.
+ pub fn backup_dir_from_parts<D>(
+ self: &Arc<Self>,
+ ns: BackupNamespace,
+ ty: BackupType,
+ id: D,
+ time: i64,
+ ) -> Result<BackupDir<T>, Error>
+ where
+ D: Into<String>,
+ {
+ self.backup_dir(ns, (ty, id.into(), time).into())
+ }
+
+ /// Open a snapshot (backup directory) from this datastore with a cached rfc3339 time string.
+ pub fn backup_dir_with_rfc3339<D: Into<String>>(
+ self: &Arc<Self>,
+ group: BackupGroup<T>,
+ time_string: D,
+ ) -> Result<BackupDir<T>, Error> {
+ BackupDir::with_rfc3339(group, time_string.into())
+ }
+
+ /*
+ /// Open a snapshot (backup directory) from this datastore by a snapshot path.
+ pub fn backup_dir_from_path(self: &Arc<Self>, path: &str) -> Result<BackupDir, Error> {
+ todo!("split out the namespace");
+ }
+ */
+}
+
+impl<T: CanWrite> DataStore<T> {
+ pub fn create_fixed_writer<P: AsRef<Path>>(
+ &self,
+ filename: P,
+ size: usize,
+ chunk_size: usize,
+ ) -> Result<FixedIndexWriter<T>, Error> {
+ let index = FixedIndexWriter::create(
+ self.inner.chunk_store.clone(),
+ filename.as_ref(),
+ size,
+ chunk_size,
+ )?;
+
+ Ok(index)
+ }
+ pub fn create_dynamic_writer<P: AsRef<Path>>(
+ &self,
+ filename: P,
+ ) -> Result<DynamicIndexWriter<T>, Error> {
+ let index = DynamicIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref())?;
+
+ Ok(index)
+ }
+ /// Create a backup namespace.
+ pub fn create_namespace(
+ self: &Arc<Self>,
+ parent: &BackupNamespace,
+ name: String,
+ ) -> Result<BackupNamespace, Error> {
+ if !self.namespace_exists(parent) {
+ bail!("cannot create new namespace, parent {parent} doesn't already exists");
+ }
+
+ // construct ns before mkdir to enforce max-depth and name validity
+ let ns = BackupNamespace::from_parent_ns(parent, name)?;
+
+ let mut ns_full_path = self.base_path();
+ ns_full_path.push(ns.path());
+
+ std::fs::create_dir_all(ns_full_path)?;
+
+ Ok(ns)
+ }
+ /// Remove all backup groups of a single namespace level but not the namespace itself.
+ ///
+ /// Does *not* descend into child namespaces and doesn't remove the namespace itself either.
+ ///
+ /// Returns true if all the groups were removed, and false if some were protected.
+ pub fn remove_namespace_groups(self: &Arc<Self>, ns: &BackupNamespace) -> Result<bool, Error> {
+ // FIXME: locking? The single groups/snapshots are already protected, so may not be
+ // necessary (depends on what we all allow to do with namespaces)
+ log::info!("removing all groups in namespace {}:/{ns}", self.name());
+
+ let mut removed_all_groups = true;
+
+ for group in self.iter_backup_groups(ns.to_owned())? {
+ let delete_stats = group?.destroy()?;
+ removed_all_groups = removed_all_groups && delete_stats.all_removed();
+ }
+
+ let base_file = std::fs::File::open(self.base_path())?;
+ let base_fd = base_file.as_raw_fd();
+ for ty in BackupType::iter() {
+ let mut ty_dir = ns.path();
+ ty_dir.push(ty.to_string());
+ // best effort only, but we probably should log the error
+ if let Err(err) = unlinkat(Some(base_fd), &ty_dir, UnlinkatFlags::RemoveDir) {
+ if err != nix::errno::Errno::ENOENT {
+ log::error!("failed to remove backup type {ty} in {ns} - {err}");
+ }
+ }
+ }
+
+ Ok(removed_all_groups)
+ }
+
+ /// Remove a complete backup namespace, optionally including all of its, and its child
+ /// namespaces', groups. If `removed_groups` is false this only prunes empty namespaces.
+ ///
+ /// Returns true if everything requested was removed, and false if some groups were protected
+ /// or if some namespaces weren't empty even though all groups were deleted (race with a new
+ /// backup).
pub fn remove_namespace_recursive(
self: &Arc<Self>,
ns: &BackupNamespace,
@@ -660,88 +1062,6 @@ impl DataStore {
backup_dir.destroy(force)
}
-
- /// Returns the time of the last successful backup
- ///
- /// Or None if there is no backup in the group (or the group dir does not exist).
- pub fn last_successful_backup(
- self: &Arc<Self>,
- ns: &BackupNamespace,
- backup_group: &pbs_api_types::BackupGroup,
- ) -> Result<Option<i64>, Error> {
- let backup_group = self.backup_group(ns.clone(), backup_group.clone());
-
- let group_path = backup_group.full_group_path();
-
- if group_path.exists() {
- backup_group.last_successful_backup()
- } else {
- Ok(None)
- }
- }
-
- /// Return the path of the 'owner' file.
- fn owner_path(&self, ns: &BackupNamespace, group: &pbs_api_types::BackupGroup) -> PathBuf {
- self.group_path(ns, group).join("owner")
- }
-
- /// Returns the backup owner.
- ///
- /// The backup owner is the entity who first created the backup group.
- pub fn get_owner(
- &self,
- ns: &BackupNamespace,
- backup_group: &pbs_api_types::BackupGroup,
- ) -> Result<Authid, Error> {
- let full_path = self.owner_path(ns, backup_group);
- let owner = proxmox_sys::fs::file_read_firstline(full_path)?;
- owner
- .trim_end() // remove trailing newline
- .parse()
- .map_err(|err| format_err!("parsing owner for {backup_group} failed: {err}"))
- }
-
- pub fn owns_backup(
- &self,
- ns: &BackupNamespace,
- backup_group: &pbs_api_types::BackupGroup,
- auth_id: &Authid,
- ) -> Result<bool, Error> {
- let owner = self.get_owner(ns, backup_group)?;
-
- Ok(check_backup_owner(&owner, auth_id).is_ok())
- }
-
- /// Set the backup owner.
- pub fn set_owner(
- &self,
- ns: &BackupNamespace,
- backup_group: &pbs_api_types::BackupGroup,
- auth_id: &Authid,
- force: bool,
- ) -> Result<(), Error> {
- let path = self.owner_path(ns, backup_group);
-
- let mut open_options = std::fs::OpenOptions::new();
- open_options.write(true);
- open_options.truncate(true);
-
- if force {
- open_options.create(true);
- } else {
- open_options.create_new(true);
- }
-
- let mut file = open_options
- .open(&path)
- .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
-
- writeln!(file, "{}", auth_id)
- .map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;
-
- Ok(())
- }
-
/// Create (if it does not already exists) and lock a backup group
///
/// And set the owner to 'userid'. If the group already exists, it returns the
@@ -784,224 +1104,71 @@ impl DataStore {
"another backup is already running",
)?;
let owner = self.get_owner(ns, backup_group)?; // just to be sure
- Ok((owner, guard))
- }
- Err(err) => bail!("unable to create backup group {:?} - {}", full_path, err),
- }
- }
-
- /// Creates a new backup snapshot inside a BackupGroup
- ///
- /// The BackupGroup directory needs to exist.
- pub fn create_locked_backup_dir(
- &self,
- ns: &BackupNamespace,
- backup_dir: &pbs_api_types::BackupDir,
- ) -> Result<(PathBuf, bool, DirLockGuard), Error> {
- let full_path = self.snapshot_path(ns, backup_dir);
- let relative_path = full_path.strip_prefix(self.base_path()).map_err(|err| {
- format_err!(
- "failed to produce correct path for backup {backup_dir} in namespace {ns}: {err}"
- )
- })?;
-
- let lock = || {
- lock_dir_noblock(
- &full_path,
- "snapshot",
- "internal error - tried creating snapshot that's already in use",
- )
- };
-
- match std::fs::create_dir(&full_path) {
- Ok(_) => Ok((relative_path.to_owned(), true, lock()?)),
- Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => {
- Ok((relative_path.to_owned(), false, lock()?))
- }
- Err(e) => Err(e.into()),
- }
- }
-
- /// Get a streaming iter over single-level backup namespaces of a datatstore
- ///
- /// The iterated item is still a Result that can contain errors from rather unexptected FS or
- /// parsing errors.
- pub fn iter_backup_ns(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ) -> Result<ListNamespaces, Error> {
- ListNamespaces::new(Arc::clone(self), ns)
- }
-
- /// Get a streaming iter over single-level backup namespaces of a datatstore, filtered by Ok
- ///
- /// The iterated item's result is already unwrapped, if it contained an error it will be
- /// logged. Can be useful in iterator chain commands
- pub fn iter_backup_ns_ok(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ) -> Result<impl Iterator<Item = BackupNamespace> + 'static, Error> {
- let this = Arc::clone(self);
- Ok(
- ListNamespaces::new(Arc::clone(self), ns)?.filter_map(move |ns| match ns {
- Ok(ns) => Some(ns),
- Err(err) => {
- log::error!("list groups error on datastore {} - {}", this.name(), err);
- None
- }
- }),
- )
- }
-
- /// Get a streaming iter over single-level backup namespaces of a datatstore
- ///
- /// The iterated item is still a Result that can contain errors from rather unexptected FS or
- /// parsing errors.
- pub fn recursive_iter_backup_ns(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ) -> Result<ListNamespacesRecursive, Error> {
- ListNamespacesRecursive::new(Arc::clone(self), ns)
- }
-
- /// Get a streaming iter over single-level backup namespaces of a datatstore, filtered by Ok
- ///
- /// The iterated item's result is already unwrapped, if it contained an error it will be
- /// logged. Can be useful in iterator chain commands
- pub fn recursive_iter_backup_ns_ok(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- max_depth: Option<usize>,
- ) -> Result<impl Iterator<Item = BackupNamespace> + 'static, Error> {
- let this = Arc::clone(self);
- Ok(if let Some(depth) = max_depth {
- ListNamespacesRecursive::new_max_depth(Arc::clone(self), ns, depth)?
- } else {
- ListNamespacesRecursive::new(Arc::clone(self), ns)?
- }
- .filter_map(move |ns| match ns {
- Ok(ns) => Some(ns),
- Err(err) => {
- log::error!("list groups error on datastore {} - {}", this.name(), err);
- None
- }
- }))
- }
-
- /// Get a streaming iter over top-level backup groups of a datatstore of a particular type.
- ///
- /// The iterated item is still a Result that can contain errors from rather unexptected FS or
- /// parsing errors.
- pub fn iter_backup_type(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ty: BackupType,
- ) -> Result<ListGroupsType, Error> {
- ListGroupsType::new(Arc::clone(self), ns, ty)
- }
-
- /// Get a streaming iter over top-level backup groups of a datastore of a particular type,
- /// filtered by `Ok` results
- ///
- /// The iterated item's result is already unwrapped, if it contained an error it will be
- /// logged. Can be useful in iterator chain commands
- pub fn iter_backup_type_ok(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ty: BackupType,
- ) -> Result<impl Iterator<Item = BackupGroup> + 'static, Error> {
- Ok(self.iter_backup_type(ns, ty)?.ok())
- }
-
- /// Get a streaming iter over top-level backup groups of a datatstore
- ///
- /// The iterated item is still a Result that can contain errors from rather unexptected FS or
- /// parsing errors.
- pub fn iter_backup_groups(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ) -> Result<ListGroups, Error> {
- ListGroups::new(Arc::clone(self), ns)
+ Ok((owner, guard))
+ }
+ Err(err) => bail!("unable to create backup group {:?} - {}", full_path, err),
+ }
}
- /// Get a streaming iter over top-level backup groups of a datatstore, filtered by Ok results
+ /// Creates a new backup snapshot inside a BackupGroup
///
- /// The iterated item's result is already unwrapped, if it contained an error it will be
- /// logged. Can be useful in iterator chain commands
- pub fn iter_backup_groups_ok(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ) -> Result<impl Iterator<Item = BackupGroup> + 'static, Error> {
- Ok(self.iter_backup_groups(ns)?.ok())
- }
+ /// The BackupGroup directory needs to exist.
+ pub fn create_locked_backup_dir(
+ &self,
+ ns: &BackupNamespace,
+ backup_dir: &pbs_api_types::BackupDir,
+ ) -> Result<(PathBuf, bool, DirLockGuard), Error> {
+ let full_path = self.snapshot_path(ns, backup_dir);
+ let relative_path = full_path.strip_prefix(self.base_path()).map_err(|err| {
+ format_err!(
+ "failed to produce correct path for backup {backup_dir} in namespace {ns}: {err}"
+ )
+ })?;
- /// Get a in-memory vector for all top-level backup groups of a datatstore
- ///
- /// NOTE: using the iterator directly is most often more efficient w.r.t. memory usage
- pub fn list_backup_groups(
- self: &Arc<DataStore>,
- ns: BackupNamespace,
- ) -> Result<Vec<BackupGroup>, Error> {
- ListGroups::new(Arc::clone(self), ns)?.collect()
- }
+ let lock = || {
+ lock_dir_noblock(
+ &full_path,
+ "snapshot",
+ "internal error - tried creating snapshot that's already in use",
+ )
+ };
- pub fn list_images(&self) -> Result<Vec<PathBuf>, Error> {
- let base = self.base_path();
+ match std::fs::create_dir(&full_path) {
+ Ok(_) => Ok((relative_path.to_owned(), true, lock()?)),
+ Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => {
+ Ok((relative_path.to_owned(), false, lock()?))
+ }
+ Err(e) => Err(e.into()),
+ }
+ }
+ /// Set the backup owner.
+ pub fn set_owner(
+ &self,
+ ns: &BackupNamespace,
+ backup_group: &pbs_api_types::BackupGroup,
+ auth_id: &Authid,
+ force: bool,
+ ) -> Result<(), Error> {
+ let path = self.owner_path(ns, backup_group);
- let mut list = vec![];
+ let mut open_options = std::fs::OpenOptions::new();
+ open_options.write(true);
+ open_options.truncate(true);
- use walkdir::WalkDir;
+ if force {
+ open_options.create(true);
+ } else {
+ open_options.create_new(true);
+ }
- let walker = WalkDir::new(base).into_iter();
+ let mut file = open_options
+ .open(&path)
+ .map_err(|err| format_err!("unable to create owner file {:?} - {}", path, err))?;
- // make sure we skip .chunks (and other hidden files to keep it simple)
- fn is_hidden(entry: &walkdir::DirEntry) -> bool {
- entry
- .file_name()
- .to_str()
- .map(|s| s.starts_with('.'))
- .unwrap_or(false)
- }
- let handle_entry_err = |err: walkdir::Error| {
- // first, extract the actual IO error and the affected path
- let (inner, path) = match (err.io_error(), err.path()) {
- (None, _) => return Ok(()), // not an IO-error
- (Some(inner), Some(path)) => (inner, path),
- (Some(inner), None) => bail!("unexpected error on datastore traversal: {inner}"),
- };
- if inner.kind() == io::ErrorKind::PermissionDenied {
- if err.depth() <= 1 && path.ends_with("lost+found") {
- // allow skipping of (root-only) ext4 fsck-directory on EPERM ..
- return Ok(());
- }
- // .. but do not ignore EPERM in general, otherwise we might prune too many chunks.
- // E.g., if users messed up with owner/perms on a rsync
- bail!("cannot continue garbage-collection safely, permission denied on: {path:?}");
- } else if inner.kind() == io::ErrorKind::NotFound {
- log::info!("ignoring vanished file: {path:?}");
- return Ok(());
- } else {
- bail!("unexpected error on datastore traversal: {inner} - {path:?}");
- }
- };
- for entry in walker.filter_entry(|e| !is_hidden(e)) {
- let path = match entry {
- Ok(entry) => entry.into_path(),
- Err(err) => {
- handle_entry_err(err)?;
- continue;
- }
- };
- if let Ok(archive_type) = ArchiveType::from_path(&path) {
- if archive_type == ArchiveType::FixedIndex
- || archive_type == ArchiveType::DynamicIndex
- {
- list.push(path);
- }
- }
- }
+ writeln!(file, "{}", auth_id)
+ .map_err(|err| format_err!("unable to write owner file {:?} - {}", path, err))?;
- Ok(list)
+ Ok(())
}
// mark chunks used by ``index`` as used
@@ -1104,14 +1271,6 @@ impl DataStore {
Ok(())
}
- pub fn last_gc_status(&self) -> GarbageCollectionStatus {
- self.inner.last_gc_status.lock().unwrap().clone()
- }
-
- pub fn garbage_collection_running(&self) -> bool {
- self.inner.gc_mutex.try_lock().is_err()
- }
-
pub fn garbage_collection(
&self,
worker: &dyn WorkerTaskContext,
@@ -1194,218 +1353,69 @@ impl DataStore {
if gc_status.disk_chunks > 0 {
let avg_chunk = gc_status.disk_bytes / (gc_status.disk_chunks as u64);
info!("Average chunk size: {}", HumanByte::from(avg_chunk));
- }
-
- if let Ok(serialized) = serde_json::to_string(&gc_status) {
- let mut path = self.base_path();
- path.push(".gc-status");
-
- let backup_user = pbs_config::backup_user()?;
- let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
- // set the correct owner/group/permissions while saving file
- // owner(rw) = backup, group(r)= backup
- let options = CreateOptions::new()
- .perm(mode)
- .owner(backup_user.uid)
- .group(backup_user.gid);
-
- // ignore errors
- let _ = replace_file(path, serialized.as_bytes(), options, false);
- }
-
- *self.inner.last_gc_status.lock().unwrap() = gc_status;
- } else {
- bail!("Start GC failed - (already running/locked)");
- }
-
- Ok(())
- }
-
- pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
- self.inner.chunk_store.try_shared_lock()
- }
-
- pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
- self.inner.chunk_store.chunk_path(digest)
- }
-
- pub fn cond_touch_chunk(&self, digest: &[u8; 32], assert_exists: bool) -> Result<bool, Error> {
- self.inner
- .chunk_store
- .cond_touch_chunk(digest, assert_exists)
- }
-
- pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
- self.inner.chunk_store.insert_chunk(chunk, digest)
- }
-
- pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
- let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
- std::fs::metadata(chunk_path).map_err(Error::from)
- }
-
- pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
- let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
-
- proxmox_lang::try_block!({
- let mut file = std::fs::File::open(&chunk_path)?;
- DataBlob::load_from_reader(&mut file)
- })
- .map_err(|err| {
- format_err!(
- "store '{}', unable to load chunk '{}' - {}",
- self.name(),
- digest_str,
- err,
- )
- })
- }
-
- /// Updates the protection status of the specified snapshot.
- pub fn update_protection(&self, backup_dir: &BackupDir, protection: bool) -> Result<(), Error> {
- let full_path = backup_dir.full_path();
-
- if !full_path.exists() {
- bail!("snapshot {} does not exist!", backup_dir.dir());
- }
-
- let _guard = lock_dir_noblock(&full_path, "snapshot", "possibly running or in use")?;
-
- let protected_path = backup_dir.protected_file();
- if protection {
- std::fs::File::create(protected_path)
- .map_err(|err| format_err!("could not create protection file: {}", err))?;
- } else if let Err(err) = std::fs::remove_file(protected_path) {
- // ignore error for non-existing file
- if err.kind() != std::io::ErrorKind::NotFound {
- bail!("could not remove protection file: {}", err);
- }
- }
-
- Ok(())
- }
-
- pub fn verify_new(&self) -> bool {
- self.inner.verify_new
- }
-
- /// returns a list of chunks sorted by their inode number on disk chunks that couldn't get
- /// stat'ed are placed at the end of the list
- pub fn get_chunks_in_order<F, A>(
- &self,
- index: &(dyn IndexFile + Send),
- skip_chunk: F,
- check_abort: A,
- ) -> Result<Vec<(usize, u64)>, Error>
- where
- F: Fn(&[u8; 32]) -> bool,
- A: Fn(usize) -> Result<(), Error>,
- {
- let index_count = index.index_count();
- let mut chunk_list = Vec::with_capacity(index_count);
- use std::os::unix::fs::MetadataExt;
- for pos in 0..index_count {
- check_abort(pos)?;
-
- let info = index.chunk_info(pos).unwrap();
-
- if skip_chunk(&info.digest) {
- continue;
- }
-
- let ino = match self.inner.chunk_order {
- ChunkOrder::Inode => {
- match self.stat_chunk(&info.digest) {
- Err(_) => u64::MAX, // could not stat, move to end of list
- Ok(metadata) => metadata.ino(),
- }
- }
- ChunkOrder::None => 0,
- };
+ }
- chunk_list.push((pos, ino));
- }
+ if let Ok(serialized) = serde_json::to_string(&gc_status) {
+ let mut path = self.base_path();
+ path.push(".gc-status");
- match self.inner.chunk_order {
- // sorting by inode improves data locality, which makes it lots faster on spinners
- ChunkOrder::Inode => {
- chunk_list.sort_unstable_by(|(_, ino_a), (_, ino_b)| ino_a.cmp(ino_b))
+ let backup_user = pbs_config::backup_user()?;
+ let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644);
+ // set the correct owner/group/permissions while saving file
+ // owner(rw) = backup, group(r)= backup
+ let options = CreateOptions::new()
+ .perm(mode)
+ .owner(backup_user.uid)
+ .group(backup_user.gid);
+
+ // ignore errors
+ let _ = replace_file(path, serialized.as_bytes(), options, false);
}
- ChunkOrder::None => {}
+
+ *self.inner.last_gc_status.lock().unwrap() = gc_status;
+ } else {
+ bail!("Start GC failed - (already running/locked)");
}
- Ok(chunk_list)
+ Ok(())
}
-
- /// Open a backup group from this datastore.
- pub fn backup_group(
- self: &Arc<Self>,
- ns: BackupNamespace,
- group: pbs_api_types::BackupGroup,
- ) -> BackupGroup {
- BackupGroup::new(Arc::clone(self), ns, group)
+ pub fn cond_touch_chunk(&self, digest: &[u8; 32], assert_exists: bool) -> Result<bool, Error> {
+ self.inner
+ .chunk_store
+ .cond_touch_chunk(digest, assert_exists)
}
- /// Open a backup group from this datastore.
- pub fn backup_group_from_parts<T>(
- self: &Arc<Self>,
- ns: BackupNamespace,
- ty: BackupType,
- id: T,
- ) -> BackupGroup
- where
- T: Into<String>,
- {
- self.backup_group(ns, (ty, id.into()).into())
+ pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
+ self.inner.chunk_store.insert_chunk(chunk, digest)
}
- /*
- /// Open a backup group from this datastore by backup group path such as `vm/100`.
- ///
- /// Convenience method for `store.backup_group(path.parse()?)`
- pub fn backup_group_from_path(self: &Arc<Self>, path: &str) -> Result<BackupGroup, Error> {
- todo!("split out the namespace");
- }
- */
+ /// Updates the protection status of the specified snapshot.
+ pub fn update_protection(
+ &self,
+ backup_dir: &BackupDir<T>,
+ protection: bool,
+ ) -> Result<(), Error> {
+ let full_path = backup_dir.full_path();
- /// Open a snapshot (backup directory) from this datastore.
- pub fn backup_dir(
- self: &Arc<Self>,
- ns: BackupNamespace,
- dir: pbs_api_types::BackupDir,
- ) -> Result<BackupDir, Error> {
- BackupDir::with_group(self.backup_group(ns, dir.group), dir.time)
- }
+ if !full_path.exists() {
+ bail!("snapshot {} does not exist!", backup_dir.dir());
+ }
- /// Open a snapshot (backup directory) from this datastore.
- pub fn backup_dir_from_parts<T>(
- self: &Arc<Self>,
- ns: BackupNamespace,
- ty: BackupType,
- id: T,
- time: i64,
- ) -> Result<BackupDir, Error>
- where
- T: Into<String>,
- {
- self.backup_dir(ns, (ty, id.into(), time).into())
- }
+ let _guard = lock_dir_noblock(&full_path, "snapshot", "possibly running or in use")?;
- /// Open a snapshot (backup directory) from this datastore with a cached rfc3339 time string.
- pub fn backup_dir_with_rfc3339<T: Into<String>>(
- self: &Arc<Self>,
- group: BackupGroup,
- time_string: T,
- ) -> Result<BackupDir, Error> {
- BackupDir::with_rfc3339(group, time_string.into())
- }
+ let protected_path = backup_dir.protected_file();
+ if protection {
+ std::fs::File::create(protected_path)
+ .map_err(|err| format_err!("could not create protection file: {}", err))?;
+ } else if let Err(err) = std::fs::remove_file(protected_path) {
+ // ignore error for non-existing file
+ if err.kind() != std::io::ErrorKind::NotFound {
+ bail!("could not remove protection file: {}", err);
+ }
+ }
- /*
- /// Open a snapshot (backup directory) from this datastore by a snapshot path.
- pub fn backup_dir_from_path(self: &Arc<Self>, path: &str) -> Result<BackupDir, Error> {
- todo!("split out the namespace");
+ Ok(())
}
- */
/// Syncs the filesystem of the datastore if 'sync_level' is set to
/// [`DatastoreFSyncLevel::Filesystem`]. Uses syncfs(2).
@@ -1421,107 +1431,90 @@ impl DataStore {
}
Ok(())
}
+}
+impl<T> DataStore<T> {
+ #[doc(hidden)]
+ pub(crate) fn new_test() -> Arc<Self> {
+ Arc::new(Self {
+ inner: DataStoreImpl::new_test(),
+ operation: None,
+ })
+ }
- /// Destroy a datastore. This requires that there are no active operations on the datastore.
- ///
- /// This is a synchronous operation and should be run in a worker-thread.
- pub fn destroy(name: &str, destroy_data: bool) -> Result<(), Error> {
- let config_lock = pbs_config::datastore::lock_config()?;
-
- let (mut config, _digest) = pbs_config::datastore::config()?;
- let mut datastore_config: DataStoreConfig = config.lookup("datastore", name)?;
+ pub fn read_config(name: &str) -> Result<(DataStoreConfig, [u8; 32], BackupLockGuard), Error> {
+ // Avoid TOCTOU between checking maintenance mode and updating active operation counter, as
+ // we use it to decide whether it is okay to delete the datastore.
+ let lock = pbs_config::datastore::lock_config()?;
- datastore_config.set_maintenance_mode(Some(MaintenanceMode {
- ty: MaintenanceType::Delete,
- message: None,
- }))?;
+ // we could use the ConfigVersionCache's generation for staleness detection, but we load
+ // the config anyway -> just use digest, additional benefit: manual changes get detected
+ let (config, digest) = pbs_config::datastore::config()?;
+ let config: DataStoreConfig = config.lookup("datastore", name)?;
+ Ok((config, digest, lock))
+ }
- config.set_data(name, "datastore", &datastore_config)?;
- pbs_config::datastore::save_config(&config)?;
- drop(config_lock);
+ pub fn name(&self) -> &str {
+ self.inner.chunk_store.name()
+ }
- let (operations, _lock) = task_tracking::get_active_operations_locked(name)?;
+ pub fn base_path(&self) -> PathBuf {
+ self.inner.chunk_store.base_path()
+ }
- if operations.read != 0 || operations.write != 0 {
- bail!("datastore is currently in use");
+ /// Returns the absolute path for a backup namespace on this datastore
+ pub fn namespace_path(&self, ns: &BackupNamespace) -> PathBuf {
+ let mut path = self.base_path();
+ path.reserve(ns.path_len());
+ for part in ns.components() {
+ path.push("ns");
+ path.push(part);
}
+ path
+ }
- let base = PathBuf::from(&datastore_config.path);
-
- let mut ok = true;
- if destroy_data {
- let remove = |subdir, ok: &mut bool| {
- if let Err(err) = std::fs::remove_dir_all(base.join(subdir)) {
- if err.kind() != io::ErrorKind::NotFound {
- warn!("failed to remove {subdir:?} subdirectory: {err}");
- *ok = false;
- }
- }
- };
-
- info!("Deleting datastore data...");
- remove("ns", &mut ok); // ns first
- remove("ct", &mut ok);
- remove("vm", &mut ok);
- remove("host", &mut ok);
-
- if ok {
- if let Err(err) = std::fs::remove_file(base.join(".gc-status")) {
- if err.kind() != io::ErrorKind::NotFound {
- warn!("failed to remove .gc-status file: {err}");
- ok = false;
- }
- }
- }
+ /// Returns the absolute path for a backup_type
+ pub fn type_path(&self, ns: &BackupNamespace, backup_type: BackupType) -> PathBuf {
+ let mut full_path = self.namespace_path(ns);
+ full_path.push(backup_type.to_string());
+ full_path
+ }
- // chunks get removed last and only if the backups were successfully deleted
- if ok {
- remove(".chunks", &mut ok);
- }
- }
+ /// Returns the absolute path for a backup_group
+ pub fn group_path(
+ &self,
+ ns: &BackupNamespace,
+ backup_group: &pbs_api_types::BackupGroup,
+ ) -> PathBuf {
+ let mut full_path = self.namespace_path(ns);
+ full_path.push(backup_group.to_string());
+ full_path
+ }
- // now the config
- if ok {
- info!("Removing datastore from config...");
- let _lock = pbs_config::datastore::lock_config()?;
- let _ = config.sections.remove(name);
- pbs_config::datastore::save_config(&config)?;
- }
+ /// Returns the absolute path for backup_dir
+ pub fn snapshot_path(
+ &self,
+ ns: &BackupNamespace,
+ backup_dir: &pbs_api_types::BackupDir,
+ ) -> PathBuf {
+ let mut full_path = self.namespace_path(ns);
+ full_path.push(backup_dir.to_string());
+ full_path
+ }
- // finally the lock & toplevel directory
- if destroy_data {
- if ok {
- if let Err(err) = std::fs::remove_file(base.join(".lock")) {
- if err.kind() != io::ErrorKind::NotFound {
- warn!("failed to remove .lock file: {err}");
- ok = false;
- }
- }
- }
+ /// Return the path of the 'owner' file.
+ fn owner_path(&self, ns: &BackupNamespace, group: &pbs_api_types::BackupGroup) -> PathBuf {
+ self.group_path(ns, group).join("owner")
+ }
- if ok {
- info!("Finished deleting data.");
+ pub fn chunk_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
+ self.inner.chunk_store.chunk_path(digest)
+ }
- match std::fs::remove_dir(base) {
- Ok(()) => info!("Removed empty datastore directory."),
- Err(err) if err.kind() == io::ErrorKind::NotFound => {
- // weird, but ok
- }
- Err(err) if err.is_errno(nix::errno::Errno::EBUSY) => {
- warn!("Cannot delete datastore directory (is it a mount point?).")
- }
- Err(err) if err.is_errno(nix::errno::Errno::ENOTEMPTY) => {
- warn!("Datastore directory not empty, not deleting.")
- }
- Err(err) => {
- warn!("Failed to remove datastore directory: {err}");
- }
- }
- } else {
- info!("There were errors deleting data.");
- }
- }
+ pub fn verify_new(&self) -> bool {
+ self.inner.verify_new
+ }
- Ok(())
+ pub fn garbage_collection_running(&self) -> bool {
+ self.inner.gc_mutex.try_lock().is_err()
}
}
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup RFC 05/10] backup_info: add generics and separate functions into impl blocks
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (3 preceding siblings ...)
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 04/10] datastore: separate functions into impl block Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 06/10] pbs-datastore: " Hannes Laimer
` (5 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-datastore/src/backup_info.rs | 179 ++++++++++++++++---------------
1 file changed, 93 insertions(+), 86 deletions(-)
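
This patch moves the read-only accessors of BackupGroup/BackupDir
(exists, list_backups, last_backup, iter_snapshots, get_owner,
load_blob, locked_reader, ...) into impl<T: CanRead> blocks and the
mutating ones (set_owner, destroy, update_manifest, ...) into
impl<T: CanWrite> blocks, while plain path helpers stay available for
any T. A minimal, self-contained sketch of how such capability-gated
impl blocks behave; Group and its methods are invented stand-ins, only
the CanRead/CanWrite/Read/Write/Lookup names come from patch 01/10:

    use std::marker::PhantomData;

    // Marker traits and zero-sized marker types, as in patch 01/10.
    pub trait CanRead: Clone {}
    pub trait CanWrite: CanRead {}

    #[derive(Clone, Copy)]
    pub struct Read;
    #[derive(Clone, Copy)]
    pub struct Write;
    #[derive(Clone, Copy)]
    pub struct Lookup;

    impl CanRead for Read {}
    impl CanRead for Write {}
    impl CanWrite for Write {}

    // Invented stand-in for BackupGroup<T>; the marker only lives in
    // the type signature and costs nothing at runtime.
    pub struct Group<T> {
        name: String,
        _marker: PhantomData<T>,
    }

    impl<T> Group<T> {
        pub fn new(name: &str) -> Self {
            Group {
                name: name.to_string(),
                _marker: PhantomData,
            }
        }

        // Available for every access mode, like the path helpers.
        pub fn name(&self) -> &str {
            &self.name
        }
    }

    impl<T: CanRead> Group<T> {
        // Compiles only for Read and Write stores.
        pub fn list(&self) -> Vec<String> {
            vec![format!("{}/2024-09-03T12:33:00Z", self.name)]
        }
    }

    impl<T: CanWrite> Group<T> {
        // Compiles only for Write stores.
        pub fn set_owner(&self, owner: &str) {
            println!("{}: owner set to {owner}", self.name);
        }
    }

    fn main() {
        let read_group: Group<Read> = Group::new("vm/100");
        let write_group: Group<Write> = Group::new("vm/101");
        let lookup_group: Group<Lookup> = Group::new("vm/102");

        println!("{}", read_group.list().join(", ")); // ok, Read: CanRead
        write_group.set_owner("root@pam"); // ok, Write: CanWrite
        println!("{}", lookup_group.name()); // ok, no capability needed
        // read_group.set_owner("root@pam"); // error: Read is not CanWrite
        // lookup_group.list(); // error: Lookup is not CanRead
    }

Handing out a Group<Read> thus statically rules out ownership changes
and destroys: misuse fails to compile instead of failing at runtime.
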
diff --git a/pbs-datastore/src/backup_info.rs b/pbs-datastore/src/backup_info.rs
index 414ec878..d353f9d6 100644
--- a/pbs-datastore/src/backup_info.rs
+++ b/pbs-datastore/src/backup_info.rs
@@ -12,6 +12,7 @@ use pbs_api_types::{
};
use pbs_config::{open_backup_lockfile, BackupLockGuard};
+use crate::chunk_store::{CanRead, CanWrite};
use crate::manifest::{
BackupManifest, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, MANIFEST_LOCK_NAME,
};
@@ -49,14 +50,14 @@ impl BackupGroupDeleteStats {
/// BackupGroup is a directory containing a list of BackupDir
#[derive(Clone)]
-pub struct BackupGroup {
- store: Arc<DataStore>,
+pub struct BackupGroup<T> {
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
group: pbs_api_types::BackupGroup,
}
-impl fmt::Debug for BackupGroup {
+impl<T: CanRead> fmt::Debug for BackupGroup<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BackupGroup")
.field("store", &self.store.name())
@@ -66,9 +67,9 @@ impl fmt::Debug for BackupGroup {
}
}
-impl BackupGroup {
+impl<T: Clone> BackupGroup<T> {
pub(crate) fn new(
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
group: pbs_api_types::BackupGroup,
) -> Self {
@@ -106,14 +107,30 @@ impl BackupGroup {
path.push(&self.group.id);
path
}
+ pub fn matches(&self, filter: &GroupFilter) -> bool {
+ self.group.matches(filter)
+ }
+
+ pub fn backup_dir(&self, time: i64) -> Result<BackupDir<T>, Error> {
+ BackupDir::with_group(self.clone(), time)
+ }
+
+ pub fn backup_dir_with_rfc3339<D: Into<String>>(
+ &self,
+ time_string: D,
+ ) -> Result<BackupDir<T>, Error> {
+ BackupDir::with_rfc3339(self.clone(), time_string.into())
+ }
+}
+impl<T: CanRead> BackupGroup<T> {
/// Simple check whether a group exists. This does not check whether there are any snapshots,
/// but rather it simply checks whether the directory exists.
pub fn exists(&self) -> bool {
self.full_group_path().exists()
}
- pub fn list_backups(&self) -> Result<Vec<BackupInfo>, Error> {
+ pub fn list_backups(&self) -> Result<Vec<BackupInfo<T>>, Error> {
let mut list = vec![];
let path = self.full_group_path();
@@ -145,7 +162,7 @@ impl BackupGroup {
}
/// Finds the latest backup inside a backup group
- pub fn last_backup(&self, only_finished: bool) -> Result<Option<BackupInfo>, Error> {
+ pub fn last_backup(&self, only_finished: bool) -> Result<Option<BackupInfo<T>>, Error> {
let backups = self.list_backups()?;
Ok(backups
.into_iter()
@@ -206,23 +223,23 @@ impl BackupGroup {
Ok(last)
}
- pub fn matches(&self, filter: &GroupFilter) -> bool {
- self.group.matches(filter)
- }
-
- pub fn backup_dir(&self, time: i64) -> Result<BackupDir, Error> {
- BackupDir::with_group(self.clone(), time)
+ pub fn iter_snapshots(&self) -> Result<crate::ListSnapshots<T>, Error> {
+ crate::ListSnapshots::new(self.clone())
}
- pub fn backup_dir_with_rfc3339<T: Into<String>>(
- &self,
- time_string: T,
- ) -> Result<BackupDir, Error> {
- BackupDir::with_rfc3339(self.clone(), time_string.into())
+ /// Returns the backup owner.
+ ///
+ /// The backup owner is the entity who first created the backup group.
+ pub fn get_owner(&self) -> Result<Authid, Error> {
+ self.store.get_owner(&self.ns, self.as_ref())
}
+}
- pub fn iter_snapshots(&self) -> Result<crate::ListSnapshots, Error> {
- crate::ListSnapshots::new(self.clone())
+impl<T: CanWrite> BackupGroup<T> {
+ /// Set the backup owner.
+ pub fn set_owner(&self, auth_id: &Authid, force: bool) -> Result<(), Error> {
+ self.store
+ .set_owner(&self.ns, self.as_ref(), auth_id, force)
}
/// Destroy the group inclusive all its backup snapshots (BackupDir's)
@@ -254,49 +271,36 @@ impl BackupGroup {
Ok(delete_stats)
}
-
- /// Returns the backup owner.
- ///
- /// The backup owner is the entity who first created the backup group.
- pub fn get_owner(&self) -> Result<Authid, Error> {
- self.store.get_owner(&self.ns, self.as_ref())
- }
-
- /// Set the backup owner.
- pub fn set_owner(&self, auth_id: &Authid, force: bool) -> Result<(), Error> {
- self.store
- .set_owner(&self.ns, self.as_ref(), auth_id, force)
- }
}
-impl AsRef<pbs_api_types::BackupNamespace> for BackupGroup {
+impl<T> AsRef<pbs_api_types::BackupNamespace> for BackupGroup<T> {
#[inline]
fn as_ref(&self) -> &pbs_api_types::BackupNamespace {
&self.ns
}
}
-impl AsRef<pbs_api_types::BackupGroup> for BackupGroup {
+impl<T> AsRef<pbs_api_types::BackupGroup> for BackupGroup<T> {
#[inline]
fn as_ref(&self) -> &pbs_api_types::BackupGroup {
&self.group
}
}
-impl From<&BackupGroup> for pbs_api_types::BackupGroup {
- fn from(group: &BackupGroup) -> pbs_api_types::BackupGroup {
+impl<T> From<&BackupGroup<T>> for pbs_api_types::BackupGroup {
+ fn from(group: &BackupGroup<T>) -> pbs_api_types::BackupGroup {
group.group.clone()
}
}
-impl From<BackupGroup> for pbs_api_types::BackupGroup {
- fn from(group: BackupGroup) -> pbs_api_types::BackupGroup {
+impl<T> From<BackupGroup<T>> for pbs_api_types::BackupGroup {
+ fn from(group: BackupGroup<T>) -> pbs_api_types::BackupGroup {
group.group
}
}
-impl From<BackupDir> for BackupGroup {
- fn from(dir: BackupDir) -> BackupGroup {
+impl<T> From<BackupDir<T>> for BackupGroup<T> {
+ fn from(dir: BackupDir<T>) -> BackupGroup<T> {
BackupGroup {
store: dir.store,
ns: dir.ns,
@@ -305,8 +309,8 @@ impl From<BackupDir> for BackupGroup {
}
}
-impl From<&BackupDir> for BackupGroup {
- fn from(dir: &BackupDir) -> BackupGroup {
+impl<T: CanRead> From<&BackupDir<T>> for BackupGroup<T> {
+ fn from(dir: &BackupDir<T>) -> BackupGroup<T> {
BackupGroup {
store: Arc::clone(&dir.store),
ns: dir.ns.clone(),
@@ -319,15 +323,15 @@ impl From<&BackupDir> for BackupGroup {
///
/// We also call this a backup snaphost.
#[derive(Clone)]
-pub struct BackupDir {
- store: Arc<DataStore>,
+pub struct BackupDir<T> {
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
dir: pbs_api_types::BackupDir,
// backup_time as rfc3339
backup_time_string: String,
}
-impl fmt::Debug for BackupDir {
+impl<T> fmt::Debug for BackupDir<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BackupDir")
.field("store", &self.store.name())
@@ -338,19 +342,21 @@ impl fmt::Debug for BackupDir {
}
}
-impl BackupDir {
+impl<T> BackupDir<T> {
/// Temporarily used for tests.
#[doc(hidden)]
pub fn new_test(dir: pbs_api_types::BackupDir) -> Self {
Self {
- store: unsafe { DataStore::new_test() },
+ store: DataStore::new_test(),
backup_time_string: Self::backup_time_to_string(dir.time).unwrap(),
ns: BackupNamespace::root(),
dir,
}
}
+}
- pub(crate) fn with_group(group: BackupGroup, backup_time: i64) -> Result<Self, Error> {
+impl<T> BackupDir<T> {
+ pub(crate) fn with_group(group: BackupGroup<T>, backup_time: i64) -> Result<Self, Error> {
let backup_time_string = Self::backup_time_to_string(backup_time)?;
Ok(Self {
store: group.store,
@@ -361,7 +367,7 @@ impl BackupDir {
}
pub(crate) fn with_rfc3339(
- group: BackupGroup,
+ group: BackupGroup<T>,
backup_time_string: String,
) -> Result<Self, Error> {
let backup_time = proxmox_time::parse_rfc3339(&backup_time_string)?;
@@ -436,18 +442,6 @@ impl BackupDir {
proxmox_time::epoch_to_rfc3339_utc(backup_time)
}
- /// load a `DataBlob` from this snapshot's backup dir.
- pub fn load_blob(&self, filename: &str) -> Result<DataBlob, Error> {
- let mut path = self.full_path();
- path.push(filename);
-
- proxmox_lang::try_block!({
- let mut file = std::fs::File::open(&path)?;
- DataBlob::load_from_reader(&mut file)
- })
- .map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err))
- }
-
/// Returns the filename to lock a manifest
///
/// Also creates the basedir. The lockfile is located in
@@ -502,19 +496,25 @@ impl BackupDir {
}
/// Get the datastore.
- pub fn datastore(&self) -> &Arc<DataStore> {
+ pub fn datastore(&self) -> &Arc<DataStore<T>> {
&self.store
}
+}
+impl<T: CanRead> BackupDir<T> {
+ /// load a `DataBlob` from this snapshot's backup dir.
+ pub fn load_blob(&self, filename: &str) -> Result<DataBlob, Error> {
+ let mut path = self.full_path();
+ path.push(filename);
- /// Returns the backup owner.
- ///
- /// The backup owner is the entity who first created the backup group.
- pub fn get_owner(&self) -> Result<Authid, Error> {
- self.store.get_owner(&self.ns, self.as_ref())
+ proxmox_lang::try_block!({
+ let mut file = std::fs::File::open(&path)?;
+ DataBlob::load_from_reader(&mut file)
+ })
+ .map_err(|err| format_err!("unable to load blob '{:?}' - {}", path, err))
}
/// Lock the snapshot and open a reader.
- pub fn locked_reader(&self) -> Result<crate::SnapshotReader, Error> {
+ pub fn locked_reader(&self) -> Result<crate::SnapshotReader<T>, Error> {
crate::SnapshotReader::new_do(self.clone())
}
@@ -525,7 +525,15 @@ impl BackupDir {
let manifest = BackupManifest::try_from(blob)?;
Ok((manifest, raw_size))
}
+ /// Returns the backup owner.
+ ///
+ /// The backup owner is the entity who first created the backup group.
+ pub fn get_owner(&self) -> Result<Authid, Error> {
+ self.store.get_owner(&self.ns, self.as_ref())
+ }
+}
+impl<T: CanWrite> BackupDir<T> {
/// Update the manifest of the specified snapshot. Never write a manifest directly,
/// only use this method - anything else may break locking guarantees.
pub fn update_manifest(
@@ -584,62 +592,61 @@ impl BackupDir {
Ok(())
}
}
-
-impl AsRef<pbs_api_types::BackupNamespace> for BackupDir {
+impl<T> AsRef<pbs_api_types::BackupNamespace> for BackupDir<T> {
fn as_ref(&self) -> &pbs_api_types::BackupNamespace {
&self.ns
}
}
-impl AsRef<pbs_api_types::BackupDir> for BackupDir {
+impl<T> AsRef<pbs_api_types::BackupDir> for BackupDir<T> {
fn as_ref(&self) -> &pbs_api_types::BackupDir {
&self.dir
}
}
-impl AsRef<pbs_api_types::BackupGroup> for BackupDir {
+impl<T> AsRef<pbs_api_types::BackupGroup> for BackupDir<T> {
fn as_ref(&self) -> &pbs_api_types::BackupGroup {
&self.dir.group
}
}
-impl From<&BackupDir> for pbs_api_types::BackupGroup {
- fn from(dir: &BackupDir) -> pbs_api_types::BackupGroup {
+impl<T> From<&BackupDir<T>> for pbs_api_types::BackupGroup {
+ fn from(dir: &BackupDir<T>) -> pbs_api_types::BackupGroup {
dir.dir.group.clone()
}
}
-impl From<BackupDir> for pbs_api_types::BackupGroup {
- fn from(dir: BackupDir) -> pbs_api_types::BackupGroup {
+impl<T> From<BackupDir<T>> for pbs_api_types::BackupGroup {
+ fn from(dir: BackupDir<T>) -> pbs_api_types::BackupGroup {
dir.dir.group
}
}
-impl From<&BackupDir> for pbs_api_types::BackupDir {
- fn from(dir: &BackupDir) -> pbs_api_types::BackupDir {
+impl<T> From<&BackupDir<T>> for pbs_api_types::BackupDir {
+ fn from(dir: &BackupDir<T>) -> pbs_api_types::BackupDir {
dir.dir.clone()
}
}
-impl From<BackupDir> for pbs_api_types::BackupDir {
- fn from(dir: BackupDir) -> pbs_api_types::BackupDir {
+impl<T> From<BackupDir<T>> for pbs_api_types::BackupDir {
+ fn from(dir: BackupDir<T>) -> pbs_api_types::BackupDir {
dir.dir
}
}
/// Detailed Backup Information, lists files inside a BackupDir
#[derive(Clone, Debug)]
-pub struct BackupInfo {
+pub struct BackupInfo<T> {
/// the backup directory
- pub backup_dir: BackupDir,
+ pub backup_dir: BackupDir<T>,
/// List of data files
pub files: Vec<String>,
/// Protection Status
pub protected: bool,
}
-impl BackupInfo {
- pub fn new(backup_dir: BackupDir) -> Result<BackupInfo, Error> {
+impl<T: CanRead> BackupInfo<T> {
+ pub fn new(backup_dir: BackupDir<T>) -> Result<Self, Error> {
let path = backup_dir.full_path();
let files = list_backup_files(libc::AT_FDCWD, &path)?;
@@ -652,7 +659,7 @@ impl BackupInfo {
})
}
- pub fn sort_list(list: &mut [BackupInfo], ascendending: bool) {
+ pub fn sort_list(list: &mut [BackupInfo<T>], ascendending: bool) {
if ascendending {
// oldest first
list.sort_unstable_by(|a, b| a.backup_dir.dir.time.cmp(&b.backup_dir.dir.time));
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup RFC 06/10] pbs-datastore: add generics and separate functions into impl blocks
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (4 preceding siblings ...)
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 05/10] backup_info: add generics and separate functions into impl blocks Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 07/10] api: replace datastore_lookup with new, state-typed datastore-returning functions Hannes Laimer
` (4 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
pbs-datastore/src/dynamic_index.rs | 22 +++---
pbs-datastore/src/fixed_index.rs | 50 +++++++-------
pbs-datastore/src/hierarchy.rs | 92 ++++++++++++++-----------
pbs-datastore/src/local_chunk_reader.rs | 13 ++--
pbs-datastore/src/prune.rs | 19 +++--
pbs-datastore/src/snapshot_reader.rs | 30 ++++----
6 files changed, 121 insertions(+), 105 deletions(-)
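
Besides gating the iterator impls on CanRead, this patch has to handle
types like ListNamespaces<T> that keep only the datastore's base path
after construction, so the access-mode parameter T occurs in no field.
Rust rejects such unused type parameters (error E0392), hence the new
zero-sized _marker: PhantomData<T> member. A short sketch of that
idiom, with NamespaceIter as an invented stand-in:

    use std::marker::PhantomData;
    use std::path::PathBuf;

    #[derive(Clone, Copy)]
    pub struct Read;

    // After construction only the base path is kept, so T would be
    // unused without the zero-sized PhantomData field.
    pub struct NamespaceIter<T> {
        next_path: Option<PathBuf>,
        _marker: PhantomData<T>,
    }

    impl<T> NamespaceIter<T> {
        pub fn new(base_path: PathBuf) -> Self {
            NamespaceIter {
                // The real iterator lazily walks "ns/" subdirectories;
                // a single precomputed entry keeps the sketch small.
                next_path: Some(base_path.join("ns")),
                _marker: PhantomData,
            }
        }
    }

    impl<T> Iterator for NamespaceIter<T> {
        type Item = PathBuf;

        fn next(&mut self) -> Option<PathBuf> {
            self.next_path.take()
        }
    }

    fn main() {
        let iter: NamespaceIter<Read> =
            NamespaceIter::new(PathBuf::from("/mnt/datastore"));
        for path in iter {
            println!("{}", path.display());
        }
    }

The marker costs nothing at runtime; it only makes the iterator's type
remember which access mode its datastore was opened with, so bounds
like CanRead remain checkable downstream.
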
diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs
index 0e99ce58..75f6edff 100644
--- a/pbs-datastore/src/dynamic_index.rs
+++ b/pbs-datastore/src/dynamic_index.rs
@@ -18,7 +18,7 @@ use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use pbs_tools::lru_cache::LruCache;
use crate::chunk_stat::ChunkStat;
-use crate::chunk_store::ChunkStore;
+use crate::chunk_store::{CanWrite, ChunkStore};
use crate::data_blob::{DataBlob, DataChunkBuilder};
use crate::file_formats;
use crate::index::{ChunkReadInfo, IndexFile};
@@ -275,8 +275,8 @@ impl IndexFile for DynamicIndexReader {
}
/// Create dynamic index files (`.dixd`)
-pub struct DynamicIndexWriter {
- store: Arc<ChunkStore>,
+pub struct DynamicIndexWriter<T> {
+ store: Arc<ChunkStore<T>>,
_lock: ProcessLockSharedGuard,
writer: BufWriter<File>,
closed: bool,
@@ -287,14 +287,14 @@ pub struct DynamicIndexWriter {
pub ctime: i64,
}
-impl Drop for DynamicIndexWriter {
+impl<T> Drop for DynamicIndexWriter<T> {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
}
}
-impl DynamicIndexWriter {
- pub fn create(store: Arc<ChunkStore>, path: &Path) -> Result<Self, Error> {
+impl<T: CanWrite> DynamicIndexWriter<T> {
+ pub fn create(store: Arc<ChunkStore<T>>, path: &Path) -> Result<Self, Error> {
let shared_lock = store.try_shared_lock()?;
let full_path = store.relative_path(path);
@@ -394,8 +394,8 @@ impl DynamicIndexWriter {
/// Writer which splits a binary stream into dynamic sized chunks
///
/// And store the resulting chunk list into the index file.
-pub struct DynamicChunkWriter {
- index: DynamicIndexWriter,
+pub struct DynamicChunkWriter<T: CanWrite> {
+ index: DynamicIndexWriter<T>,
closed: bool,
chunker: ChunkerImpl,
stat: ChunkStat,
@@ -404,8 +404,8 @@ pub struct DynamicChunkWriter {
chunk_buffer: Vec<u8>,
}
-impl DynamicChunkWriter {
- pub fn new(index: DynamicIndexWriter, chunk_size: usize) -> Self {
+impl<T: CanWrite> DynamicChunkWriter<T> {
+ pub fn new(index: DynamicIndexWriter<T>, chunk_size: usize) -> Self {
Self {
index,
closed: false,
@@ -490,7 +490,7 @@ impl DynamicChunkWriter {
}
}
-impl Write for DynamicChunkWriter {
+impl<T: CanWrite> Write for DynamicChunkWriter<T> {
fn write(&mut self, data: &[u8]) -> std::result::Result<usize, std::io::Error> {
let chunker = &mut self.chunker;
diff --git a/pbs-datastore/src/fixed_index.rs b/pbs-datastore/src/fixed_index.rs
index d67c388e..30e86add 100644
--- a/pbs-datastore/src/fixed_index.rs
+++ b/pbs-datastore/src/fixed_index.rs
@@ -214,8 +214,8 @@ impl IndexFile for FixedIndexReader {
}
}
-pub struct FixedIndexWriter {
- store: Arc<ChunkStore>,
+pub struct FixedIndexWriter<T> {
+ store: Arc<ChunkStore<T>>,
file: File,
_lock: ProcessLockSharedGuard,
filename: PathBuf,
@@ -229,9 +229,9 @@ pub struct FixedIndexWriter {
}
// `index` is mmap()ed which cannot be thread-local so should be sendable
-unsafe impl Send for FixedIndexWriter {}
+unsafe impl<T> Send for FixedIndexWriter<T> {}
-impl Drop for FixedIndexWriter {
+impl<T> Drop for FixedIndexWriter<T> {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.tmp_filename); // ignore errors
if let Err(err) = self.unmap() {
@@ -240,10 +240,30 @@ impl Drop for FixedIndexWriter {
}
}
-impl FixedIndexWriter {
+impl<T> FixedIndexWriter<T> {
+ fn unmap(&mut self) -> Result<(), Error> {
+ if self.index.is_null() {
+ return Ok(());
+ }
+
+ let index_size = self.index_length * 32;
+
+ if let Err(err) =
+ unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
+ {
+ bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
+ }
+
+ self.index = std::ptr::null_mut();
+
+ Ok(())
+ }
+}
+
+impl<T: crate::chunk_store::CanWrite> FixedIndexWriter<T> {
#[allow(clippy::cast_ptr_alignment)]
pub fn create(
- store: Arc<ChunkStore>,
+ store: Arc<ChunkStore<T>>,
path: &Path,
size: usize,
chunk_size: usize,
@@ -320,24 +340,6 @@ impl FixedIndexWriter {
self.index_length
}
- fn unmap(&mut self) -> Result<(), Error> {
- if self.index.is_null() {
- return Ok(());
- }
-
- let index_size = self.index_length * 32;
-
- if let Err(err) =
- unsafe { nix::sys::mman::munmap(self.index as *mut std::ffi::c_void, index_size) }
- {
- bail!("unmap file {:?} failed - {}", self.tmp_filename, err);
- }
-
- self.index = std::ptr::null_mut();
-
- Ok(())
- }
-
pub fn close(&mut self) -> Result<[u8; 32], Error> {
if self.index.is_null() {
bail!("cannot close already closed index file.");
diff --git a/pbs-datastore/src/hierarchy.rs b/pbs-datastore/src/hierarchy.rs
index 8b7af038..1e313a82 100644
--- a/pbs-datastore/src/hierarchy.rs
+++ b/pbs-datastore/src/hierarchy.rs
@@ -9,16 +9,17 @@ use pbs_api_types::{BackupNamespace, BackupType, BACKUP_DATE_REGEX, BACKUP_ID_RE
use proxmox_sys::fs::get_file_type;
use crate::backup_info::{BackupDir, BackupGroup};
+use crate::chunk_store::CanRead;
use crate::DataStore;
/// A iterator for all BackupDir's (Snapshots) in a BackupGroup
-pub struct ListSnapshots {
- group: BackupGroup,
+pub struct ListSnapshots<T: CanRead> {
+ group: BackupGroup<T>,
fd: proxmox_sys::fs::ReadDir,
}
-impl ListSnapshots {
- pub fn new(group: BackupGroup) -> Result<Self, Error> {
+impl<T: CanRead> ListSnapshots<T> {
+ pub fn new(group: BackupGroup<T>) -> Result<Self, Error> {
let group_path = group.full_group_path();
Ok(ListSnapshots {
fd: proxmox_sys::fs::read_subdir(libc::AT_FDCWD, &group_path)
@@ -28,8 +29,8 @@ impl ListSnapshots {
}
}
-impl Iterator for ListSnapshots {
- type Item = Result<BackupDir, Error>;
+impl<T: CanRead> Iterator for ListSnapshots<T> {
+ type Item = Result<BackupDir<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
@@ -69,21 +70,25 @@ impl Iterator for ListSnapshots {
}
/// An iterator for a single backup group type.
-pub struct ListGroupsType {
- store: Arc<DataStore>,
+pub struct ListGroupsType<T> {
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
ty: BackupType,
dir: proxmox_sys::fs::ReadDir,
}
-impl ListGroupsType {
- pub fn new(store: Arc<DataStore>, ns: BackupNamespace, ty: BackupType) -> Result<Self, Error> {
+impl<T: CanRead> ListGroupsType<T> {
+ pub fn new(
+ store: Arc<DataStore<T>>,
+ ns: BackupNamespace,
+ ty: BackupType,
+ ) -> Result<Self, Error> {
Self::new_at(libc::AT_FDCWD, store, ns, ty)
}
fn new_at(
fd: RawFd,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
ty: BackupType,
) -> Result<Self, Error> {
@@ -95,13 +100,13 @@ impl ListGroupsType {
})
}
- pub(crate) fn ok(self) -> ListGroupsOk<Self> {
+ pub(crate) fn ok(self) -> ListGroupsOk<Self, T> {
ListGroupsOk::new(self)
}
}
-impl Iterator for ListGroupsType {
- type Item = Result<BackupGroup, Error>;
+impl<T: CanRead> Iterator for ListGroupsType<T> {
+ type Item = Result<BackupGroup<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
@@ -139,15 +144,15 @@ impl Iterator for ListGroupsType {
}
/// A iterator for a (single) level of Backup Groups
-pub struct ListGroups {
- store: Arc<DataStore>,
+pub struct ListGroups<T> {
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
type_fd: proxmox_sys::fs::ReadDir,
- id_state: Option<ListGroupsType>,
+ id_state: Option<ListGroupsType<T>>,
}
-impl ListGroups {
- pub fn new(store: Arc<DataStore>, ns: BackupNamespace) -> Result<Self, Error> {
+impl<T: CanRead> ListGroups<T> {
+ pub fn new(store: Arc<DataStore<T>>, ns: BackupNamespace) -> Result<Self, Error> {
Ok(Self {
type_fd: proxmox_sys::fs::read_subdir(libc::AT_FDCWD, &store.namespace_path(&ns))?,
store,
@@ -156,13 +161,13 @@ impl ListGroups {
})
}
- pub(crate) fn ok(self) -> ListGroupsOk<Self> {
+ pub(crate) fn ok(self) -> ListGroupsOk<Self, T> {
ListGroupsOk::new(self)
}
}
-impl Iterator for ListGroups {
- type Item = Result<BackupGroup, Error>;
+impl<T: CanRead> Iterator for ListGroups<T> {
+ type Item = Result<BackupGroup<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
@@ -222,36 +227,36 @@ pub(crate) trait GroupIter {
fn store_name(&self) -> &str;
}
-impl GroupIter for ListGroups {
+impl<T: CanRead> GroupIter for ListGroups<T> {
fn store_name(&self) -> &str {
self.store.name()
}
}
-impl GroupIter for ListGroupsType {
+impl<T: CanRead> GroupIter for ListGroupsType<T> {
fn store_name(&self) -> &str {
self.store.name()
}
}
-pub(crate) struct ListGroupsOk<I>(Option<I>)
+pub(crate) struct ListGroupsOk<I, T: CanRead>(Option<I>)
where
- I: GroupIter + Iterator<Item = Result<BackupGroup, Error>>;
+ I: GroupIter + Iterator<Item = Result<BackupGroup<T>, Error>>;
-impl<I> ListGroupsOk<I>
+impl<I, T: CanRead> ListGroupsOk<I, T>
where
- I: GroupIter + Iterator<Item = Result<BackupGroup, Error>>,
+ I: GroupIter + Iterator<Item = Result<BackupGroup<T>, Error>>,
{
fn new(inner: I) -> Self {
Self(Some(inner))
}
}
-impl<I> Iterator for ListGroupsOk<I>
+impl<I, T: CanRead> Iterator for ListGroupsOk<I, T>
where
- I: GroupIter + Iterator<Item = Result<BackupGroup, Error>>,
+ I: GroupIter + Iterator<Item = Result<BackupGroup<T>, Error>>,
{
- type Item = BackupGroup;
+ type Item = BackupGroup<T>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(iter) = &mut self.0 {
@@ -274,19 +279,21 @@ where
}
/// A iterator for a (single) level of Namespaces
-pub struct ListNamespaces {
+pub struct ListNamespaces<T> {
ns: BackupNamespace,
base_path: PathBuf,
ns_state: Option<proxmox_sys::fs::ReadDir>,
+ _marker: std::marker::PhantomData<T>,
}
-impl ListNamespaces {
+impl<T: CanRead> ListNamespaces<T> {
/// construct a new single-level namespace iterator on a datastore with an optional anchor ns
- pub fn new(store: Arc<DataStore>, ns: BackupNamespace) -> Result<Self, Error> {
+ pub fn new(store: Arc<DataStore<T>>, ns: BackupNamespace) -> Result<Self, Error> {
Ok(ListNamespaces {
ns,
base_path: store.base_path(),
ns_state: None,
+ _marker: std::marker::PhantomData,
})
}
@@ -298,11 +305,12 @@ impl ListNamespaces {
ns: ns.unwrap_or_default(),
base_path: path,
ns_state: None,
+ _marker: std::marker::PhantomData,
})
}
}
-impl Iterator for ListNamespaces {
+impl<T: CanRead> Iterator for ListNamespaces<T> {
type Item = Result<BackupNamespace, Error>;
fn next(&mut self) -> Option<Self::Item> {
@@ -366,18 +374,18 @@ impl Iterator for ListNamespaces {
/// can be useful for searching all backup groups from a certain anchor, as that can contain
/// sub-namespaces but also groups on its own level, so otherwise one would need to special case
/// the ones from the own level.
-pub struct ListNamespacesRecursive {
- store: Arc<DataStore>,
+pub struct ListNamespacesRecursive<T> {
+ store: Arc<DataStore<T>>,
/// the starting namespace we search downward from
ns: BackupNamespace,
/// the maximal recursion depth from the anchor start ns (depth == 0) downwards
max_depth: u8,
- state: Option<Vec<ListNamespaces>>, // vector to avoid code recursion
+ state: Option<Vec<ListNamespaces<T>>>, // vector to avoid code recursion
}
-impl ListNamespacesRecursive {
+impl<T: CanRead> ListNamespacesRecursive<T> {
/// Creates an recursive namespace iterator.
- pub fn new(store: Arc<DataStore>, ns: BackupNamespace) -> Result<Self, Error> {
+ pub fn new(store: Arc<DataStore<T>>, ns: BackupNamespace) -> Result<Self, Error> {
Self::new_max_depth(store, ns, pbs_api_types::MAX_NAMESPACE_DEPTH)
}
@@ -388,7 +396,7 @@ impl ListNamespacesRecursive {
/// Depth is counted relatively, that means not from the datastore as anchor, but from `ns`,
/// and it will be clamped to `min(depth, MAX_NAMESPACE_DEPTH - ns.depth())` automatically.
pub fn new_max_depth(
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
) -> Result<Self, Error> {
@@ -408,7 +416,7 @@ impl ListNamespacesRecursive {
}
}
-impl Iterator for ListNamespacesRecursive {
+impl<T: CanRead> Iterator for ListNamespacesRecursive<T> {
type Item = Result<BackupNamespace, Error>;
fn next(&mut self) -> Option<Self::Item> {
diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs
index 05a70c06..ccdde9f1 100644
--- a/pbs-datastore/src/local_chunk_reader.rs
+++ b/pbs-datastore/src/local_chunk_reader.rs
@@ -7,20 +7,21 @@ use anyhow::{bail, Error};
use pbs_api_types::CryptMode;
use pbs_tools::crypt_config::CryptConfig;
+use crate::chunk_store::CanRead;
use crate::data_blob::DataBlob;
use crate::read_chunk::{AsyncReadChunk, ReadChunk};
use crate::DataStore;
#[derive(Clone)]
-pub struct LocalChunkReader {
- store: Arc<DataStore>,
+pub struct LocalChunkReader<T: CanRead> {
+ store: Arc<DataStore<T>>,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
}
-impl LocalChunkReader {
+impl<T: CanRead> LocalChunkReader<T> {
pub fn new(
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
) -> Self {
@@ -47,7 +48,7 @@ impl LocalChunkReader {
}
}
-impl ReadChunk for LocalChunkReader {
+impl<T: CanRead> ReadChunk for LocalChunkReader<T> {
fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
let chunk = self.store.load_chunk(digest)?;
self.ensure_crypt_mode(chunk.crypt_mode()?)?;
@@ -63,7 +64,7 @@ impl ReadChunk for LocalChunkReader {
}
}
-impl AsyncReadChunk for LocalChunkReader {
+impl<T: CanRead + Send + Sync> AsyncReadChunk for LocalChunkReader<T> {
fn read_raw_chunk<'a>(
&'a self,
digest: &'a [u8; 32],
diff --git a/pbs-datastore/src/prune.rs b/pbs-datastore/src/prune.rs
index ad1493bf..fc0f9332 100644
--- a/pbs-datastore/src/prune.rs
+++ b/pbs-datastore/src/prune.rs
@@ -5,6 +5,8 @@ use anyhow::Error;
use pbs_api_types::KeepOptions;
+use crate::chunk_store::CanRead;
+
use super::BackupInfo;
#[derive(Clone, Copy, PartialEq, Eq)]
@@ -36,9 +38,9 @@ impl std::fmt::Display for PruneMark {
}
}
-fn mark_selections<F: Fn(&BackupInfo) -> Result<String, Error>>(
+fn mark_selections<F: Fn(&BackupInfo<T>) -> Result<String, Error>, T: CanRead>(
mark: &mut HashMap<PathBuf, PruneMark>,
- list: &[BackupInfo],
+ list: &[BackupInfo<T>],
keep: usize,
select_id: F,
) -> Result<(), Error> {
@@ -82,7 +84,10 @@ fn mark_selections<F: Fn(&BackupInfo) -> Result<String, Error>>(
Ok(())
}
-fn remove_incomplete_snapshots(mark: &mut HashMap<PathBuf, PruneMark>, list: &[BackupInfo]) {
+fn remove_incomplete_snapshots<T: CanRead>(
+ mark: &mut HashMap<PathBuf, PruneMark>,
+ list: &[BackupInfo<T>],
+) {
let mut keep_unfinished = true;
for info in list.iter() {
// backup is considered unfinished if there is no manifest
@@ -104,10 +109,10 @@ fn remove_incomplete_snapshots(mark: &mut HashMap<PathBuf, PruneMark>, list: &[B
}
/// This filters incomplete and kept backups.
-pub fn compute_prune_info(
- mut list: Vec<BackupInfo>,
+pub fn compute_prune_info<T: CanRead>(
+ mut list: Vec<BackupInfo<T>>,
options: &KeepOptions,
-) -> Result<Vec<(BackupInfo, PruneMark)>, Error> {
+) -> Result<Vec<(BackupInfo<T>, PruneMark)>, Error> {
let mut mark = HashMap::new();
BackupInfo::sort_list(&mut list, false);
@@ -154,7 +159,7 @@ pub fn compute_prune_info(
})?;
}
- let prune_info: Vec<(BackupInfo, PruneMark)> = list
+ let prune_info: Vec<(BackupInfo<T>, PruneMark)> = list
.into_iter()
.map(|info| {
let backup_id = info.backup_dir.relative_path();
diff --git a/pbs-datastore/src/snapshot_reader.rs b/pbs-datastore/src/snapshot_reader.rs
index f9c77207..2ba91f0b 100644
--- a/pbs-datastore/src/snapshot_reader.rs
+++ b/pbs-datastore/src/snapshot_reader.rs
@@ -8,9 +8,10 @@ use nix::dir::Dir;
use proxmox_sys::fs::lock_dir_noblock_shared;
-use pbs_api_types::{print_store_and_ns, BackupNamespace, Operation};
+use pbs_api_types::{print_store_and_ns, BackupNamespace};
use crate::backup_info::BackupDir;
+use crate::chunk_store::CanRead;
use crate::dynamic_index::DynamicIndexReader;
use crate::fixed_index::FixedIndexReader;
use crate::index::IndexFile;
@@ -20,24 +21,24 @@ use crate::DataStore;
/// Helper to access the contents of a datastore backup snapshot
///
/// This make it easy to iterate over all used chunks and files.
-pub struct SnapshotReader {
- snapshot: BackupDir,
+pub struct SnapshotReader<T: CanRead> {
+ snapshot: BackupDir<T>,
datastore_name: String,
file_list: Vec<String>,
locked_dir: Dir,
}
-impl SnapshotReader {
+impl<T: CanRead> SnapshotReader<T> {
/// Lock snapshot, reads the manifest and returns a new instance
pub fn new(
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
ns: BackupNamespace,
snapshot: pbs_api_types::BackupDir,
) -> Result<Self, Error> {
Self::new_do(datastore.backup_dir(ns, snapshot)?)
}
- pub(crate) fn new_do(snapshot: BackupDir) -> Result<Self, Error> {
+ pub(crate) fn new_do(snapshot: BackupDir<T>) -> Result<Self, Error> {
let datastore = snapshot.datastore();
let snapshot_path = snapshot.full_path();
@@ -81,7 +82,7 @@ impl SnapshotReader {
}
/// Return the snapshot directory
- pub fn snapshot(&self) -> &BackupDir {
+ pub fn snapshot(&self) -> &BackupDir<T> {
&self.snapshot
}
@@ -111,7 +112,7 @@ impl SnapshotReader {
pub fn chunk_iterator<F: Fn(&[u8; 32]) -> bool>(
&self,
skip_fn: F,
- ) -> Result<SnapshotChunkIterator<F>, Error> {
+ ) -> Result<SnapshotChunkIterator<F, T>, Error> {
SnapshotChunkIterator::new(self, skip_fn)
}
}
@@ -121,15 +122,15 @@ impl SnapshotReader {
/// Note: The iterator returns a `Result`, and the iterator state is
/// undefined after the first error. So it make no sense to continue
/// iteration after the first error.
-pub struct SnapshotChunkIterator<'a, F: Fn(&[u8; 32]) -> bool> {
- snapshot_reader: &'a SnapshotReader,
+pub struct SnapshotChunkIterator<'a, F: Fn(&[u8; 32]) -> bool, T: CanRead> {
+ snapshot_reader: &'a SnapshotReader<T>,
todo_list: Vec<String>,
skip_fn: F,
#[allow(clippy::type_complexity)]
current_index: Option<(Arc<Box<dyn IndexFile + Send>>, usize, Vec<(usize, u64)>)>,
}
-impl<'a, F: Fn(&[u8; 32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> {
+impl<'a, F: Fn(&[u8; 32]) -> bool, T: CanRead> Iterator for SnapshotChunkIterator<'a, F, T> {
type Item = Result<[u8; 32], Error>;
fn next(&mut self) -> Option<Self::Item> {
@@ -149,9 +150,8 @@ impl<'a, F: Fn(&[u8; 32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> {
),
};
- let datastore = DataStore::lookup_datastore(
+ let datastore = DataStore::lookup_datastore_read(
self.snapshot_reader.datastore_name(),
- Some(Operation::Read),
)?;
let order =
datastore.get_chunks_in_order(&*index, &self.skip_fn, |_| Ok(()))?;
@@ -176,8 +176,8 @@ impl<'a, F: Fn(&[u8; 32]) -> bool> Iterator for SnapshotChunkIterator<'a, F> {
}
}
-impl<'a, F: Fn(&[u8; 32]) -> bool> SnapshotChunkIterator<'a, F> {
- pub fn new(snapshot_reader: &'a SnapshotReader, skip_fn: F) -> Result<Self, Error> {
+impl<'a, F: Fn(&[u8; 32]) -> bool, T: CanRead> SnapshotChunkIterator<'a, F, T> {
+ pub fn new(snapshot_reader: &'a SnapshotReader<T>, skip_fn: F) -> Result<Self, Error> {
let mut todo_list = Vec::new();
for filename in snapshot_reader.file_list() {
--
2.39.2
* [pbs-devel] [PATCH proxmox-backup RFC 07/10] api: replace datastore_lookup with new, state-typed datastore-returning functions
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (5 preceding siblings ...)
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 06/10] pbs-datastore: " Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 08/10] server/bin: " Hannes Laimer
` (3 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/api2/admin/datastore.rs | 143 ++++++++++++++++++------------------
src/api2/admin/namespace.rs | 8 +-
src/api2/backup/mod.rs | 4 +-
src/api2/reader/mod.rs | 4 +-
src/api2/status.rs | 8 +-
src/api2/tape/restore.rs | 6 +-
6 files changed, 86 insertions(+), 87 deletions(-)
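
The call sites below switch from
DataStore::lookup_datastore(&store, Some(Operation::Read)) (or
Operation::Write) to DataStore::lookup_datastore_read(&store) or
DataStore::lookup_datastore_write(&store), and the old
check_privs_and_load_store helper is split into an explicit lookup
plus a pure check_privs. A rough sketch of the shape this gives the
API handlers; the bodies are placeholders for the real lookups
introduced earlier in the series (datastore.cfg parsing,
maintenance-mode checks and active-operation tracking are omitted):

    use std::marker::PhantomData;
    use std::sync::Arc;

    #[derive(Clone, Copy)]
    pub struct Read;
    #[derive(Clone, Copy)]
    pub struct Write;

    pub struct DataStore<T> {
        name: String,
        _marker: PhantomData<T>,
    }

    impl<T> DataStore<T> {
        // Placeholder for the real, fallible lookup: parsing
        // datastore.cfg, honouring the maintenance mode and bumping
        // the active-operation counter are all elided.
        fn open(name: &str) -> Arc<Self> {
            Arc::new(DataStore {
                name: name.to_string(),
                _marker: PhantomData,
            })
        }

        pub fn name(&self) -> &str {
            &self.name
        }
    }

    impl DataStore<Read> {
        pub fn lookup_datastore_read(name: &str) -> Arc<Self> {
            Self::open(name)
        }
    }

    impl DataStore<Write> {
        pub fn lookup_datastore_write(name: &str) -> Arc<Self> {
            Self::open(name)
        }
    }

    // The access mode is now visible in the type of `datastore`
    // instead of being passed as a runtime Operation argument.
    fn list_groups(store: &str) {
        let datastore = DataStore::lookup_datastore_read(store);
        println!("listing groups on {}", datastore.name());
    }

    fn main() {
        list_groups("store1");
        let datastore = DataStore::lookup_datastore_write("store1");
        println!("pruning on {}", datastore.name());
    }

Since the access mode is part of the returned type, a handler that
only obtained a DataStore<Read> cannot reach write-only methods at
all; the read/write decision is made once, at lookup time, instead of
being re-asserted per call.
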
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 976617d9..f1eed9fc 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -36,13 +36,12 @@ use pxar::EntryKind;
use pbs_api_types::{
print_ns_and_snapshot, print_store_and_ns, Authid, BackupContent, BackupNamespace, BackupType,
Counts, CryptMode, DataStoreConfig, DataStoreListItem, DataStoreStatus,
- GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, Operation,
- PruneJobOptions, SnapshotListItem, SnapshotVerifyState, BACKUP_ARCHIVE_NAME_SCHEMA,
- BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
- DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
- PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
- PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, UPID, UPID_SCHEMA,
- VERIFICATION_OUTDATED_AFTER_SCHEMA,
+ GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, PruneJobOptions,
+ SnapshotListItem, SnapshotVerifyState, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA,
+ BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, DATASTORE_SCHEMA,
+ IGNORE_VERIFIED_BACKUPS_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT,
+ PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+ PRIV_DATASTORE_VERIFY, UPID, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA,
};
use pbs_client::pxar::{create_tar, create_zip};
use pbs_config::CachedUserInfo;
@@ -88,25 +87,28 @@ fn get_group_note_path(
// 1. check privs on NS (full or limited access)
// 2. load datastore
// 3. if needed (only limited access), check owner of group
-fn check_privs_and_load_store(
- store: &str,
+fn check_privs<T: CanRead>(
+ store: Arc<DataStore<T>>,
ns: &BackupNamespace,
auth_id: &Authid,
full_access_privs: u64,
partial_access_privs: u64,
- operation: Option<Operation>,
backup_group: &pbs_api_types::BackupGroup,
-) -> Result<Arc<DataStore>, Error> {
- let limited = check_ns_privs_full(store, ns, auth_id, full_access_privs, partial_access_privs)?;
-
- let datastore = DataStore::lookup_datastore(store, operation)?;
+) -> Result<(), Error> {
+ let limited = check_ns_privs_full(
+ store.name(),
+ ns,
+ auth_id,
+ full_access_privs,
+ partial_access_privs,
+ )?;
if limited {
- let owner = datastore.get_owner(ns, backup_group)?;
+ let owner = store.get_owner(ns, backup_group)?;
check_backup_owner(&owner, auth_id)?;
}
- Ok(datastore)
+ Ok(())
}
fn read_backup_index(
@@ -195,7 +197,7 @@ pub fn list_groups(
PRIV_DATASTORE_BACKUP,
)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_read(&store)?;
datastore
.iter_backup_groups(ns.clone())? // FIXME: Namespaces and recursion parameters!
@@ -286,14 +288,13 @@ pub async fn delete_group(
tokio::task::spawn_blocking(move || {
let ns = ns.unwrap_or_default();
-
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_write(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_MODIFY,
PRIV_DATASTORE_PRUNE,
- Some(Operation::Write),
&group,
)?;
@@ -341,13 +342,13 @@ pub async fn list_snapshot_files(
tokio::task::spawn_blocking(move || {
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_read(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_READ,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir.group,
)?;
@@ -395,13 +396,13 @@ pub async fn delete_snapshot(
tokio::task::spawn_blocking(move || {
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_write(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_MODIFY,
PRIV_DATASTORE_PRUNE,
- Some(Operation::Write),
&backup_dir.group,
)?;
@@ -477,7 +478,7 @@ unsafe fn list_snapshots_blocking(
PRIV_DATASTORE_BACKUP,
)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_read(&store)?;
// FIXME: filter also owner before collecting, for doing that nicely the owner should move into
// backup group and provide an error free (Err -> None) accessor
@@ -691,7 +692,7 @@ pub async fn status(
}
};
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_read(&store)?;
let (counts, gc_status) = if verbose {
let filter_owner = if store_privs & PRIV_DATASTORE_AUDIT != 0 {
@@ -804,7 +805,7 @@ pub fn verify(
PRIV_DATASTORE_BACKUP,
)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
let ignore_verified = ignore_verified.unwrap_or(true);
let worker_id;
@@ -974,13 +975,13 @@ pub fn prune(
) -> Result<Value, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_write(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_MODIFY,
PRIV_DATASTORE_PRUNE,
- Some(Operation::Write),
&group,
)?;
@@ -1149,7 +1150,7 @@ pub fn prune_datastore(
true,
)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
let ns = prune_options.ns.clone().unwrap_or_default();
let worker_id = format!("{}:{}", store, ns);
@@ -1187,7 +1188,7 @@ pub fn start_garbage_collection(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let job = Job::new("garbage_collection", &store)
@@ -1238,7 +1239,7 @@ pub fn garbage_collection_status(
..Default::default()
};
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_read(&store)?;
let status_in_memory = datastore.last_gc_status();
let state_file = JobState::load("garbage_collection", &store)
.map_err(|err| log::error!("could not open GC statefile for {store}: {err}"))
@@ -1373,13 +1374,13 @@ pub fn download_file(
let backup_ns = optional_ns_param(&param)?;
let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
- let datastore = check_privs_and_load_store(
- store,
+ let datastore = DataStore::lookup_datastore_read(store)?;
+ check_privs(
+ datastore.clone(),
&backup_ns,
&auth_id,
PRIV_DATASTORE_READ,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir.group,
)?;
@@ -1458,13 +1459,13 @@ pub fn download_file_decoded(
let backup_ns = optional_ns_param(&param)?;
let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
- let datastore = check_privs_and_load_store(
- store,
+ let datastore = DataStore::lookup_datastore_read(store)?;
+ check_privs(
+ datastore.clone(),
&backup_ns,
&auth_id,
PRIV_DATASTORE_READ,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir_api.group,
)?;
@@ -1589,13 +1590,13 @@ pub fn upload_backup_log(
let backup_dir_api: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
- let datastore = check_privs_and_load_store(
- store,
+ let datastore = DataStore::lookup_datastore_write(store)?;
+ check_privs(
+ datastore.clone(),
&backup_ns,
&auth_id,
0,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Write),
&backup_dir_api.group,
)?;
let backup_dir = datastore.backup_dir(backup_ns.clone(), backup_dir_api.clone())?;
@@ -1686,13 +1687,13 @@ pub async fn catalog(
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_read(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_READ,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir.group,
)?;
@@ -1806,13 +1807,13 @@ pub fn pxar_file_download(
let ns = optional_ns_param(&param)?;
let backup_dir: pbs_api_types::BackupDir = Deserialize::deserialize(&param)?;
- let datastore = check_privs_and_load_store(
- store,
+ let datastore = DataStore::lookup_datastore_read(store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_READ,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir.group,
)?;
@@ -1944,7 +1945,7 @@ pub fn get_rrd_stats(
cf: RRDMode,
_param: Value,
) -> Result<Value, Error> {
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore(&store)?;
let disk_manager = crate::tools::disks::DiskManage::new();
let mut rrd_fields = vec![
@@ -2017,13 +2018,13 @@ pub fn get_group_notes(
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_read(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_AUDIT,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_group,
)?;
@@ -2065,13 +2066,13 @@ pub fn set_group_notes(
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_write(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_MODIFY,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Write),
&backup_group,
)?;
@@ -2111,13 +2112,13 @@ pub fn get_notes(
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_read(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_AUDIT,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir.group,
)?;
@@ -2164,13 +2165,13 @@ pub fn set_notes(
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_write(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_MODIFY,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Write),
&backup_dir.group,
)?;
@@ -2214,13 +2215,13 @@ pub fn get_protection(
) -> Result<bool, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_read(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_AUDIT,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Read),
&backup_dir.group,
)?;
@@ -2264,13 +2265,13 @@ pub async fn set_protection(
tokio::task::spawn_blocking(move || {
let ns = ns.unwrap_or_default();
- let datastore = check_privs_and_load_store(
- &store,
+ let datastore = DataStore::lookup_datastore_write(&store)?;
+ check_privs(
+ datastore.clone(),
&ns,
&auth_id,
PRIV_DATASTORE_MODIFY,
PRIV_DATASTORE_BACKUP,
- Some(Operation::Write),
&backup_dir.group,
)?;
@@ -2324,7 +2325,7 @@ pub async fn set_backup_owner(
PRIV_DATASTORE_BACKUP,
)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
let backup_group = datastore.backup_group(ns, backup_group);
diff --git a/src/api2/admin/namespace.rs b/src/api2/admin/namespace.rs
index 889dc1a3..10af8693 100644
--- a/src/api2/admin/namespace.rs
+++ b/src/api2/admin/namespace.rs
@@ -6,7 +6,7 @@ use proxmox_router::{http_bail, ApiMethod, Permission, Router, RpcEnvironment};
use proxmox_schema::*;
use pbs_api_types::{
- Authid, BackupNamespace, NamespaceListItem, Operation, DATASTORE_SCHEMA, NS_MAX_DEPTH_SCHEMA,
+ Authid, BackupNamespace, NamespaceListItem, DATASTORE_SCHEMA, NS_MAX_DEPTH_SCHEMA,
PROXMOX_SAFE_ID_FORMAT,
};
@@ -55,7 +55,7 @@ pub fn create_namespace(
check_ns_modification_privs(&store, &ns, &auth_id)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
datastore.create_namespace(&parent, name)
}
@@ -98,7 +98,7 @@ pub fn list_namespaces(
// get result up-front to avoid cloning NS, it's relatively cheap anyway (no IO normally)
let parent_access = check_ns_privs(&store, &parent, &auth_id, NS_PRIVS_OK);
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_read(&store)?;
let iter = match datastore.recursive_iter_backup_ns_ok(parent, max_depth) {
Ok(iter) => iter,
@@ -156,7 +156,7 @@ pub fn delete_namespace(
check_ns_modification_privs(&store, &ns, &auth_id)?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
if !datastore.remove_namespace_recursive(&ns, delete_groups)? {
if delete_groups {
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index ea0d0292..e6a92117 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -19,7 +19,7 @@ use proxmox_sortable_macro::sortable;
use proxmox_sys::fs::lock_dir_noblock_shared;
use pbs_api_types::{
- Authid, BackupNamespace, BackupType, Operation, SnapshotVerifyState, VerifyState,
+ Authid, BackupNamespace, BackupType, SnapshotVerifyState, VerifyState,
BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA, PRIV_DATASTORE_BACKUP,
};
@@ -96,7 +96,7 @@ fn upgrade_to_backup_protocol(
)
.map_err(|err| http_err!(FORBIDDEN, "{err}"))?;
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
let protocols = parts
.headers
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 23051653..48a8c5fc 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -19,7 +19,7 @@ use proxmox_sortable_macro::sortable;
use proxmox_sys::fs::lock_dir_noblock_shared;
use pbs_api_types::{
- Authid, Operation, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+ Authid, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CHUNK_DIGEST_SCHEMA, DATASTORE_SCHEMA,
PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
@@ -93,7 +93,7 @@ fn upgrade_to_backup_reader_protocol(
bail!("no permissions on /{}", acl_path.join("/"));
}
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_read(&store)?;
let backup_dir = pbs_api_types::BackupDir::deserialize(&param)?;
diff --git a/src/api2/status.rs b/src/api2/status.rs
index f1ae0ef5..30a0cf76 100644
--- a/src/api2/status.rs
+++ b/src/api2/status.rs
@@ -8,9 +8,7 @@ use proxmox_router::{ApiMethod, Permission, Router, RpcEnvironment, SubdirMap};
use proxmox_rrd::api_types::{RRDMode, RRDTimeFrame};
use proxmox_schema::api;
-use pbs_api_types::{
- Authid, DataStoreStatusListItem, Operation, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
-};
+use pbs_api_types::{Authid, DataStoreStatusListItem, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP};
use pbs_config::CachedUserInfo;
use pbs_datastore::DataStore;
@@ -49,7 +47,7 @@ pub async fn datastore_status(
let user_privs = user_info.lookup_privs(&auth_id, &["datastore", store]);
let allowed = (user_privs & (PRIV_DATASTORE_AUDIT | PRIV_DATASTORE_BACKUP)) != 0;
if !allowed {
- if let Ok(datastore) = DataStore::lookup_datastore(store, Some(Operation::Lookup)) {
+ if let Ok(datastore) = DataStore::lookup_datastore_read(store) {
if can_access_any_namespace(datastore, &auth_id, &user_info) {
list.push(DataStoreStatusListItem::empty(store, None));
}
@@ -57,7 +55,7 @@ pub async fn datastore_status(
continue;
}
- let datastore = match DataStore::lookup_datastore(store, Some(Operation::Read)) {
+ let datastore = match DataStore::lookup_datastore_read(store) {
Ok(datastore) => datastore,
Err(err) => {
list.push(DataStoreStatusListItem::empty(store, Some(err.to_string())));
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index b28db6e3..95ce13c7 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -20,7 +20,7 @@ use proxmox_worker_task::WorkerTaskContext;
use pbs_api_types::{
parse_ns_and_snapshot, print_ns_and_snapshot, Authid, BackupDir, BackupNamespace, CryptMode,
- NotificationMode, Operation, TapeRestoreNamespace, Userid, DATASTORE_MAP_ARRAY_SCHEMA,
+ NotificationMode, TapeRestoreNamespace, Userid, DATASTORE_MAP_ARRAY_SCHEMA,
DATASTORE_MAP_LIST_SCHEMA, DRIVE_NAME_SCHEMA, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_MODIFY, PRIV_TAPE_READ, TAPE_RESTORE_NAMESPACE_SCHEMA,
TAPE_RESTORE_SNAPSHOT_SCHEMA, UPID_SCHEMA,
@@ -144,10 +144,10 @@ impl TryFrom<String> for DataStoreMap {
if let Some(index) = store.find('=') {
let mut target = store.split_off(index);
target.remove(0); // remove '='
- let datastore = DataStore::lookup_datastore(&target, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&target)?;
map.insert(store, datastore);
} else if default.is_none() {
- default = Some(DataStore::lookup_datastore(&store, Some(Operation::Write))?);
+ default = Some(DataStore::lookup_datastore_write(&store)?);
} else {
bail!("multiple default stores given");
}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 12+ messages in thread
* [pbs-devel] [PATCH proxmox-backup RFC 08/10] server/bin: replace datastore_lookup with new, state-typed datastore returning functions
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (6 preceding siblings ...)
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 07/10] api: replace datastore_lookup with new, state-typed datastore returning functions Hannes Laimer
@ 2024-09-03 12:33 ` Hannes Laimer
2024-09-03 12:34 ` [pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks Hannes Laimer
` (2 subsequent siblings)
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:33 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/bin/proxmox-backup-proxy.rs | 4 ++--
src/server/prune_job.rs | 4 ++--
src/server/pull.rs | 6 +++---
src/server/verify_job.rs | 4 ++--
4 files changed, 9 insertions(+), 9 deletions(-)
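For illustration, a minimal, self-contained sketch of what the state-typed lookup functions amount to (names are simplified stand-ins, not the actual PBS types):

use std::marker::PhantomData;
use std::sync::Arc;

// Marker types standing in for the chunk store states.
#[derive(Clone, Copy)]
struct Lookup;
#[derive(Clone, Copy)]
struct Read;
#[derive(Clone, Copy)]
struct Write;

struct DataStore<T> {
    name: String,
    _marker: PhantomData<T>,
}

impl<T> DataStore<T> {
    fn open(name: &str) -> Arc<Self> {
        Arc::new(Self { name: name.to_string(), _marker: PhantomData })
    }
}

// One lookup function per state replaces the former runtime `Operation` flag.
fn lookup_datastore(name: &str) -> Arc<DataStore<Lookup>> { DataStore::open(name) }
fn lookup_datastore_read(name: &str) -> Arc<DataStore<Read>> { DataStore::open(name) }
fn lookup_datastore_write(name: &str) -> Arc<DataStore<Write>> { DataStore::open(name) }

// A writing operation can now demand the Write state at compile time.
fn start_gc(store: Arc<DataStore<Write>>) {
    println!("running GC on {}", store.name);
}

fn main() {
    let _audit = lookup_datastore("store1"); // status/config paths
    let _reader = lookup_datastore_read("store1"); // read/restore paths
    start_gc(lookup_datastore_write("store1")); // GC, prune, backup, ...
    // start_gc(lookup_datastore_read("store1")); // rejected: wrong typestate
}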
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 041f3aff..6ef4d61b 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -489,7 +489,7 @@ async fn schedule_datastore_garbage_collection() {
{
// limit datastore scope due to Op::Lookup
- let datastore = match DataStore::lookup_datastore(&store, Some(Operation::Lookup)) {
+ let datastore = match DataStore::lookup_datastore(&store) {
Ok(datastore) => datastore,
Err(err) => {
eprintln!("lookup_datastore failed - {err}");
@@ -532,7 +532,7 @@ async fn schedule_datastore_garbage_collection() {
Err(_) => continue, // could not get lock
};
- let datastore = match DataStore::lookup_datastore(&store, Some(Operation::Write)) {
+ let datastore = match DataStore::lookup_datastore_write(&store) {
Ok(datastore) => datastore,
Err(err) => {
log::warn!("skipping scheduled GC on {store}, could look it up - {err}");
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 1c86647a..546c0bbd 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -4,7 +4,7 @@ use anyhow::Error;
use tracing::{info, warn};
use pbs_api_types::{
- print_store_and_ns, Authid, KeepOptions, Operation, PruneJobOptions, MAX_NAMESPACE_DEPTH,
+ print_store_and_ns, Authid, KeepOptions, PruneJobOptions, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
};
use pbs_datastore::prune::compute_prune_info;
@@ -127,7 +127,7 @@ pub fn do_prune_job(
auth_id: &Authid,
schedule: Option<String>,
) -> Result<String, Error> {
- let datastore = DataStore::lookup_datastore(&store, Some(Operation::Write))?;
+ let datastore = DataStore::lookup_datastore_write(&store)?;
let worker_type = job.jobtype().to_string();
let auth_id = auth_id.clone();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index de1bb5d5..41ab5e0e 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -16,7 +16,7 @@ use tracing::{info, warn};
use pbs_api_types::{
print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
- GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+ GroupListItem, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
@@ -548,12 +548,12 @@ impl PullParameters {
})
} else {
Arc::new(LocalSource {
- store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+ store: DataStore::lookup_datastore_read(remote_store)?,
ns: remote_ns,
})
};
let target = PullTarget {
- store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
+ store: DataStore::lookup_datastore_write(store)?,
ns,
};
diff --git a/src/server/verify_job.rs b/src/server/verify_job.rs
index a15a257d..bd5253ed 100644
--- a/src/server/verify_job.rs
+++ b/src/server/verify_job.rs
@@ -1,7 +1,7 @@
use anyhow::{format_err, Error};
use tracing::{error, info};
-use pbs_api_types::{Authid, Operation, VerificationJobConfig};
+use pbs_api_types::{Authid, VerificationJobConfig};
use pbs_datastore::DataStore;
use proxmox_rest_server::WorkerTask;
@@ -18,7 +18,7 @@ pub fn do_verification_job(
schedule: Option<String>,
to_stdout: bool,
) -> Result<String, Error> {
- let datastore = DataStore::lookup_datastore(&verification_job.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&verification_job.store)?;
let outdated_after = verification_job.outdated_after;
let ignore_verified_snapshots = verification_job.ignore_verified.unwrap_or(true);
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 12+ messages in thread
* [pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (7 preceding siblings ...)
2024-09-03 12:33 ` [pbs-devel] [PATCH proxmox-backup RFC 08/10] server/bin: " Hannes Laimer
@ 2024-09-03 12:34 ` Hannes Laimer
2024-09-03 12:34 ` [pbs-devel] [PATCH proxmox-backup RFC 10/10] backup/server/tape: " Hannes Laimer
2024-09-04 7:34 ` [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Wolfgang Bumiller
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:34 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/api2/admin/datastore.rs | 27 ++---
src/api2/backup/environment.rs | 176 +++++++++++++++++---------------
src/api2/backup/mod.rs | 21 ++--
src/api2/backup/upload_chunk.rs | 19 ++--
src/api2/reader/environment.rs | 31 +++---
src/api2/reader/mod.rs | 5 +-
src/api2/tape/backup.rs | 21 ++--
src/api2/tape/drive.rs | 2 +-
src/api2/tape/restore.rs | 69 +++++++------
9 files changed, 196 insertions(+), 175 deletions(-)
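As a rough sketch of the impl-block split done in this patch (simplified stand-ins, not the real BackupEnvironment):

use std::marker::PhantomData;

trait CanRead {}
trait CanWrite: CanRead {}

#[derive(Clone, Copy)]
struct Read;
#[derive(Clone, Copy)]
struct Write;
impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}

struct Env<T> {
    _marker: PhantomData<T>,
}

// Methods that need no store access live in a plain impl block.
impl<T> Env<T> {
    fn log(&self, msg: &str) {
        println!("{msg}");
    }
}

// Methods that only read are bounded on CanRead ...
impl<T: CanRead> Env<T> {
    fn load_manifest(&self) {
        self.log("loading manifest");
    }
}

// ... while mutating methods require CanWrite, so calling them on a
// read-only environment fails to compile.
impl<T: CanWrite> Env<T> {
    fn register_chunk(&self) {
        self.log("registering chunk");
    }
}

fn main() {
    let env = Env::<Write> { _marker: PhantomData };
    env.load_manifest();
    env.register_chunk();
    // Env::<Read> { _marker: PhantomData }.register_chunk(); // does not compile
}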
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index f1eed9fc..e1124284 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -10,6 +10,7 @@ use anyhow::{bail, format_err, Error};
use futures::*;
use hyper::http::request::Parts;
use hyper::{header, Body, Response, StatusCode};
+use pbs_datastore::chunk_store::{CanRead, Read};
use serde::Deserialize;
use serde_json::{json, Value};
use tokio_stream::wrappers::ReceiverStream;
@@ -73,8 +74,8 @@ use crate::server::jobstate::{compute_schedule_status, Job, JobState};
const GROUP_NOTES_FILE_NAME: &str = "notes";
-fn get_group_note_path(
- store: &DataStore,
+fn get_group_note_path<T>(
+ store: &DataStore<T>,
ns: &BackupNamespace,
group: &pbs_api_types::BackupGroup,
) -> PathBuf {
@@ -111,8 +112,8 @@ fn check_privs<T: CanRead>(
Ok(())
}
-fn read_backup_index(
- backup_dir: &BackupDir,
+fn read_backup_index<T: CanRead>(
+ backup_dir: &BackupDir<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, index_size) = backup_dir.load_manifest()?;
@@ -137,8 +138,8 @@ fn read_backup_index(
Ok((manifest, result))
}
-fn get_all_snapshot_files(
- info: &BackupInfo,
+fn get_all_snapshot_files<T: CanRead>(
+ info: &BackupInfo<T>,
) -> Result<(BackupManifest, Vec<BackupContent>), Error> {
let (manifest, mut files) = read_backup_index(&info.backup_dir)?;
@@ -502,7 +503,7 @@ unsafe fn list_snapshots_blocking(
(None, None) => datastore.list_backup_groups(ns.clone())?,
};
- let info_to_snapshot_list_item = |group: &BackupGroup, owner, info: BackupInfo| {
+ let info_to_snapshot_list_item = |group: &BackupGroup<Read>, owner, info: BackupInfo<Read>| {
let backup = pbs_api_types::BackupDir {
group: group.into(),
time: info.backup_dir.backup_time(),
@@ -604,8 +605,8 @@ unsafe fn list_snapshots_blocking(
})
}
-async fn get_snapshots_count(
- store: &Arc<DataStore>,
+async fn get_snapshots_count<T: CanRead + Send + Sync + 'static>(
+ store: &Arc<DataStore<T>>,
owner: Option<&Authid>,
) -> Result<Counts, Error> {
let store = Arc::clone(store);
@@ -1771,12 +1772,12 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&Permission::Anybody,
);
-fn get_local_pxar_reader(
- datastore: Arc<DataStore>,
+fn get_local_pxar_reader<T: CanRead>(
+ datastore: Arc<DataStore<T>>,
manifest: &BackupManifest,
- backup_dir: &BackupDir,
+ backup_dir: &BackupDir<T>,
pxar_name: &str,
-) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+) -> Result<(LocalDynamicReadAt<LocalChunkReader<T>>, u64), Error> {
let mut path = datastore.base_path();
path.push(backup_dir.relative_path());
path.push(pxar_name);
diff --git a/src/api2/backup/environment.rs b/src/api2/backup/environment.rs
index 99d885e2..f222da76 100644
--- a/src/api2/backup/environment.rs
+++ b/src/api2/backup/environment.rs
@@ -1,5 +1,6 @@
use anyhow::{bail, format_err, Error};
use nix::dir::Dir;
+use pbs_datastore::chunk_store::CanWrite;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::info;
@@ -53,17 +54,17 @@ impl std::ops::Add for UploadStatistic {
}
}
-struct DynamicWriterState {
+struct DynamicWriterState<T> {
name: String,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
offset: u64,
chunk_count: u64,
upload_stat: UploadStatistic,
}
-struct FixedWriterState {
+struct FixedWriterState<T> {
name: String,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
size: usize,
chunk_size: u32,
chunk_count: u64,
@@ -75,18 +76,18 @@ struct FixedWriterState {
// key=digest, value=length
type KnownChunksMap = HashMap<[u8; 32], u32>;
-struct SharedBackupState {
+struct SharedBackupState<T> {
finished: bool,
uid_counter: usize,
file_counter: usize, // successfully uploaded files
- dynamic_writers: HashMap<usize, DynamicWriterState>,
- fixed_writers: HashMap<usize, FixedWriterState>,
+ dynamic_writers: HashMap<usize, DynamicWriterState<T>>,
+ fixed_writers: HashMap<usize, FixedWriterState<T>>,
known_chunks: KnownChunksMap,
backup_size: u64, // sums up size of all files
backup_stat: UploadStatistic,
}
-impl SharedBackupState {
+impl<T> SharedBackupState<T> {
// Raise error if finished flag is set
fn ensure_unfinished(&self) -> Result<(), Error> {
if self.finished {
@@ -104,26 +105,26 @@ impl SharedBackupState {
/// `RpcEnvironment` implementation for backup service
#[derive(Clone)]
-pub struct BackupEnvironment {
+pub struct BackupEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
- pub last_backup: Option<BackupInfo>,
- state: Arc<Mutex<SharedBackupState>>,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
+ pub last_backup: Option<BackupInfo<T>>,
+ state: Arc<Mutex<SharedBackupState<T>>>,
}
-impl BackupEnvironment {
+impl<T> BackupEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
let state = SharedBackupState {
finished: false,
@@ -149,7 +150,69 @@ impl BackupEnvironment {
state: Arc::new(Mutex::new(state)),
}
}
+}
+
+impl<T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static> BackupEnvironment<T> {
+ pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
+ self.formatter.format_result(result, self)
+ }
+
+ /// If verify-new is set on the datastore, this will run a new verify task
+ /// for the backup. If not, this will return and also drop the passed lock
+ /// immediately.
+ pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
+ self.ensure_finished()?;
+
+ if !self.datastore.verify_new() {
+ // no verify requested, do nothing
+ return Ok(());
+ }
+
+ // Downgrade to shared lock, the backup itself is finished
+ drop(excl_snap_lock);
+ let snap_lock = lock_dir_noblock_shared(
+ &self.backup_dir.full_path(),
+ "snapshot",
+ "snapshot is already locked by another operation",
+ )?;
+
+ let worker_id = format!(
+ "{}:{}/{}/{:08X}",
+ self.datastore.name(),
+ self.backup_dir.backup_type(),
+ self.backup_dir.backup_id(),
+ self.backup_dir.backup_time()
+ );
+
+ let datastore = self.datastore.clone();
+ let backup_dir = self.backup_dir.clone();
+
+ WorkerTask::new_thread(
+ "verify",
+ Some(worker_id),
+ self.auth_id.to_string(),
+ false,
+ move |worker| {
+ worker.log_message("Automatically verifying newly added snapshot");
+
+ let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
+ if !verify_backup_dir_with_lock(
+ &verify_worker,
+ &backup_dir,
+ worker.upid().clone(),
+ None,
+ snap_lock,
+ )? {
+ bail!("verification failed - please check the log for details");
+ }
+ Ok(())
+ },
+ )
+ .map(|_| ())
+ }
+}
+impl<T: CanWrite> BackupEnvironment<T> {
/// Register a Chunk with associated length.
///
/// We do not fully trust clients, so a client may only use registered
@@ -262,7 +325,7 @@ impl BackupEnvironment {
/// Store the writer with a unique ID
pub fn register_dynamic_writer(
&self,
- index: DynamicIndexWriter,
+ index: DynamicIndexWriter<T>,
name: String,
) -> Result<usize, Error> {
let mut state = self.state.lock().unwrap();
@@ -288,7 +351,7 @@ impl BackupEnvironment {
/// Store the writer with a unique ID
pub fn register_fixed_writer(
&self,
- index: FixedIndexWriter,
+ index: FixedIndexWriter<T>,
name: String,
size: usize,
chunk_size: u32,
@@ -632,61 +695,6 @@ impl BackupEnvironment {
Ok(())
}
- /// If verify-new is set on the datastore, this will run a new verify task
- /// for the backup. If not, this will return and also drop the passed lock
- /// immediately.
- pub fn verify_after_complete(&self, excl_snap_lock: Dir) -> Result<(), Error> {
- self.ensure_finished()?;
-
- if !self.datastore.verify_new() {
- // no verify requested, do nothing
- return Ok(());
- }
-
- // Downgrade to shared lock, the backup itself is finished
- drop(excl_snap_lock);
- let snap_lock = lock_dir_noblock_shared(
- &self.backup_dir.full_path(),
- "snapshot",
- "snapshot is already locked by another operation",
- )?;
-
- let worker_id = format!(
- "{}:{}/{}/{:08X}",
- self.datastore.name(),
- self.backup_dir.backup_type(),
- self.backup_dir.backup_id(),
- self.backup_dir.backup_time()
- );
-
- let datastore = self.datastore.clone();
- let backup_dir = self.backup_dir.clone();
-
- WorkerTask::new_thread(
- "verify",
- Some(worker_id),
- self.auth_id.to_string(),
- false,
- move |worker| {
- worker.log_message("Automatically verifying newly added snapshot");
-
- let verify_worker = crate::backup::VerifyWorker::new(worker.clone(), datastore);
- if !verify_backup_dir_with_lock(
- &verify_worker,
- &backup_dir,
- worker.upid().clone(),
- None,
- snap_lock,
- )? {
- bail!("verification failed - please check the log for details");
- }
-
- Ok(())
- },
- )
- .map(|_| ())
- }
-
pub fn log<S: AsRef<str>>(&self, msg: S) {
info!("{}", msg.as_ref());
}
@@ -701,10 +709,6 @@ impl BackupEnvironment {
}
}
- pub fn format_response(&self, result: Result<Value, Error>) -> Response<Body> {
- self.formatter.format_result(result, self)
- }
-
/// Raise error if finished flag is not set
pub fn ensure_finished(&self) -> Result<(), Error> {
let state = self.state.lock().unwrap();
@@ -735,7 +739,7 @@ impl BackupEnvironment {
}
}
-impl RpcEnvironment for BackupEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for BackupEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -757,14 +761,18 @@ impl RpcEnvironment for BackupEnvironment {
}
}
-impl AsRef<BackupEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<BackupEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &BackupEnvironment {
- self.as_any().downcast_ref::<BackupEnvironment>().unwrap()
+impl<T: 'static> AsRef<BackupEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &BackupEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<BackupEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/backup/mod.rs b/src/api2/backup/mod.rs
index e6a92117..fda7f41d 100644
--- a/src/api2/backup/mod.rs
+++ b/src/api2/backup/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
use hyper::header::{HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::{Lookup, Write};
use serde::Deserialize;
use serde_json::{json, Value};
@@ -281,7 +282,7 @@ fn upgrade_to_backup_protocol(
return Ok(());
}
- let verify = |env: BackupEnvironment| {
+ let verify = |env: BackupEnvironment<pbs_datastore::chunk_store::Write>| {
if let Err(err) = env.verify_after_complete(snap_guard) {
env.log(format!(
"backup finished, but starting the requested verify task failed: {}",
@@ -402,7 +403,7 @@ fn create_dynamic_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let name = required_string_param(&param, "archive-name")?.to_owned();
@@ -452,7 +453,7 @@ fn create_fixed_index(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let name = required_string_param(&param, "archive-name")?.to_owned();
let size = required_integer_param(&param, "size")? as usize;
@@ -567,7 +568,7 @@ fn dynamic_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.debug(format!("dynamic_append {} chunks", digest_list.len()));
@@ -641,7 +642,7 @@ fn fixed_append(
);
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.debug(format!("fixed_append {} chunks", digest_list.len()));
@@ -716,7 +717,7 @@ fn close_dynamic_index(
let csum_str = required_string_param(&param, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.dynamic_writer_close(wid, chunk_count, size, csum)?;
@@ -769,7 +770,7 @@ fn close_fixed_index(
let csum_str = required_string_param(&param, "csum")?;
let csum = <[u8; 32]>::from_hex(csum_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
env.fixed_writer_close(wid, chunk_count, size, csum)?;
@@ -783,7 +784,7 @@ fn finish_backup(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<Write> = rpcenv.as_ref();
env.finish_backup()?;
env.log("successfully finished backup");
@@ -802,7 +803,7 @@ fn get_previous_backup_time(
_info: &ApiMethod,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<Value, Error> {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<Lookup> = rpcenv.as_ref();
let backup_time = env
.last_backup
@@ -829,7 +830,7 @@ fn download_previous(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<pbs_datastore::chunk_store::Write> = rpcenv.as_ref();
let archive_name = required_string_param(&param, "archive-name")?.to_owned();
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 20259660..57b30dec 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -7,6 +7,7 @@ use futures::*;
use hex::FromHex;
use hyper::http::request::Parts;
use hyper::Body;
+use pbs_datastore::chunk_store::{CanWrite, Write as StoreWrite};
use serde_json::{json, Value};
use proxmox_router::{ApiHandler, ApiMethod, ApiResponseFuture, RpcEnvironment};
@@ -20,19 +21,19 @@ use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
-pub struct UploadChunk {
+pub struct UploadChunk<T> {
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
raw_data: Option<Vec<u8>>,
}
-impl UploadChunk {
+impl<T: CanWrite> UploadChunk<T> {
pub fn new(
stream: Body,
- store: Arc<DataStore>,
+ store: Arc<DataStore<T>>,
digest: [u8; 32],
size: u32,
encoded_size: u32,
@@ -48,7 +49,7 @@ impl UploadChunk {
}
}
-impl Future for UploadChunk {
+impl<T: CanWrite> Future for UploadChunk<T> {
type Output = Result<([u8; 32], u32, u32, bool), Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -159,7 +160,7 @@ fn upload_fixed_chunk(
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -228,7 +229,7 @@ fn upload_dynamic_chunk(
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
let (digest, size, compressed_size, is_duplicate) =
UploadChunk::new(req_body, env.datastore.clone(), digest, size, encoded_size).await?;
@@ -273,7 +274,7 @@ fn upload_speedtest(
println!("Upload error: {}", err);
}
}
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
Ok(env.format_response(Ok(Value::Null)))
}
.boxed()
@@ -312,7 +313,7 @@ fn upload_blob(
let file_name = required_string_param(&param, "file-name")?.to_owned();
let encoded_size = required_integer_param(&param, "encoded-size")? as usize;
- let env: &BackupEnvironment = rpcenv.as_ref();
+ let env: &BackupEnvironment<StoreWrite> = rpcenv.as_ref();
if !file_name.ends_with(".blob") {
bail!("wrong blob file extension: '{}'", file_name);
diff --git a/src/api2/reader/environment.rs b/src/api2/reader/environment.rs
index 3b2f06f4..13d346eb 100644
--- a/src/api2/reader/environment.rs
+++ b/src/api2/reader/environment.rs
@@ -1,6 +1,7 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
+use pbs_datastore::chunk_store::CanRead;
use serde_json::{json, Value};
use proxmox_router::{RpcEnvironment, RpcEnvironmentType};
@@ -14,25 +15,25 @@ use tracing::info;
/// `RpcEnvironment` implementation for backup reader service
#[derive(Clone)]
-pub struct ReaderEnvironment {
+pub struct ReaderEnvironment<T> {
env_type: RpcEnvironmentType,
result_attributes: Value,
auth_id: Authid,
pub debug: bool,
pub formatter: &'static dyn OutputFormatter,
pub worker: Arc<WorkerTask>,
- pub datastore: Arc<DataStore>,
- pub backup_dir: BackupDir,
+ pub datastore: Arc<DataStore<T>>,
+ pub backup_dir: BackupDir<T>,
allowed_chunks: Arc<RwLock<HashSet<[u8; 32]>>>,
}
-impl ReaderEnvironment {
+impl<T: CanRead> ReaderEnvironment<T> {
pub fn new(
env_type: RpcEnvironmentType,
auth_id: Authid,
worker: Arc<WorkerTask>,
- datastore: Arc<DataStore>,
- backup_dir: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ backup_dir: BackupDir<T>,
) -> Self {
Self {
result_attributes: json!({}),
@@ -71,7 +72,7 @@ impl ReaderEnvironment {
}
}
-impl RpcEnvironment for ReaderEnvironment {
+impl<T: Send + Sync + 'static> RpcEnvironment for ReaderEnvironment<T> {
fn result_attrib_mut(&mut self) -> &mut Value {
&mut self.result_attributes
}
@@ -93,14 +94,18 @@ impl RpcEnvironment for ReaderEnvironment {
}
}
-impl AsRef<ReaderEnvironment> for dyn RpcEnvironment {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for dyn RpcEnvironment {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
-impl AsRef<ReaderEnvironment> for Box<dyn RpcEnvironment> {
- fn as_ref(&self) -> &ReaderEnvironment {
- self.as_any().downcast_ref::<ReaderEnvironment>().unwrap()
+impl<T: 'static> AsRef<ReaderEnvironment<T>> for Box<dyn RpcEnvironment> {
+ fn as_ref(&self) -> &ReaderEnvironment<T> {
+ self.as_any()
+ .downcast_ref::<ReaderEnvironment<T>>()
+ .unwrap()
}
}
diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs
index 48a8c5fc..cf20addc 100644
--- a/src/api2/reader/mod.rs
+++ b/src/api2/reader/mod.rs
@@ -6,6 +6,7 @@ use hex::FromHex;
use hyper::header::{self, HeaderValue, CONNECTION, UPGRADE};
use hyper::http::request::Parts;
use hyper::{Body, Request, Response, StatusCode};
+use pbs_datastore::chunk_store::Read;
use serde::Deserialize;
use serde_json::Value;
@@ -253,7 +254,7 @@ fn download_file(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
let file_name = required_string_param(&param, "file-name")?.to_owned();
@@ -309,7 +310,7 @@ fn download_chunk(
rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {
async move {
- let env: &ReaderEnvironment = rpcenv.as_ref();
+ let env: &ReaderEnvironment<Read> = rpcenv.as_ref();
let digest_str = required_string_param(&param, "digest")?;
let digest = <[u8; 32]>::from_hex(digest_str)?;
diff --git a/src/api2/tape/backup.rs b/src/api2/tape/backup.rs
index cf5a0189..662b953e 100644
--- a/src/api2/tape/backup.rs
+++ b/src/api2/tape/backup.rs
@@ -1,6 +1,7 @@
use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::CanWrite;
use serde_json::Value;
use tracing::{info, warn};
@@ -11,9 +12,9 @@ use proxmox_schema::api;
use proxmox_worker_task::WorkerTaskContext;
use pbs_api_types::{
- print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, Operation,
- TapeBackupJobConfig, TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA,
- PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT, PRIV_TAPE_WRITE, UPID_SCHEMA,
+ print_ns_and_snapshot, print_store_and_ns, Authid, MediaPoolConfig, TapeBackupJobConfig,
+ TapeBackupJobSetup, TapeBackupJobStatus, JOB_ID_SCHEMA, PRIV_DATASTORE_READ, PRIV_TAPE_AUDIT,
+ PRIV_TAPE_WRITE, UPID_SCHEMA,
};
use pbs_config::CachedUserInfo;
@@ -150,7 +151,7 @@ pub fn do_tape_backup_job(
let worker_type = job.jobtype().to_string();
- let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&setup.store)?;
let (config, _digest) = pbs_config::media_pool::config()?;
let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -306,7 +307,7 @@ pub fn backup(
check_backup_permission(&auth_id, &setup.store, &setup.pool, &setup.drive)?;
- let datastore = DataStore::lookup_datastore(&setup.store, Some(Operation::Read))?;
+ let datastore = DataStore::lookup_datastore_write(&setup.store)?;
let (config, _digest) = pbs_config::media_pool::config()?;
let pool_config: MediaPoolConfig = config.lookup("pool", &setup.pool)?;
@@ -360,9 +361,9 @@ enum SnapshotBackupResult {
Ignored,
}
-fn backup_worker(
+fn backup_worker<T: CanWrite + Send + Sync + 'static>(
worker: &WorkerTask,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
pool_config: &MediaPoolConfig,
setup: &TapeBackupJobSetup,
summary: &mut TapeBackupJobSummary,
@@ -560,11 +561,11 @@ fn update_media_online_status(drive: &str) -> Result<Option<String>, Error> {
}
}
-fn backup_snapshot(
+fn backup_snapshot<T: CanWrite + Send + Sync + 'static>(
worker: &WorkerTask,
pool_writer: &mut PoolWriter,
- datastore: Arc<DataStore>,
- snapshot: BackupDir,
+ datastore: Arc<DataStore<T>>,
+ snapshot: BackupDir<T>,
) -> Result<SnapshotBackupResult, Error> {
let snapshot_path = snapshot.relative_path();
info!("backup snapshot {snapshot_path:?}");
diff --git a/src/api2/tape/drive.rs b/src/api2/tape/drive.rs
index ba9051de..342406c6 100644
--- a/src/api2/tape/drive.rs
+++ b/src/api2/tape/drive.rs
@@ -1342,7 +1342,7 @@ pub fn catalog_media(
drive.read_label()?; // skip over labels - we already read them above
let mut checked_chunks = HashMap::new();
- restore_media(
+ restore_media::<pbs_datastore::chunk_store::Write>(
worker,
&mut drive,
&media_id,
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 95ce13c7..20558670 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, format_err, Error};
+use pbs_datastore::chunk_store::{CanRead, CanWrite, Write as StoreWrite};
use serde_json::Value;
use tracing::{info, warn};
@@ -120,13 +121,13 @@ impl NamespaceMap {
}
}
-pub struct DataStoreMap {
- map: HashMap<String, Arc<DataStore>>,
- default: Option<Arc<DataStore>>,
+pub struct DataStoreMap<T> {
+ map: HashMap<String, Arc<DataStore<T>>>,
+ default: Option<Arc<DataStore<T>>>,
ns_map: Option<NamespaceMap>,
}
-impl TryFrom<String> for DataStoreMap {
+impl TryFrom<String> for DataStoreMap<StoreWrite> {
type Error = Error;
fn try_from(value: String) -> Result<Self, Error> {
@@ -161,7 +162,7 @@ impl TryFrom<String> for DataStoreMap {
}
}
-impl DataStoreMap {
+impl<T> DataStoreMap<T> {
fn add_namespaces_maps(&mut self, mappings: Vec<String>) -> Result<bool, Error> {
let count = mappings.len();
let ns_map = NamespaceMap::try_from(mappings)?;
@@ -169,7 +170,9 @@ impl DataStoreMap {
Ok(count > 0)
}
- fn used_datastores(&self) -> HashMap<&str, (Arc<DataStore>, Option<HashSet<BackupNamespace>>)> {
+ fn used_datastores(
+ &self,
+ ) -> HashMap<&str, (Arc<DataStore<T>>, Option<HashSet<BackupNamespace>>)> {
let mut map = HashMap::new();
for (source, target) in self.map.iter() {
let ns = self.ns_map.as_ref().map(|map| map.used_namespaces(source));
@@ -189,7 +192,7 @@ impl DataStoreMap {
.map(|mapping| mapping.get_namespaces(datastore, ns))
}
- fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore>> {
+ fn target_store(&self, source_datastore: &str) -> Option<Arc<DataStore<T>>> {
self.map
.get(source_datastore)
.or(self.default.as_ref())
@@ -200,7 +203,7 @@ impl DataStoreMap {
&self,
source_datastore: &str,
source_ns: &BackupNamespace,
- ) -> Option<(Arc<DataStore>, Option<Vec<BackupNamespace>>)> {
+ ) -> Option<(Arc<DataStore<T>>, Option<Vec<BackupNamespace>>)> {
self.target_store(source_datastore)
.map(|store| (store, self.target_ns(source_datastore, source_ns)))
}
@@ -237,9 +240,9 @@ fn check_datastore_privs(
Ok(())
}
-fn check_and_create_namespaces(
+fn check_and_create_namespaces<T: CanWrite>(
user_info: &CachedUserInfo,
- store: &Arc<DataStore>,
+ store: &Arc<DataStore<T>>,
ns: &BackupNamespace,
auth_id: &Authid,
owner: Option<&Authid>,
@@ -449,13 +452,13 @@ pub fn restore(
}
#[allow(clippy::too_many_arguments)]
-fn restore_full_worker(
+fn restore_full_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
auth_id: &Authid,
@@ -529,8 +532,8 @@ fn restore_full_worker(
}
#[allow(clippy::too_many_arguments)]
-fn check_snapshot_restorable(
- store_map: &DataStoreMap,
+fn check_snapshot_restorable<T: CanRead>(
+ store_map: &DataStoreMap<T>,
store: &str,
snapshot: &str,
ns: &BackupNamespace,
@@ -618,14 +621,14 @@ fn log_required_tapes<'a>(inventory: &Inventory, list: impl Iterator<Item = &'a
}
#[allow(clippy::too_many_arguments)]
-fn restore_list_worker(
+fn restore_list_worker<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
snapshots: Vec<String>,
inventory: Inventory,
media_set_uuid: Uuid,
drive_config: SectionConfigData,
drive_name: &str,
- store_map: DataStoreMap,
+ store_map: DataStoreMap<T>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
user_info: Arc<CachedUserInfo>,
@@ -955,16 +958,16 @@ fn get_media_set_catalog(
Ok(catalog)
}
-fn media_set_tmpdir(datastore: &DataStore, media_set_uuid: &Uuid) -> PathBuf {
+fn media_set_tmpdir<T>(datastore: &DataStore<T>, media_set_uuid: &Uuid) -> PathBuf {
let mut path = datastore.base_path();
path.push(".tmp");
path.push(media_set_uuid.to_string());
path
}
-fn snapshot_tmpdir(
+fn snapshot_tmpdir<T>(
source_datastore: &str,
- datastore: &DataStore,
+ datastore: &DataStore<T>,
snapshot: &str,
media_set_uuid: &Uuid,
) -> PathBuf {
@@ -974,9 +977,9 @@ fn snapshot_tmpdir(
path
}
-fn restore_snapshots_to_tmpdir(
+fn restore_snapshots_to_tmpdir<T>(
worker: Arc<WorkerTask>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_list: &[u64],
mut drive: Box<dyn TapeDriver>,
media_id: &MediaId,
@@ -1083,10 +1086,10 @@ fn restore_snapshots_to_tmpdir(
Ok(tmp_paths)
}
-fn restore_file_chunk_map(
+fn restore_file_chunk_map<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
file_chunk_map: &mut BTreeMap<u64, HashSet<[u8; 32]>>,
) -> Result<(), Error> {
for (nr, chunk_map) in file_chunk_map.iter_mut() {
@@ -1133,10 +1136,10 @@ fn restore_file_chunk_map(
Ok(())
}
-fn restore_partial_chunk_archive<'a>(
+fn restore_partial_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
chunk_list: &mut HashSet<[u8; 32]>,
) -> Result<usize, Error> {
let mut decoder = ChunkArchiveDecoder::new(reader);
@@ -1195,12 +1198,12 @@ fn restore_partial_chunk_archive<'a>(
/// Request and restore complete media without using existing catalog (create catalog instead)
#[allow(clippy::too_many_arguments)]
-pub fn request_and_restore_media(
+pub fn request_and_restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
media_id: &MediaId,
drive_config: &SectionConfigData,
drive_name: &str,
- store_map: &DataStoreMap,
+ store_map: &DataStoreMap<T>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
restore_owner: &Authid,
notification_mode: &TapeNotificationMode,
@@ -1253,11 +1256,11 @@ pub fn request_and_restore_media(
/// Restore complete media content and catalog
///
/// Only create the catalog if target is None.
-pub fn restore_media(
+pub fn restore_media<T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
drive: &mut Box<dyn TapeDriver>,
media_id: &MediaId,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
auth_id: &Authid,
@@ -1301,11 +1304,11 @@ pub fn restore_media(
}
#[allow(clippy::too_many_arguments)]
-fn restore_archive<'a>(
+fn restore_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
mut reader: Box<dyn 'a + TapeRead>,
current_file_number: u64,
- target: Option<(&DataStoreMap, &Authid)>,
+ target: Option<(&DataStoreMap<T>, &Authid)>,
catalog: &mut MediaCatalog,
checked_chunks_map: &mut HashMap<String, HashSet<[u8; 32]>>,
verbose: bool,
@@ -1525,10 +1528,10 @@ fn scan_chunk_archive<'a>(
Ok(Some(chunks))
}
-fn restore_chunk_archive<'a>(
+fn restore_chunk_archive<'a, T: CanWrite + Send + Sync + 'static>(
worker: Arc<WorkerTask>,
reader: Box<dyn 'a + TapeRead>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
checked_chunks: &mut HashSet<[u8; 32]>,
verbose: bool,
) -> Result<Option<Vec<[u8; 32]>>, Error> {
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 12+ messages in thread
* [pbs-devel] [PATCH proxmox-backup RFC 10/10] backup/server/tape: add generics and separate functions into impl blocks
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (8 preceding siblings ...)
2024-09-03 12:34 ` [pbs-devel] [PATCH proxmox-backup RFC 09/10] api: add generics and separate functions into impl blocks Hannes Laimer
@ 2024-09-03 12:34 ` Hannes Laimer
2024-09-04 7:34 ` [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Wolfgang Bumiller
10 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2024-09-03 12:34 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
src/backup/hierarchy.rs | 26 +++++-----
src/backup/verify.rs | 53 +++++++++++----------
src/server/gc_job.rs | 8 ++--
src/server/prune_job.rs | 5 +-
src/server/pull.rs | 23 ++++-----
src/tape/file_formats/snapshot_archive.rs | 5 +-
src/tape/pool_writer/mod.rs | 11 +++--
src/tape/pool_writer/new_chunks_iterator.rs | 7 +--
8 files changed, 74 insertions(+), 64 deletions(-)
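The `T: CanWrite + Send + Sync + 'static` bounds that show up throughout these hunks stem from moving the store handle into worker threads; roughly (a sketch with stand-in types, not the real WorkerTask API):

use std::marker::PhantomData;
use std::sync::Arc;
use std::thread;

trait CanWrite {}

#[derive(Clone, Copy)]
struct Write;
impl CanWrite for Write {}

struct DataStore<T> {
    _marker: PhantomData<T>,
}

// Moving the store into a spawned worker is what forces the extra
// `Send + Sync + 'static` bounds on the generic functions below.
fn run_worker<T: CanWrite + Send + Sync + 'static>(store: Arc<DataStore<T>>) {
    thread::spawn(move || {
        let _store = store; // the worker owns its own handle
    })
    .join()
    .unwrap();
}

fn main() {
    run_worker(Arc::new(DataStore::<Write> { _marker: PhantomData }));
}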
diff --git a/src/backup/hierarchy.rs b/src/backup/hierarchy.rs
index 640a7762..29f05c9e 100644
--- a/src/backup/hierarchy.rs
+++ b/src/backup/hierarchy.rs
@@ -7,7 +7,9 @@ use pbs_api_types::{
PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_READ,
};
use pbs_config::CachedUserInfo;
-use pbs_datastore::{backup_info::BackupGroup, DataStore, ListGroups, ListNamespacesRecursive};
+use pbs_datastore::{
+ backup_info::BackupGroup, chunk_store::CanRead, DataStore, ListGroups, ListNamespacesRecursive,
+};
/// Asserts that `privs` are fulfilled on datastore + (optional) namespace.
pub fn check_ns_privs(
@@ -68,8 +70,8 @@ pub fn check_ns_privs_full(
);
}
-pub fn can_access_any_namespace(
- store: Arc<DataStore>,
+pub fn can_access_any_namespace<T: CanRead>(
+ store: Arc<DataStore<T>>,
auth_id: &Authid,
user_info: &CachedUserInfo,
) -> bool {
@@ -95,8 +97,8 @@ pub fn can_access_any_namespace(
///
/// Is basically just a filter-iter for pbs_datastore::ListNamespacesRecursive including access and
/// optional owner checks.
-pub struct ListAccessibleBackupGroups<'a> {
- store: &'a Arc<DataStore>,
+pub struct ListAccessibleBackupGroups<'a, T> {
+ store: &'a Arc<DataStore<T>>,
auth_id: Option<&'a Authid>,
user_info: Arc<CachedUserInfo>,
/// The priv on NS level that allows auth_id trump the owner check
@@ -104,15 +106,15 @@ pub struct ListAccessibleBackupGroups<'a> {
/// The priv that auth_id is required to have on NS level additionally to being owner
owner_and_priv: u64,
/// Contains the internal state, group iter and a bool flag for override_owner_priv
- state: Option<(ListGroups, bool)>,
- ns_iter: ListNamespacesRecursive,
+ state: Option<(ListGroups<T>, bool)>,
+ ns_iter: ListNamespacesRecursive<T>,
}
-impl<'a> ListAccessibleBackupGroups<'a> {
+impl<'a, T: CanRead> ListAccessibleBackupGroups<'a, T> {
// TODO: builder pattern
pub fn new_owned(
- store: &'a Arc<DataStore>,
+ store: &'a Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
auth_id: Option<&'a Authid>,
@@ -122,7 +124,7 @@ impl<'a> ListAccessibleBackupGroups<'a> {
}
pub fn new_with_privs(
- store: &'a Arc<DataStore>,
+ store: &'a Arc<DataStore<T>>,
ns: BackupNamespace,
max_depth: usize,
override_owner_priv: Option<u64>,
@@ -145,8 +147,8 @@ impl<'a> ListAccessibleBackupGroups<'a> {
pub static NS_PRIVS_OK: u64 =
PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_READ | PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT;
-impl<'a> Iterator for ListAccessibleBackupGroups<'a> {
- type Item = Result<BackupGroup, Error>;
+impl<'a, T: CanRead> Iterator for ListAccessibleBackupGroups<'a, T> {
+ type Item = Result<BackupGroup<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
loop {
diff --git a/src/backup/verify.rs b/src/backup/verify.rs
index 6ef7e8eb..1ede08ea 100644
--- a/src/backup/verify.rs
+++ b/src/backup/verify.rs
@@ -5,6 +5,7 @@ use std::time::Instant;
use anyhow::{bail, format_err, Error};
use nix::dir::Dir;
+use pbs_datastore::chunk_store::{CanRead, CanWrite};
use tracing::{error, info};
use proxmox_sys::fs::lock_dir_noblock_shared;
@@ -25,16 +26,16 @@ use crate::backup::hierarchy::ListAccessibleBackupGroups;
/// A VerifyWorker encapsulates a task worker, datastore and information about which chunks have
/// already been verified or detected as corrupt.
-pub struct VerifyWorker {
+pub struct VerifyWorker<T> {
worker: Arc<dyn WorkerTaskContext>,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
verified_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
corrupt_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
}
-impl VerifyWorker {
+impl<T: CanWrite> VerifyWorker<T> {
/// Creates a new VerifyWorker for a given task worker and datastore.
- pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore>) -> Self {
+ pub fn new(worker: Arc<dyn WorkerTaskContext>, datastore: Arc<DataStore<T>>) -> Self {
Self {
worker,
datastore,
@@ -46,7 +47,7 @@ impl VerifyWorker {
}
}
-fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
+fn verify_blob<T: CanRead>(backup_dir: &BackupDir<T>, info: &FileInfo) -> Result<(), Error> {
let blob = backup_dir.load_blob(&info.filename)?;
let raw_size = blob.raw_size();
@@ -70,7 +71,7 @@ fn verify_blob(backup_dir: &BackupDir, info: &FileInfo) -> Result<(), Error> {
}
}
-fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
+fn rename_corrupted_chunk<T: CanWrite>(datastore: Arc<DataStore<T>>, digest: &[u8; 32]) {
let (path, digest_str) = datastore.chunk_path(digest);
let mut counter = 0;
@@ -97,8 +98,8 @@ fn rename_corrupted_chunk(datastore: Arc<DataStore>, digest: &[u8; 32]) {
};
}
-fn verify_index_chunks(
- verify_worker: &VerifyWorker,
+fn verify_index_chunks<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
index: Box<dyn IndexFile + Send>,
crypt_mode: CryptMode,
) -> Result<(), Error> {
@@ -238,9 +239,9 @@ fn verify_index_chunks(
Ok(())
}
-fn verify_fixed_index(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+fn verify_fixed_index<T: CanWrite + Sync + Send + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
info: &FileInfo,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
@@ -260,9 +261,9 @@ fn verify_fixed_index(
verify_index_chunks(verify_worker, Box::new(index), info.chunk_crypt_mode())
}
-fn verify_dynamic_index(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+fn verify_dynamic_index<T: CanWrite + Sync + Send + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
info: &FileInfo,
) -> Result<(), Error> {
let mut path = backup_dir.relative_path();
@@ -291,9 +292,9 @@ fn verify_dynamic_index(
/// - Ok(true) if verify is successful
/// - Ok(false) if there were verification errors
/// - Err(_) if task was aborted
-pub fn verify_backup_dir(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+pub fn verify_backup_dir<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
) -> Result<bool, Error> {
@@ -328,9 +329,9 @@ pub fn verify_backup_dir(
}
/// See verify_backup_dir
-pub fn verify_backup_dir_with_lock(
- verify_worker: &VerifyWorker,
- backup_dir: &BackupDir,
+pub fn verify_backup_dir_with_lock<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ backup_dir: &BackupDir<T>,
upid: UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
_snap_lock: Dir,
@@ -415,9 +416,9 @@ pub fn verify_backup_dir_with_lock(
/// Returns
/// - Ok((count, failed_dirs)) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_backup_group(
- verify_worker: &VerifyWorker,
- group: &BackupGroup,
+pub fn verify_backup_group<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
+ group: &BackupGroup<T>,
progress: &mut StoreProgress,
upid: &UPID,
filter: Option<&dyn Fn(&BackupManifest) -> bool>,
@@ -467,8 +468,8 @@ pub fn verify_backup_group(
/// Returns
/// - Ok(failed_dirs) where failed_dirs had verification errors
/// - Err(_) if task was aborted
-pub fn verify_all_backups(
- verify_worker: &VerifyWorker,
+pub fn verify_all_backups<T: CanWrite + Send + Sync + 'static>(
+ verify_worker: &VerifyWorker<T>,
upid: &UPID,
ns: BackupNamespace,
max_depth: Option<usize>,
@@ -516,7 +517,7 @@ pub fn verify_all_backups(
.filter(|group| {
!(group.backup_type() == BackupType::Host && group.backup_id() == "benchmark")
})
- .collect::<Vec<BackupGroup>>(),
+ .collect::<Vec<BackupGroup<T>>>(),
Err(err) => {
info!("unable to list backups: {err}");
return Ok(errors);
diff --git a/src/server/gc_job.rs b/src/server/gc_job.rs
index 64835028..4892430c 100644
--- a/src/server/gc_job.rs
+++ b/src/server/gc_job.rs
@@ -4,15 +4,17 @@ use std::sync::Arc;
use tracing::info;
use pbs_api_types::Authid;
-use pbs_datastore::DataStore;
+use pbs_datastore::{chunk_store::CanWrite, DataStore};
use proxmox_rest_server::WorkerTask;
use crate::server::{jobstate::Job, send_gc_status};
/// Runs a garbage collection job.
-pub fn do_garbage_collection_job(
+pub fn do_garbage_collection_job<
+ T: CanWrite + Send + Sync + std::panic::RefUnwindSafe + 'static,
+>(
mut job: Job,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
auth_id: &Authid,
schedule: Option<String>,
to_stdout: bool,
diff --git a/src/server/prune_job.rs b/src/server/prune_job.rs
index 546c0bbd..9024a61f 100644
--- a/src/server/prune_job.rs
+++ b/src/server/prune_job.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;
use anyhow::Error;
+use pbs_datastore::chunk_store::CanWrite;
use tracing::{info, warn};
use pbs_api_types::{
@@ -14,10 +15,10 @@ use proxmox_rest_server::WorkerTask;
use crate::backup::ListAccessibleBackupGroups;
use crate::server::jobstate::Job;
-pub fn prune_datastore(
+pub fn prune_datastore<T: CanWrite>(
auth_id: Authid,
prune_options: PruneJobOptions,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<T>>,
dry_run: bool,
) -> Result<(), Error> {
let store = &datastore.name();
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 41ab5e0e..a567c510 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -9,6 +9,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Error};
use http::StatusCode;
+use pbs_datastore::chunk_store::{CanWrite, Read as StoreRead, Write as StoreWrite};
use proxmox_human_byte::HumanByte;
use proxmox_router::HttpError;
use serde_json::json;
@@ -45,11 +46,11 @@ struct RemoteReader {
struct LocalReader {
_dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
path: PathBuf,
- datastore: Arc<DataStore>,
+ datastore: Arc<DataStore<StoreRead>>,
}
pub(crate) struct PullTarget {
- store: Arc<DataStore>,
+ store: Arc<DataStore<StoreWrite>>,
ns: BackupNamespace,
}
@@ -60,7 +61,7 @@ pub(crate) struct RemoteSource {
}
pub(crate) struct LocalSource {
- store: Arc<DataStore>,
+ store: Arc<DataStore<StoreRead>>,
ns: BackupNamespace,
}
@@ -571,9 +572,9 @@ impl PullParameters {
}
}
-async fn pull_index_chunks<I: IndexFile>(
+async fn pull_index_chunks<I: IndexFile, T: CanWrite + Send + Sync + 'static>(
chunk_reader: Arc<dyn AsyncReadChunk>,
- target: Arc<DataStore>,
+ target: Arc<DataStore<T>>,
index: I,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
@@ -696,9 +697,9 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
/// -- Verify tmp file checksum
/// - if archive is an index, pull referenced chunks
/// - Rename tmp file into real path
-async fn pull_single_archive<'a>(
+async fn pull_single_archive<'a, T: CanWrite + Send + Sync + 'static>(
reader: Arc<dyn PullReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
archive_info: &'a FileInfo,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
@@ -779,9 +780,9 @@ async fn pull_single_archive<'a>(
/// -- if file already exists, verify contents
/// -- if not, pull it from the remote
/// - Download log if not already existing
-async fn pull_snapshot<'a>(
+async fn pull_snapshot<'a, T: CanWrite + Send + Sync + 'static>(
reader: Arc<dyn PullReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
let mut pull_stats = PullStats::default();
@@ -890,9 +891,9 @@ async fn pull_snapshot<'a>(
///
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
-async fn pull_snapshot_from<'a>(
+async fn pull_snapshot_from<'a, T: CanWrite + Send + Sync + 'static>(
reader: Arc<dyn PullReader + 'a>,
- snapshot: &'a pbs_datastore::BackupDir,
+ snapshot: &'a pbs_datastore::BackupDir<T>,
downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
) -> Result<PullStats, Error> {
let (_path, is_new, _snap_lock) = snapshot
diff --git a/src/tape/file_formats/snapshot_archive.rs b/src/tape/file_formats/snapshot_archive.rs
index f5a588f4..526c69ef 100644
--- a/src/tape/file_formats/snapshot_archive.rs
+++ b/src/tape/file_formats/snapshot_archive.rs
@@ -2,6 +2,7 @@ use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
+use pbs_datastore::chunk_store::CanRead;
use proxmox_sys::error::SysError;
use proxmox_uuid::Uuid;
@@ -21,9 +22,9 @@ use crate::tape::file_formats::{
/// `LEOM` was detected before all data was written. The stream is
/// marked incomplete in that case and does not contain all data (The
/// backup task must rewrite the whole file on the next media).
-pub fn tape_write_snapshot_archive<'a>(
+pub fn tape_write_snapshot_archive<'a, T: CanRead>(
writer: &mut (dyn TapeWrite + 'a),
- snapshot_reader: &SnapshotReader,
+ snapshot_reader: &SnapshotReader<T>,
) -> Result<Option<Uuid>, std::io::Error> {
let backup_dir = snapshot_reader.snapshot();
let snapshot =
diff --git a/src/tape/pool_writer/mod.rs b/src/tape/pool_writer/mod.rs
index 9731e1cc..54a8715e 100644
--- a/src/tape/pool_writer/mod.rs
+++ b/src/tape/pool_writer/mod.rs
@@ -3,6 +3,7 @@ pub use catalog_set::*;
mod new_chunks_iterator;
pub use new_chunks_iterator::*;
+use pbs_datastore::chunk_store::CanRead;
use std::collections::HashSet;
use std::fs::File;
@@ -445,9 +446,9 @@ impl PoolWriter {
/// archive is marked incomplete, and we do not use it. The caller
/// should mark the media as full and try again using another
/// media.
- pub fn append_snapshot_archive(
+ pub fn append_snapshot_archive<T: CanRead>(
&mut self,
- snapshot_reader: &SnapshotReader,
+ snapshot_reader: &SnapshotReader<T>,
) -> Result<(bool, usize), Error> {
let status = match self.status {
Some(ref mut status) => status,
@@ -536,10 +537,10 @@ impl PoolWriter {
Ok((leom, bytes_written))
}
- pub fn spawn_chunk_reader_thread(
+ pub fn spawn_chunk_reader_thread<T: CanRead + Send + Sync + 'static>(
&self,
- datastore: Arc<DataStore>,
- snapshot_reader: Arc<Mutex<SnapshotReader>>,
+ datastore: Arc<DataStore<T>>,
+ snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
) -> Result<(std::thread::JoinHandle<()>, NewChunksIterator), Error> {
NewChunksIterator::spawn(datastore, snapshot_reader, Arc::clone(&self.catalog_set))
}
diff --git a/src/tape/pool_writer/new_chunks_iterator.rs b/src/tape/pool_writer/new_chunks_iterator.rs
index 1454b33d..b83ddf3e 100644
--- a/src/tape/pool_writer/new_chunks_iterator.rs
+++ b/src/tape/pool_writer/new_chunks_iterator.rs
@@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{format_err, Error};
+use pbs_datastore::chunk_store::CanRead;
use pbs_datastore::{DataBlob, DataStore, SnapshotReader};
use crate::tape::CatalogSet;
@@ -20,9 +21,9 @@ impl NewChunksIterator {
/// Creates the iterator, spawning a new thread
///
/// Make sure to join() the returned thread handle.
- pub fn spawn(
- datastore: Arc<DataStore>,
- snapshot_reader: Arc<Mutex<SnapshotReader>>,
+ pub fn spawn<T: CanRead + Send + Sync + 'static>(
+ datastore: Arc<DataStore<T>>,
+ snapshot_reader: Arc<Mutex<SnapshotReader<T>>>,
catalog_set: Arc<Mutex<CatalogSet>>,
) -> Result<(std::thread::JoinHandle<()>, Self), Error> {
let (tx, rx) = std::sync::mpsc::sync_channel(3);
--
2.39.2
* Re: [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore
2024-09-03 12:33 [pbs-devel] [PATCH proxmox-backup RFC 00/10] introduce typestate for datastore/chunkstore Hannes Laimer
` (9 preceding siblings ...)
2024-09-03 12:34 ` [pbs-devel] [PATCH proxmox-backup RFC 10/10] backup/server/tape: " Hannes Laimer
@ 2024-09-04 7:34 ` Wolfgang Bumiller
10 siblings, 0 replies; 12+ messages in thread
From: Wolfgang Bumiller @ 2024-09-04 7:34 UTC (permalink / raw)
To: Hannes Laimer; +Cc: pbs-devel
On Tue, Sep 03, 2024 at 02:33:51PM GMT, Hannes Laimer wrote:
> This patch series introduces two traits, CanRead and CanWrite, to define whether
> a datastore reference is readable, writable, or neither. Functions that read
> or write are now implemented in `impl<T: CanRead>` or `impl<T: CanWrite>` blocks, ensuring
> that they are only available to references that are supposed to read/write.
>
> Motivation:
> Currently, we track the number of read/write references of a datastore but we don't
> track Lookup operations as they don't read or write, they still need a chunkstore, so
> eventhough they don't neccessarily directly do IO, they hold an open file handle.
> This is a problem for things like unmounting, currently lookup operations are only really
> short, so you'd need really unlucky timing to actually run into problems, but still,
> if a datastore is in "offline" maintenance mode, we shouldn't open filehandles on it.
>
> By encoding state in the type:
> 1. We can assign non-readable/writable references for lookup operations.
> 2. The compiler ensures correct usage of references. Since it is easy to miss
> what might happen a few function calls down the line, having the compiler
> yell at you about easily missed things like this is, I think, a really
> good thing.
>
> Changes:
> * Added CanRead and CanWrite traits.
> * Separated functions into impl<T: CanRead> or impl<T: CanWrite>.
> * Introduced three new datastore lookup functions that return concrete types implementing
> CanRead, CanWrite, or neither.
> * Renamed lookup_datastore() to open_datastore() and made it private.
>
> The main downside is needing separate datastore caches for read and write references due to
> concrete type requirements in the cache HashMap.
>
> Almost all changes are either adding generics or moving functions into the appropriate
> trait implementations. The logic itself is only touched twice: once in datastore_lookup()
> and once in check_privs_and_load_store() in /api/admin/datastore. The latter now only
> checks the privs; the datastore opening happens in the endpoint function directly.
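For reference, the pattern the cover letter describes condenses to roughly
the following sketch (the marker and trait names follow the series; the
struct body, the method names and the lookup helpers are simplified
stand-ins, not the real code):

use std::marker::PhantomData;
use std::sync::Arc;

pub trait CanRead: Clone {}
pub trait CanWrite: CanRead {}

#[derive(Clone, Copy)]
pub struct Read;
#[derive(Clone, Copy)]
pub struct Write;
#[derive(Clone, Copy)]
pub struct Lookup;

impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}

pub struct DataStore<T> {
    name: String,
    _marker: PhantomData<T>,
}

// Available on readable (and therefore also writable) references.
impl<T: CanRead> DataStore<T> {
    pub fn list_images(&self) -> Vec<String> {
        Vec::new()
    }
}

// Available on writable references only.
impl<T: CanWrite> DataStore<T> {
    pub fn insert_chunk(&self, _data: &[u8]) {}
}

// Concrete-typed lookup helpers replace the old lookup_datastore();
// each returned flavor needs its own cache, hence the separate
// read/write caches mentioned above.
pub fn lookup_read(name: &str) -> Arc<DataStore<Read>> {
    Arc::new(DataStore { name: name.into(), _marker: PhantomData })
}

pub fn lookup_write(name: &str) -> Arc<DataStore<Write>> {
    Arc::new(DataStore { name: name.into(), _marker: PhantomData })
}

A DataStore<Lookup> can still be constructed and held as a handle, but any
call to list_images() or insert_chunk() on it is rejected at compile time.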
So apart from some details (like sealing the marker traits, and some
whitespace issues between the patches), I'd like to get some general
feedback from the others here.
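(For the sealing, the standard private-supertrait pattern should do; a
minimal sketch, assuming the markers stay next to the traits in
chunk_store.rs:)

mod private {
    // Unnameable outside this module, so no downstream crate can
    // implement CanRead/CanWrite for its own marker types.
    pub trait Sealed {}
    impl Sealed for super::Read {}
    impl Sealed for super::Write {}
    impl Sealed for super::Lookup {}
}

pub trait CanRead: Clone + private::Sealed {}
pub trait CanWrite: CanRead {}

#[derive(Clone, Copy)]
pub struct Read;
#[derive(Clone, Copy)]
pub struct Write;
#[derive(Clone, Copy)]
pub struct Lookup;

impl CanRead for Read {}
impl CanRead for Write {}
impl CanWrite for Write {}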
My main fear here is that it might increase codegen time, but IMO there
are a bunch of methods where it should be easy to manually monomorphise
them by just wrapping the actual logic in a simple non-generic `fn()`
right inside the method body.
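To illustrate, such a shim could look roughly like this (remove_chunk is
a made-up stand-in for the real methods):

use std::marker::PhantomData;
use std::path::{Path, PathBuf};

pub trait CanWrite {}

pub struct DataStore<T> {
    chunk_dir: PathBuf,
    _marker: PhantomData<T>,
}

impl<T: CanWrite> DataStore<T> {
    // Generic shim: trivially small, so instantiating it once per
    // marker type costs next to nothing at codegen time.
    pub fn remove_chunk(&self, digest: &[u8; 32]) -> std::io::Result<()> {
        // Non-generic inner fn: the actual logic is compiled exactly
        // once, no matter how many marker types T are in use.
        fn inner(chunk_dir: &Path, digest: &[u8; 32]) -> std::io::Result<()> {
            let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
            std::fs::remove_file(chunk_dir.join(hex))
        }
        inner(&self.chunk_dir, digest)
    }
}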
While this is definitely additional work, keep in mind that most of
*this* patch set is just moving code between different impl<> blocks, so
the changes aren't as huge as they appear.
Additionally, I wouldn't consider having to separate the cache into read
and write caches a downside.
Note on locking: the process locker - which is one current source of
"dangerous uses" - has a FIXME comment about switching to `OFD` locks
once they are available in the `nix` crate - which they already are
(also, we could've just used libc back then?). This might give us a
chance to make the locking generally less error-prone as well, but that
will need more detailed analysis. If we *can* make that switch, then
e.g. creating another `ChunkStore` instance would cease to be dangerous.
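(For illustration only - not code from this series: taking an exclusive
whole-file OFD lock directly via libc could look like the following
minimal sketch; lock_whole_file is a made-up helper.)

use std::fs::File;
use std::os::fd::AsRawFd;

// OFD locks belong to the open file description rather than the
// process, so two ChunkStore instances with separate fds really do
// exclude each other, and closing an unrelated fd for the same file
// doesn't drop the lock.
fn lock_whole_file(file: &File) -> std::io::Result<()> {
    // All-zero flock with l_start = 0 and l_len = 0 covers the whole
    // file; l_pid must be 0 for OFD locks, which zeroing also ensures.
    let mut fl: libc::flock = unsafe { std::mem::zeroed() };
    fl.l_type = libc::F_WRLCK as _;
    fl.l_whence = libc::SEEK_SET as _;
    if unsafe { libc::fcntl(file.as_raw_fd(), libc::F_OFD_SETLKW, &fl) } == -1 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}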
All this to say: I do like this change. I was at first a bit shocked
that the marker traits ended up seeping through even into `BackupDir` &
friends, but since those are *handles* to the dirs, and not just their
mere names, I think this is actually fine, and we get additional safety
from the compiler.
Some compile-time benchmarks before & after would be nice, though ;-)