* [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses
@ 2026-05-08 15:03 Lukas Wagner
2026-05-08 15:03 ` [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Lukas Wagner
` (5 more replies)
0 siblings, 6 replies; 13+ messages in thread
From: Lukas Wagner @ 2026-05-08 15:03 UTC (permalink / raw)
To: pdm-devel
The main intention is to avoid a sprawl of different caching approaches by
establishing a simple, easy-to-use cache implementation that can be used to
persistently cache API responses from remotes (and derived aggregations).
Open questions:
- Is the per-namespace lock too coarse? Should we rather lock
per key? In any case, the lock should not be held for longer periods of time
(e.g. while doing API requests), so the namespace-level lock seemed fine to me.
A per-namespace (so, per-remote) lock is also nicer when one wants to update
several keys in one go.
- Base directory for the cache: currently it is
/var/cache/proxmox-datacenter-manager/cache
but this seems both redundant and overly generic to me, so maybe
'api-cache'?
- Went with a max_age param on `get` instead of a `set` with an expiry time.
I think it's quite common to have cache readers with different requirements
regarding value freshness, so this might be a better fit.
Also, we use the max-age mechanism in the API already, so this
is a seamless fit. Does this make sense? Or should we rather have a
Redis-style `set` with an expiry time/TTL?
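Since the max-age check lives on the read side, its semantics can be sketched as a small, self-contained function. This mirrors the `is_expired` helper from patch 1: `max_age == 0` means the entry is never considered fresh, and entries with a timestamp in the future (e.g. after a clock jump) also count as expired.

```rust
/// Read-side expiry check, mirroring `CacheEntry::is_expired` from patch 1:
/// each reader passes its own freshness requirement as `max_age` (seconds).
fn is_expired(timestamp: i64, now: i64, max_age: i64) -> bool {
    // A max_age of 0 means "never fresh": the caller insists on live data.
    if max_age == 0 {
        return true;
    }
    let age = now - timestamp;
    // Entries written "in the future" (clock jumps) are treated as expired too.
    age >= max_age || age < 0
}

fn main() {
    let written_at = 1000;
    // Two readers, same cache entry, different freshness requirements:
    assert!(!is_expired(written_at, 1050, 100)); // 50s old, 100s budget: fresh
    assert!(is_expired(written_at, 1050, 30)); // 50s old, 30s budget: stale
    assert!(is_expired(written_at, 900, 100)); // timestamp in the future: stale
    assert!(is_expired(written_at, 1000, 0)); // max_age 0: never fresh
}
```

A Redis-style TTL on `set` would bake a single freshness policy into the writer; the read-side check instead keeps that decision with each consumer.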
The `namespaced_cache` module is pretty generic and can be moved to proxmox.git
(maybe in proxmox-shared-cache) once it has sufficiently stabilized.
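For orientation, the on-disk layout the series uses — one JSON file per key inside a per-namespace directory, plus a hidden per-namespace lock file — can be sketched with std-only path helpers. The key name `subscriptions` below is just an illustrative placeholder, and the plain `.json` suffix is a simplification (patch 1 builds it via `set_extension`, which behaves differently for keys containing dots, as the review notes):

```rust
use std::path::{Path, PathBuf};

// Sketch of the path scheme from patch 1 (simplified: no locking, no I/O).
fn entry_path(base: &Path, namespace: &str, key: &str) -> PathBuf {
    // One JSON file per key inside the namespace directory.
    base.join(namespace).join(format!("{key}.json"))
}

fn lockfile_path(base: &Path, namespace: &str) -> PathBuf {
    // The per-namespace lock file lives next to the namespace directory.
    base.join(format!(".{namespace}.lock"))
}

fn main() {
    let base = Path::new("/var/cache/proxmox-datacenter-manager/cache");
    assert_eq!(
        entry_path(base, "remote-a", "subscriptions"),
        Path::new("/var/cache/proxmox-datacenter-manager/cache/remote-a/subscriptions.json")
    );
    assert_eq!(
        lockfile_path(base, "remote-a"),
        Path::new("/var/cache/proxmox-datacenter-manager/cache/.remote-a.lock")
    );
}
```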
proxmox-datacenter-manager:
Lukas Wagner (4):
add persistent, generic, namespaced key-value cache implementation
add pdm_cache cache as a specialized wrapper around the namespaced
cache
api: resources: subscriptions: switch over to pdm_cache
remote-updates: switch over to pdm_cache
Cargo.toml | 1 +
server/Cargo.toml | 2 +
server/src/api/resources.rs | 82 ++---
.../bin/proxmox-datacenter-privileged-api.rs | 7 +
server/src/lib.rs | 2 +
server/src/namespaced_cache.rs | 324 ++++++++++++++++++
server/src/pdm_cache.rs | 69 ++++
server/src/remote_updates.rs | 52 +--
8 files changed, 452 insertions(+), 87 deletions(-)
create mode 100644 server/src/namespaced_cache.rs
create mode 100644 server/src/pdm_cache.rs
Summary over all repositories:
8 files changed, 452 insertions(+), 87 deletions(-)
--
Generated by murpp 0.12.0
^ permalink raw reply	[flat|nested] 13+ messages in thread

* [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation
  2026-05-08 15:03 [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner
@ 2026-05-08 15:03 ` Lukas Wagner
  2026-05-12 12:29   ` Wolfgang Bumiller
  2026-05-08 15:03 ` [PATCH datacenter-manager 2/4] add pdm_cache cache as a specialized wrapper around the namespaced cache Lukas Wagner
  ` (4 subsequent siblings)
  5 siblings, 1 reply; 13+ messages in thread
From: Lukas Wagner @ 2026-05-08 15:03 UTC (permalink / raw)
To: pdm-devel

Namespaces map to directories inside a base directory. Cache contents
are stored as a single JSON file per key inside the namespace directory.

Value expiry is implemented via a max_age parameter when retrieving
values. Callers might have different requirements regarding the
freshness of entries, so this seemed a better fit than statically
setting an expiry time when *setting* the value.

The current implementation is pretty naive about namespace names and key
names; these must come from verified, path-safe identifiers, as they are
used directly as components of the path of the resulting directories and
JSON files. This should be fixed before using this implementation
elsewhere.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---

Notes:
    This module might be well suited to be moved to proxmox.git when it
    has stabilized enough. Kept here for now to ease development.
Cargo.toml | 1 + server/Cargo.toml | 2 + server/src/lib.rs | 1 + server/src/namespaced_cache.rs | 324 +++++++++++++++++++++++++++++++++ 4 files changed, 328 insertions(+) create mode 100644 server/src/namespaced_cache.rs diff --git a/Cargo.toml b/Cargo.toml index 9806a4f0..5cf05b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,6 +125,7 @@ serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.1" serde_json = "1.0" serde_plain = "1" +tempfile = "3.15" thiserror = "1.0" tokio = "1.6" tracing = "0.1" diff --git a/server/Cargo.toml b/server/Cargo.toml index 3f185bbc..6d6dd583 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -29,6 +29,8 @@ openssl.workspace = true percent-encoding.workspace = true serde.workspace = true serde_json.workspace = true +tempfile.workspace = true +thiserror.workspace = true tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] } tracing.workspace = true url.workspace = true diff --git a/server/src/lib.rs b/server/src/lib.rs index 5ed10d69..0b7642ab 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,6 +7,7 @@ pub mod context; pub mod env; pub mod jobstate; pub mod metric_collection; +pub mod namespaced_cache; pub mod parallel_fetcher; pub mod remote_cache; pub mod remote_tasks; diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs new file mode 100644 index 00000000..89ed5b7f --- /dev/null +++ b/server/src/namespaced_cache.rs @@ -0,0 +1,324 @@ +//! Generic namespaced cache implementation with optional value expiry. +//! +//! NOTE: +//! The current implementation is pretty naive about namespace names and key +//! names; these must come from verified, path-safe identifiers, as they are +//! used directly as components of the path of the resulting directories and +//! JSON files. This should be fixed before using this implementation +//! elsewhere. 
+ +use std::fs::File; +use std::io::ErrorKind; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use proxmox_sys::fs::CreateOptions; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +#[derive(thiserror::Error, Debug)] +pub enum CacheError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("error: {0}")] + Other(#[from] anyhow::Error), +} + +#[derive(Serialize, Deserialize, Debug)] +struct CacheEntry<T> { + timestamp: i64, + value: T, +} + +impl<T> CacheEntry<T> { + fn is_expired(&self, now: i64, max_age: i64) -> bool { + if max_age == 0 { + return true; + } + + let diff = now - self.timestamp; + diff >= max_age || diff < 0 + } +} + +pub struct NamespacedCache { + base_path: PathBuf, + file_options: CreateOptions, + dir_options: CreateOptions, +} + +impl NamespacedCache { + /// Create a new cache instance. + pub fn new<P: AsRef<Path>>( + base_path: P, + dir_options: CreateOptions, + file_options: CreateOptions, + ) -> Self { + Self { + base_path: base_path.as_ref().into(), + dir_options, + file_options, + } + } + + /// Lock a namespace for writing. + pub fn write( + &self, + namespace: &str, + timeout: Duration, + ) -> Result<WritableCacheNamespace, CacheError> { + let path = get_lockfile(&self.base_path, namespace); + + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, true, self.file_options)?; + + Ok(WritableCacheNamespace { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_path.clone(), + dir_options: self.dir_options, + file_options: self.file_options, + }) + } + + /// Lock a namespace for reading. 
+ pub fn read( + &self, + namespace: &str, + timeout: Duration, + ) -> Result<ReadableCacheNamespace, CacheError> { + let path = get_lockfile(&self.base_path, namespace); + + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, false, self.file_options)?; + + Ok(ReadableCacheNamespace { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_path.clone(), + }) + } +} + +/// A readable cache namespace. +pub struct ReadableCacheNamespace { + _lock: File, + namespace: String, + base_path: PathBuf, +} + +impl ReadableCacheNamespace { + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> { + get_impl(&self.base_path, &self.namespace, key, None) + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get_with_max_age<T: Serialize + DeserializeOwned>( + &self, + key: &str, + max_age: i64, + ) -> Result<Option<T>, CacheError> { + get_impl(&self.base_path, &self.namespace, key, Some(max_age)) + } +} + +/// A writable cache namespace. +pub struct WritableCacheNamespace { + _lock: File, + namespace: String, + base_path: PathBuf, + dir_options: CreateOptions, + file_options: CreateOptions, +} + +impl WritableCacheNamespace { + /// Remote a cache entry. + /// + /// This returns `Ok(())` if the key does not exist. + /// + /// # Errors: + /// - The file could not be deleted due to insufficient privileges. 
+ pub fn remove(&self, key: &str) -> Result<(), CacheError> { + let path = get_path(&self.base_path, &self.namespace, key); + + if let Err(err) = std::fs::remove_file(path) { + if err.kind() == ErrorKind::NotFound { + return Ok(()); + } + + return Err(err.into()); + } + + Ok(()) + } + + /// Set a cache entry. + /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub fn set<T: Serialize + DeserializeOwned>( + &self, + key: &str, + entry: &T, + ) -> Result<(), CacheError> { + self.set_with_timestamp(key, entry, proxmox_time::epoch_i64()) + } + + /// Set a cache entry with an explicitly provided timestamp. + /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub fn set_with_timestamp<T: Serialize + DeserializeOwned>( + &self, + key: &str, + value: &T, + timestamp: i64, + ) -> Result<(), CacheError> { + let path = get_path(&self.base_path, &self.namespace, key); + + proxmox_sys::fs::create_path( + path.parent().unwrap(), + Some(self.dir_options), + Some(self.dir_options), + )?; + + let entry = CacheEntry { timestamp, value }; + + let data = serde_json::to_vec(&entry)?; + proxmox_sys::fs::replace_file(path, &data, self.file_options, true)?; + + Ok(()) + } + + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> { + get_impl(&self.base_path, &self.namespace, key, None) + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. 
invalid format) + pub fn get_with_max_age<T: Serialize + DeserializeOwned>( + &self, + key: &str, + max_age: i64, + ) -> Result<Option<T>, CacheError> { + get_impl(&self.base_path, &self.namespace, key, Some(max_age)) + } +} + +fn get_impl<T: Serialize + DeserializeOwned>( + base: &Path, + namespace: &str, + key: &str, + max_age: Option<i64>, +) -> Result<Option<T>, CacheError> { + let path = get_path(base, namespace, key); + let content = proxmox_sys::fs::file_read_optional_string(path)?; + + if let Some(content) = content { + let val = serde_json::from_str::<CacheEntry<T>>(&content)?; + + if let Some(max_age) = max_age { + if val.is_expired(proxmox_time::epoch_i64(), max_age) { + return Ok(None); + } + } + return Ok(Some(val.value)); + } + + return Ok(None); +} + +fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf { + let mut path = base.join(namespace).join(key); + path.set_extension("json"); + path +} + +fn get_lockfile(base: &Path, namespace: &str) -> PathBuf { + let path = base.join(format!(".{namespace}.lock")); + path +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::*; + + fn make_cache() -> (TempDir, NamespacedCache) { + let dir = tempfile::tempdir().unwrap(); + + let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new()); + + (dir, cache) + } + + #[test] + fn test_cache() { + let (_dir, cache) = make_cache(); + + let write_guard = cache.write("remote-a", Duration::from_secs(1)).unwrap(); + write_guard.set("val1", &1).unwrap(); + write_guard.set("val2", &1).unwrap(); + + assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1); + + write_guard.remove("val1").unwrap(); + assert!(write_guard.get::<String>("val1").unwrap().is_none()); + + drop(write_guard); + + let read_guard = cache.read("remote-a", Duration::from_secs(1)).unwrap(); + + assert_eq!(read_guard.get::<i32>("val2").unwrap().unwrap(), 1); + } + + #[test] + fn test_delete_nonexisting() { + let (_dir, cache) = make_cache(); + 
+ let a = cache.write("remote-a", Duration::from_secs(1)).unwrap(); + a.set("val", &1).unwrap(); + + // Deleting a key that does not exist is okay and should not error. + assert!(a.remove("val").is_ok()); + } + + #[test] + fn test_expiration() { + let entry = CacheEntry { + value: (), + timestamp: 1000, + }; + + assert!(!entry.is_expired(1000, 100)); + assert!(!entry.is_expired(1099, 100)); + assert!(entry.is_expired(1100, 100)); + assert!(entry.is_expired(1101, 100)); + + // if max-age is 0, the entry is never fresh + assert!(entry.is_expired(1000, 0)); + } +} -- 2.47.3 ^ permalink raw reply related [flat|nested] 13+ messages in thread
* Re: [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation 2026-05-08 15:03 ` [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Lukas Wagner @ 2026-05-12 12:29 ` Wolfgang Bumiller 2026-05-13 8:45 ` Lukas Wagner 0 siblings, 1 reply; 13+ messages in thread From: Wolfgang Bumiller @ 2026-05-12 12:29 UTC (permalink / raw) To: Lukas Wagner; +Cc: pdm-devel On Fri, May 08, 2026 at 05:03:27PM +0200, Lukas Wagner wrote: > Namespaces map to directories inside a base directory. Cache contents > are stored as a single JSON-file per key inside the namespace directory. > > Value expiry is implemented via a max_age parameter when retrieving > values. Callers might have different requirements to the freshness of > entries, so this seemed a better fit than statically setting an expiry > time when *setting* the value. > > The current implementation is pretty naive about namespace names and key > names; these must come from verified, path-safe identifiers, as they are > used directly as components of the path of the resulting directories and > JSON files. This should be fixed before using this implementation > elsewhere. > > Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> > --- > > Notes: > This module might be well suited to be moved to proxmox.git when it has > stabilized enough. Kept here for now to ease development. 
> > Cargo.toml | 1 + > server/Cargo.toml | 2 + > server/src/lib.rs | 1 + > server/src/namespaced_cache.rs | 324 +++++++++++++++++++++++++++++++++ > 4 files changed, 328 insertions(+) > create mode 100644 server/src/namespaced_cache.rs > > diff --git a/Cargo.toml b/Cargo.toml > index 9806a4f0..5cf05b41 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -125,6 +125,7 @@ serde = { version = "1.0", features = ["derive"] } > serde_cbor = "0.11.1" > serde_json = "1.0" > serde_plain = "1" > +tempfile = "3.15" > thiserror = "1.0" > tokio = "1.6" > tracing = "0.1" > diff --git a/server/Cargo.toml b/server/Cargo.toml > index 3f185bbc..6d6dd583 100644 > --- a/server/Cargo.toml > +++ b/server/Cargo.toml > @@ -29,6 +29,8 @@ openssl.workspace = true > percent-encoding.workspace = true > serde.workspace = true > serde_json.workspace = true > +tempfile.workspace = true > +thiserror.workspace = true > tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] } > tracing.workspace = true > url.workspace = true > diff --git a/server/src/lib.rs b/server/src/lib.rs > index 5ed10d69..0b7642ab 100644 > --- a/server/src/lib.rs > +++ b/server/src/lib.rs > @@ -7,6 +7,7 @@ pub mod context; > pub mod env; > pub mod jobstate; > pub mod metric_collection; > +pub mod namespaced_cache; > pub mod parallel_fetcher; > pub mod remote_cache; > pub mod remote_tasks; > diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs > new file mode 100644 > index 00000000..89ed5b7f > --- /dev/null > +++ b/server/src/namespaced_cache.rs > @@ -0,0 +1,324 @@ > +//! Generic namespaced cache implementation with optional value expiry. > +//! > +//! NOTE: > +//! The current implementation is pretty naive about namespace names and key > +//! names; these must come from verified, path-safe identifiers, as they are > +//! used directly as components of the path of the resulting directories and > +//! 
JSON files. This should be fixed before using this implementation > +//! elsewhere. > + > +use std::fs::File; > +use std::io::ErrorKind; > +use std::path::{Path, PathBuf}; > +use std::time::Duration; > + > +use proxmox_sys::fs::CreateOptions; > +use serde::{de::DeserializeOwned, Deserialize, Serialize}; > + > +#[derive(thiserror::Error, Debug)] > +pub enum CacheError { > + #[error("IO error: {0}")] > + Io(#[from] std::io::Error), > + > + #[error("serialization error: {0}")] > + Serde(#[from] serde_json::Error), > + > + #[error("error: {0}")] > + Other(#[from] anyhow::Error), > +} > + > +#[derive(Serialize, Deserialize, Debug)] > +struct CacheEntry<T> { > + timestamp: i64, > + value: T, > +} > + > +impl<T> CacheEntry<T> { > + fn is_expired(&self, now: i64, max_age: i64) -> bool { > + if max_age == 0 { > + return true; > + } > + > + let diff = now - self.timestamp; > + diff >= max_age || diff < 0 > + } > +} > + > +pub struct NamespacedCache { > + base_path: PathBuf, > + file_options: CreateOptions, > + dir_options: CreateOptions, > +} > + > +impl NamespacedCache { > + /// Create a new cache instance. > + pub fn new<P: AsRef<Path>>( Consider `Into<PathBuf>` - since you force a clone on `PathBuf` now. > + base_path: P, > + dir_options: CreateOptions, > + file_options: CreateOptions, > + ) -> Self { > + Self { > + base_path: base_path.as_ref().into(), > + dir_options, > + file_options, > + } > + } > + > + /// Lock a namespace for writing. > + pub fn write( > + &self, > + namespace: &str, > + timeout: Duration, > + ) -> Result<WritableCacheNamespace, CacheError> { > + let path = get_lockfile(&self.base_path, namespace); > + > + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, true, self.file_options)?; > + > + Ok(WritableCacheNamespace { > + _lock: lock, > + namespace: namespace.to_string(), > + base_path: self.base_path.clone(), > + dir_options: self.dir_options, > + file_options: self.file_options, > + }) > + } > + > + /// Lock a namespace for reading. 
> + pub fn read( > + &self, > + namespace: &str, > + timeout: Duration, > + ) -> Result<ReadableCacheNamespace, CacheError> { > + let path = get_lockfile(&self.base_path, namespace); > + > + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, false, self.file_options)?; > + > + Ok(ReadableCacheNamespace { > + _lock: lock, > + namespace: namespace.to_string(), > + base_path: self.base_path.clone(), > + }) > + } > +} > + > +/// A readable cache namespace. > +pub struct ReadableCacheNamespace { > + _lock: File, > + namespace: String, > + base_path: PathBuf, > +} > + > +impl ReadableCacheNamespace { > + /// Read a value from the cache. > + /// > + /// # Errors: > + /// - The file associated with this key could not be read > + /// - The file could not be deserialized (e.g. invalid format) > + pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> { > + get_impl(&self.base_path, &self.namespace, key, None) > + } > + > + /// Read a value from the cache, given a maximum age of the cache entry. > + /// > + /// # Errors: > + /// - The file associated with this key could not be read > + /// - The file could not be deserialized (e.g. invalid format) > + pub fn get_with_max_age<T: Serialize + DeserializeOwned>( > + &self, > + key: &str, > + max_age: i64, > + ) -> Result<Option<T>, CacheError> { > + get_impl(&self.base_path, &self.namespace, key, Some(max_age)) > + } > +} > + > +/// A writable cache namespace. > +pub struct WritableCacheNamespace { > + _lock: File, > + namespace: String, > + base_path: PathBuf, > + dir_options: CreateOptions, > + file_options: CreateOptions, > +} > + > +impl WritableCacheNamespace { > + /// Remote a cache entry. > + /// > + /// This returns `Ok(())` if the key does not exist. > + /// > + /// # Errors: > + /// - The file could not be deleted due to insufficient privileges. 
> + pub fn remove(&self, key: &str) -> Result<(), CacheError> { > + let path = get_path(&self.base_path, &self.namespace, key); > + > + if let Err(err) = std::fs::remove_file(path) { > + if err.kind() == ErrorKind::NotFound { > + return Ok(()); > + } > + > + return Err(err.into()); > + } > + > + Ok(()) > + } > + > + /// Set a cache entry. > + /// > + /// # Errors > + /// - `value` could not be serialized > + /// - The namespace directory could not be created > + /// - The cache file could not be written to or atomically replaced > + pub fn set<T: Serialize + DeserializeOwned>( > + &self, > + key: &str, > + entry: &T, > + ) -> Result<(), CacheError> { > + self.set_with_timestamp(key, entry, proxmox_time::epoch_i64()) > + } > + > + /// Set a cache entry with an explicitly provided timestamp. > + /// > + /// # Errors > + /// - `value` could not be serialized > + /// - The namespace directory could not be created > + /// - The cache file could not be written to or atomically replaced > + pub fn set_with_timestamp<T: Serialize + DeserializeOwned>( > + &self, > + key: &str, > + value: &T, > + timestamp: i64, > + ) -> Result<(), CacheError> { > + let path = get_path(&self.base_path, &self.namespace, key); > + > + proxmox_sys::fs::create_path( > + path.parent().unwrap(), > + Some(self.dir_options), > + Some(self.dir_options), > + )?; > + > + let entry = CacheEntry { timestamp, value }; > + > + let data = serde_json::to_vec(&entry)?; > + proxmox_sys::fs::replace_file(path, &data, self.file_options, true)?; > + > + Ok(()) > + } > + > + /// Read a value from the cache. > + /// > + /// # Errors: > + /// - The file associated with this key could not be read > + /// - The file could not be deserialized (e.g. invalid format) > + pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> { > + get_impl(&self.base_path, &self.namespace, key, None) > + } > + > + /// Read a value from the cache, given a maximum age of the cache entry. 
> + /// > + /// # Errors: > + /// - The file associated with this key could not be read > + /// - The file could not be deserialized (e.g. invalid format) > + pub fn get_with_max_age<T: Serialize + DeserializeOwned>( > + &self, > + key: &str, > + max_age: i64, > + ) -> Result<Option<T>, CacheError> { > + get_impl(&self.base_path, &self.namespace, key, Some(max_age)) > + } > +} > + > +fn get_impl<T: Serialize + DeserializeOwned>( > + base: &Path, > + namespace: &str, > + key: &str, > + max_age: Option<i64>, > +) -> Result<Option<T>, CacheError> { > + let path = get_path(base, namespace, key); > + let content = proxmox_sys::fs::file_read_optional_string(path)?; > + > + if let Some(content) = content { > + let val = serde_json::from_str::<CacheEntry<T>>(&content)?; > + > + if let Some(max_age) = max_age { > + if val.is_expired(proxmox_time::epoch_i64(), max_age) { > + return Ok(None); > + } > + } > + return Ok(Some(val.value)); > + } > + > + return Ok(None); > +} > + > +fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf { > + let mut path = base.join(namespace).join(key); > + path.set_extension("json"); ^ I'd rather join base with `format!("{key}.json")`, otherwise if a `key` contains dots, it'll lose its final component. Alternatively we could just skip the extension altogether. Ah and I think we might want to `panic` (or fail) if `namespace.starts_with('/')`, that would cause discard `base` to be discarded! 
(I'm leaning towards panicking since this isn't something that should ever come from the outside (I hope…), at least currently it's all `const`s…) > + path > +} > + > +fn get_lockfile(base: &Path, namespace: &str) -> PathBuf { > + let path = base.join(format!(".{namespace}.lock")); > + path > +} > + > +#[cfg(test)] > +mod tests { > + use tempfile::TempDir; > + > + use super::*; > + > + fn make_cache() -> (TempDir, NamespacedCache) { > + let dir = tempfile::tempdir().unwrap(); (I wonder if we should use `tempdir_in` and pass `$OUT_DIR` or some other dir cargo provides via env vars for these things?) > + > + let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new()); > + > + (dir, cache) > + } > + > + #[test] > + fn test_cache() { > + let (_dir, cache) = make_cache(); > + > + let write_guard = cache.write("remote-a", Duration::from_secs(1)).unwrap(); > + write_guard.set("val1", &1).unwrap(); > + write_guard.set("val2", &1).unwrap(); > + > + assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1); > + > + write_guard.remove("val1").unwrap(); > + assert!(write_guard.get::<String>("val1").unwrap().is_none()); > + > + drop(write_guard); > + > + let read_guard = cache.read("remote-a", Duration::from_secs(1)).unwrap(); > + > + assert_eq!(read_guard.get::<i32>("val2").unwrap().unwrap(), 1); > + } > + > + #[test] > + fn test_delete_nonexisting() { > + let (_dir, cache) = make_cache(); > + > + let a = cache.write("remote-a", Duration::from_secs(1)).unwrap(); > + a.set("val", &1).unwrap(); > + > + // Deleting a key that does not exist is okay and should not error. 
> + assert!(a.remove("val").is_ok()); > + } > + > + #[test] > + fn test_expiration() { > + let entry = CacheEntry { > + value: (), > + timestamp: 1000, > + }; > + > + assert!(!entry.is_expired(1000, 100)); > + assert!(!entry.is_expired(1099, 100)); > + assert!(entry.is_expired(1100, 100)); > + assert!(entry.is_expired(1101, 100)); > + > + // if max-age is 0, the entry is never fresh > + assert!(entry.is_expired(1000, 0)); > + } > +} > -- > 2.47.3 ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation 2026-05-12 12:29 ` Wolfgang Bumiller @ 2026-05-13 8:45 ` Lukas Wagner 0 siblings, 0 replies; 13+ messages in thread From: Lukas Wagner @ 2026-05-13 8:45 UTC (permalink / raw) To: Wolfgang Bumiller, Lukas Wagner; +Cc: pdm-devel On Tue May 12, 2026 at 2:29 PM CEST, Wolfgang Bumiller wrote: >> + >> +impl<T> CacheEntry<T> { >> + fn is_expired(&self, now: i64, max_age: i64) -> bool { >> + if max_age == 0 { >> + return true; >> + } >> + >> + let diff = now - self.timestamp; >> + diff >= max_age || diff < 0 >> + } >> +} >> + >> +pub struct NamespacedCache { >> + base_path: PathBuf, >> + file_options: CreateOptions, >> + dir_options: CreateOptions, >> +} >> + >> +impl NamespacedCache { >> + /// Create a new cache instance. >> + pub fn new<P: AsRef<Path>>( > > Consider `Into<PathBuf>` - since you force a clone on `PathBuf` now. > Done, thanks! >> + base_path: P, >> + dir_options: CreateOptions, >> + file_options: CreateOptions, >> + ) -> Self { >> + Self { >> + base_path: base_path.as_ref().into(), >> + dir_options, >> + file_options, >> + } >> + } >> + [...] 
>> + >> +fn get_impl<T: Serialize + DeserializeOwned>( >> + base: &Path, >> + namespace: &str, >> + key: &str, >> + max_age: Option<i64>, >> +) -> Result<Option<T>, CacheError> { >> + let path = get_path(base, namespace, key); >> + let content = proxmox_sys::fs::file_read_optional_string(path)?; >> + >> + if let Some(content) = content { >> + let val = serde_json::from_str::<CacheEntry<T>>(&content)?; >> + >> + if let Some(max_age) = max_age { >> + if val.is_expired(proxmox_time::epoch_i64(), max_age) { >> + return Ok(None); >> + } >> + } >> + return Ok(Some(val.value)); >> + } >> + >> + return Ok(None); >> +} >> + >> +fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf { >> + let mut path = base.join(namespace).join(key); >> + path.set_extension("json"); > > ^ I'd rather join base with `format!("{key}.json")`, otherwise if a > `key` contains dots, it'll lose its final component. Did that for the next revision, thanks! > > Alternatively we could just skip the extension altogether. > > Ah and I think we might want to `panic` (or fail) if > `namespace.starts_with('/')`, that would cause discard `base` to be > discarded! (I'm leaning towards panicking since this isn't something > that should ever come from the outside (I hope…), at least currently > it's all `const`s…) Added, thanks! I went with a failure, since I added some basic checks on keys and namespaces anyway (e.g. prohibiting '../'). I added new error variants InvalidNamespace and InvalidKey for this. > >> + path >> +} >> + >> +fn get_lockfile(base: &Path, namespace: &str) -> PathBuf { >> + let path = base.join(format!(".{namespace}.lock")); >> + path >> +} >> + >> +#[cfg(test)] >> +mod tests { >> + use tempfile::TempDir; >> + >> + use super::*; >> + >> + fn make_cache() -> (TempDir, NamespacedCache) { >> + let dir = tempfile::tempdir().unwrap(); > > (I wonder if we should use `tempdir_in` and pass `$OUT_DIR` or some > other dir cargo provides via env vars for these things?) 
Unfortunately, cargo does not seem to set OUT_DIR when `cargo test` is
used. According to [1], there is CARGO_TARGET_TMPDIR, but that one is
only set for *integration* tests, not *unit* tests.

Once we put this cache impl into a shared crate, I'd likely promote the
test cases to be integration tests anyway, then we can consider
switching the directory. For now, I'll just keep on using `tempdir`.

[1] https://doc.rust-lang.org/cargo/reference/environment-variables.html

^ permalink raw reply	[flat|nested] 13+ messages in thread
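The directory selection discussed here can be sketched without the tempfile crate: prefer CARGO_TARGET_TMPDIR when cargo provides it (integration tests and benches), and fall back to the system temp directory otherwise. The helper name is hypothetical:

```rust
use std::env;
use std::path::PathBuf;

// Hypothetical helper: base directory for test artifacts.
// CARGO_TARGET_TMPDIR is only set by cargo for integration tests and
// benches, so unit tests fall back to the system-wide temp directory.
fn test_tmp_base() -> PathBuf {
    env::var_os("CARGO_TARGET_TMPDIR")
        .map(PathBuf::from)
        .unwrap_or_else(env::temp_dir)
}

fn main() {
    // With the variable unset (e.g. in a unit test), this is the system
    // temp dir; either way the result is an absolute path.
    let base = test_tmp_base();
    assert!(base.is_absolute());
}
```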
* [PATCH datacenter-manager 2/4] add pdm_cache cache as a specialized wrapper around the namespaced cache
  2026-05-08 15:03 [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner
  2026-05-08 15:03 ` [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Lukas Wagner
@ 2026-05-08 15:03 ` Lukas Wagner
  2026-05-08 15:03 ` [PATCH datacenter-manager 3/4] api: resources: subscriptions: switch over to pdm_cache Lukas Wagner
  ` (3 subsequent siblings)
  5 siblings, 0 replies; 13+ messages in thread
From: Lukas Wagner @ 2026-05-08 15:03 UTC (permalink / raw)
To: pdm-devel

This is a thin wrapper around the previously introduced namespaced
key-value cache, introducing PDM-specific concepts: instead of the
generic read/write methods for locking a namespace, this wrapper
provides {read,write}_remote and {read,write}_global for accessing
remote-specific and globally cached values. The cache namespaces are
'global' and 'remote-<remote-name>'.

The base directory for the cache is
/var/cache/proxmox-datacenter-manager/cache

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---

Notes:
    Not sure about the base directory
    /var/cache/proxmox-datacenter-manager/cache
    Maybe 'api-cache' could be a nicer fit, since realistically, we
    probably only ever cache API responses and aggregations thereof?
.../bin/proxmox-datacenter-privileged-api.rs | 7 ++ server/src/lib.rs | 1 + server/src/pdm_cache.rs | 69 +++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 server/src/pdm_cache.rs diff --git a/server/src/bin/proxmox-datacenter-privileged-api.rs b/server/src/bin/proxmox-datacenter-privileged-api.rs index 6b490f2b..6e8ba611 100644 --- a/server/src/bin/proxmox-datacenter-privileged-api.rs +++ b/server/src/bin/proxmox-datacenter-privileged-api.rs @@ -102,6 +102,13 @@ fn create_directories() -> Result<(), Error> { 0o755, )?; + pdm_config::setup::mkdir_perms( + concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/cache"), + api_user.uid, + api_user.gid, + 0o755, + )?; + server::jobstate::create_jobstate_dir()?; Ok(()) diff --git a/server/src/lib.rs b/server/src/lib.rs index 0b7642ab..5e8c0b64 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -9,6 +9,7 @@ pub mod jobstate; pub mod metric_collection; pub mod namespaced_cache; pub mod parallel_fetcher; +pub mod pdm_cache; pub mod remote_cache; pub mod remote_tasks; pub mod remote_updates; diff --git a/server/src/pdm_cache.rs b/server/src/pdm_cache.rs new file mode 100644 index 00000000..a7370632 --- /dev/null +++ b/server/src/pdm_cache.rs @@ -0,0 +1,69 @@ +use std::{ + path::{Path, PathBuf}, + sync::LazyLock, + time::Duration, +}; + +use nix::sys::stat::Mode; +use proxmox_sys::fs::CreateOptions; + +use crate::namespaced_cache::{ + CacheError, NamespacedCache, ReadableCacheNamespace, WritableCacheNamespace, +}; + +static CACHE_INSTANCE: LazyLock<PdmCache> = LazyLock::new(|| { + let file_options = proxmox_product_config::default_create_options(); + let dir_options = file_options.perm(Mode::from_bits_truncate(0o750)); + + PdmCache::new( + // FIXME: `/cache` seems slightly redundant, come up with something else... + PathBuf::from(concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/cache")), + dir_options, + file_options, + ) +}); + +/// Return a handle to the global [`PdmCache`] instance. 
+pub fn instance() -> &'static PdmCache { + &CACHE_INSTANCE +} + +/// Cache for storing the results of API requests, as well as aggregations thereof. +pub struct PdmCache(NamespacedCache); + +impl PdmCache { + /// Create a new cache instance. + /// + /// # Note + /// Most likely, you want to access the single global instance via [`pdm_cache::instance()`] + /// instead of calling `new` yourself. + fn new<P: AsRef<Path>>( + base_path: P, + dir_options: CreateOptions, + file_options: CreateOptions, + ) -> Self { + Self(NamespacedCache::new(base_path, dir_options, file_options)) + } + + /// Lock the cache for reading remote-specific data. + pub fn read_remote(&self, remote: &str) -> Result<ReadableCacheNamespace, CacheError> { + let namespace = format!("remote-{remote}"); + self.0.read(&namespace, Duration::from_secs(10)) + } + + /// Lock the cache for writing remote-specific data. + pub fn write_remote(&self, remote: &str) -> Result<WritableCacheNamespace, CacheError> { + let namespace = format!("remote-{remote}"); + self.0.write(&namespace, Duration::from_secs(10)) + } + + /// Lock the cache for reading global data. + pub fn read_global(&self) -> Result<ReadableCacheNamespace, CacheError> { + self.0.read("global", Duration::from_secs(10)) + } + + /// Lock the cache for writing global data. + pub fn write_global(&self) -> Result<WritableCacheNamespace, CacheError> { + self.0.write("global", Duration::from_secs(10)) + } +} -- 2.47.3 ^ permalink raw reply related [flat|nested] 13+ messages in thread
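As a quick illustration of the namespace scheme described in this patch, the sketch below shows how the 'global' and 'remote-<remote-name>' namespaces map onto directories under the base path. The helper names are invented for illustration and are not identifiers from the patch:

```rust
use std::path::PathBuf;

// Illustrative only: maps a remote name to its cache namespace,
// mirroring the "remote-<remote-name>" convention from the patch.
fn remote_namespace(remote: &str) -> String {
    format!("remote-{remote}")
}

// Each namespace corresponds to a directory below the cache base path.
fn namespace_dir(base: &str, namespace: &str) -> PathBuf {
    PathBuf::from(base).join(namespace)
}

fn main() {
    let base = "/var/cache/proxmox-datacenter-manager/cache";
    // remote-specific values live in a per-remote namespace directory ...
    assert_eq!(
        namespace_dir(base, &remote_namespace("pve-a")),
        PathBuf::from("/var/cache/proxmox-datacenter-manager/cache/remote-pve-a")
    );
    // ... while globally cached values share the 'global' namespace
    assert_eq!(
        namespace_dir(base, "global"),
        PathBuf::from("/var/cache/proxmox-datacenter-manager/cache/global")
    );
}
```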
* [PATCH datacenter-manager 3/4] api: resources: subscriptions: switch over to pdm_cache 2026-05-08 15:03 [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner 2026-05-08 15:03 ` [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Lukas Wagner 2026-05-08 15:03 ` [PATCH datacenter-manager 2/4] add pdm_cache cache as a specialized wrapper around the namespaced cache Lukas Wagner @ 2026-05-08 15:03 ` Lukas Wagner 2026-05-08 15:03 ` [PATCH datacenter-manager 4/4] remote-updates: " Lukas Wagner ` (2 subsequent siblings) 5 siblings, 0 replies; 13+ messages in thread From: Lukas Wagner @ 2026-05-08 15:03 UTC (permalink / raw) To: pdm-devel Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> --- server/src/api/resources.rs | 82 ++++++++++++++----------------------- 1 file changed, 31 insertions(+), 51 deletions(-) diff --git a/server/src/api/resources.rs b/server/src/api/resources.rs index 50315b11..c22e33c5 100644 --- a/server/src/api/resources.rs +++ b/server/src/api/resources.rs @@ -30,9 +30,10 @@ use proxmox_schema::{api, parse_boolean}; use proxmox_sortable_macro::sortable; use proxmox_subscription::SubscriptionStatus; use pve_api_types::{ClusterResource, ClusterResourceNetworkType, ClusterResourceType}; +use serde::{Deserialize, Serialize}; use crate::metric_collection::top_entities; -use crate::{connection, views}; +use crate::{connection, pdm_cache, views}; pub const ROUTER: Router = Router::new() .get(&list_subdirs_api_method!(SUBDIRS)) @@ -798,15 +799,11 @@ async fn get_top_entities( Ok(res) } -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] struct CachedSubscriptionState { node_info: HashMap<String, Option<NodeSubscriptionInfo>>, - timestamp: i64, } -static SUBSCRIPTION_CACHE: LazyLock<RwLock<HashMap<String, CachedSubscriptionState>>> = - LazyLock::new(|| RwLock::new(HashMap::new())); - /// Get the subscription state for a given remote. 
/// /// If recent enough cached data is available, it is returned @@ -815,66 +812,49 @@ pub async fn get_subscription_info_for_remote( remote: &Remote, max_age: u64, ) -> Result<HashMap<String, Option<NodeSubscriptionInfo>>, Error> { - if let Some(cached_subscription) = get_cached_subscription_info(&remote.id, max_age) { + if let Some(cached_subscription) = + get_cached_subscription_info(remote.id.clone(), max_age).await? + { Ok(cached_subscription.node_info) } else { let node_info = fetch_remote_subscription_info(remote).await?; - let now = proxmox_time::epoch_i64(); - update_cached_subscription_info(&remote.id, &node_info, now); + update_cached_subscription_info(remote.id.clone(), node_info.clone()).await?; Ok(node_info) } } -fn get_cached_subscription_info(remote: &str, max_age: u64) -> Option<CachedSubscriptionState> { - let cache = SUBSCRIPTION_CACHE - .read() - .expect("subscription mutex poisoned"); +const SUBSCRIPTION_STATE_CACHE_KEY: &str = "subscription-state"; - if max_age == 0 { - return None; - } - if let Some(cached_subscription) = cache.get(remote) { - let now = proxmox_time::epoch_i64(); - let diff = now - cached_subscription.timestamp; +async fn get_cached_subscription_info( + remote: String, + max_age: u64, +) -> Result<Option<CachedSubscriptionState>, Error> { + tokio::task::spawn_blocking(move || { + let cache = pdm_cache::instance().read_remote(&remote)?; - if diff >= max_age as i64 || diff < 0 { - // value is too old or from the future - None - } else { - Some(cached_subscription.clone()) - } - } else { - None - } + Ok(cache.get_with_max_age(SUBSCRIPTION_STATE_CACHE_KEY, max_age as i64)?) + }) + .await? } /// Update cached subscription data. /// /// If the cache already contains more recent data we don't insert the passed resources. 
-fn update_cached_subscription_info( - remote: &str, - node_info: &HashMap<String, Option<NodeSubscriptionInfo>>, - now: i64, -) { - // there is no good way to recover from this, so panicking should be fine - let mut cache = SUBSCRIPTION_CACHE - .write() - .expect("subscription mutex poisoned"); +async fn update_cached_subscription_info( + remote: String, + node_info: HashMap<String, Option<NodeSubscriptionInfo>>, +) -> Result<(), Error> { + tokio::task::spawn_blocking(move || { + let cache = pdm_cache::instance().write_remote(&remote)?; - if let Some(cached_resource) = cache.get(remote) { - // skip updating if the data is new enough - if cached_resource.timestamp >= now { - return; - } - } - - cache.insert( - remote.into(), - CachedSubscriptionState { - node_info: node_info.clone(), - timestamp: now, - }, - ); + Ok(cache.set( + SUBSCRIPTION_STATE_CACHE_KEY, + &CachedSubscriptionState { + node_info: node_info, + }, + )?) + }) + .await? } /// Maps a list of node subscription infos into a single [`RemoteSubscriptionState`] -- 2.47.3 ^ permalink raw reply related [flat|nested] 13+ messages in thread
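The expiry semantics that `get_with_max_age` takes over from the removed hand-rolled check can be summarized in a few lines; `is_fresh` is an illustrative stand-in, not an identifier from the patch:

```rust
// A cached entry is usable only if 0 <= now - timestamp < max_age.
// Timestamps from the future count as expired (rather than erroring),
// and max_age == 0 disables the cache entirely, matching the old code.
fn is_fresh(timestamp: i64, now: i64, max_age: i64) -> bool {
    if max_age == 0 {
        return false;
    }
    let diff = now - timestamp;
    diff >= 0 && diff < max_age
}

fn main() {
    assert!(is_fresh(100, 110, 30)); // 10s old, within max_age
    assert!(!is_fresh(100, 140, 30)); // 40s old, too stale
    assert!(!is_fresh(150, 110, 30)); // timestamp from the future
    assert!(!is_fresh(100, 110, 0)); // max_age == 0 always misses
}
```

A miss simply yields `None`, causing the caller to fetch fresh data and overwrite the stale (or bogus-timestamped) entry.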
* [PATCH datacenter-manager 4/4] remote-updates: switch over to pdm_cache 2026-05-08 15:03 [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner ` (2 preceding siblings ...) 2026-05-08 15:03 ` [PATCH datacenter-manager 3/4] api: resources: subscriptions: switch over to pdm_cache Lukas Wagner @ 2026-05-08 15:03 ` Lukas Wagner 2026-05-11 12:37 ` [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Dominik Csapak 2026-05-13 13:56 ` superseded: " Lukas Wagner 5 siblings, 0 replies; 13+ messages in thread From: Lukas Wagner @ 2026-05-08 15:03 UTC (permalink / raw) To: pdm-devel Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> --- Notes: Should probably add clean-up code for the old cache-file before this is applied server/src/remote_updates.rs | 52 +++++++++++------------------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs index 7aaacc46..e0f61da4 100644 --- a/server/src/remote_updates.rs +++ b/server/src/remote_updates.rs @@ -1,6 +1,3 @@ -use std::fs::File; -use std::io::ErrorKind; - use anyhow::{bail, Error}; use serde::{Deserialize, Serialize}; @@ -12,12 +9,11 @@ use pdm_api_types::remote_updates::{ }; use pdm_api_types::remotes::{Remote, RemoteType}; use pdm_api_types::RemoteUpid; -use pdm_buildcfg::PDM_CACHE_DIR_M; -use crate::connection; use crate::parallel_fetcher::ParallelFetcher; +use crate::{connection, pdm_cache}; -pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json"); +const UPDATE_SUMMARY_CACHE_KEY: &str = "remote-updates"; #[derive(Clone, Default, Debug, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] @@ -157,21 +153,10 @@ pub fn get_available_updates_for_remote(remote: &str) -> Result<RemoteUpdateSumm } fn get_cached_summary_or_default() -> Result<UpdateSummary, Error> { - match File::open(UPDATE_CACHE) { - Ok(file) => { - let 
content = match serde_json::from_reader(file) { - Ok(cache_content) => cache_content, - Err(err) => { - log::error!("failed to deserialize remote update cache: {err:#}"); - Default::default() - } - }; - - Ok(content) - } - Err(err) if err.kind() == ErrorKind::NotFound => Ok(Default::default()), - Err(err) => Err(err.into()), - } + Ok(pdm_cache::instance() + .read_global()? + .get::<UpdateSummary>(UPDATE_SUMMARY_CACHE_KEY)? + .unwrap_or_default()) } async fn update_cached_summary_for_node( @@ -179,10 +164,11 @@ async fn update_cached_summary_for_node( node: String, node_data: NodeUpdateSummary, ) -> Result<(), Error> { - let mut file = File::open(UPDATE_CACHE)?; - let mut cache_content: UpdateSummary = serde_json::from_reader(&mut file)?; - let remote_entry = - cache_content + let cache = pdm_cache::instance().write_global()?; + let cache_content = cache.get::<UpdateSummary>(UPDATE_SUMMARY_CACHE_KEY)?; + + if let Some(mut entry) = cache_content { + let remote_entry = entry .remotes .entry(remote.id) .or_insert_with(|| RemoteUpdateSummary { @@ -191,15 +177,9 @@ async fn update_cached_summary_for_node( status: RemoteUpdateStatus::Success, }); - remote_entry.nodes.insert(node, node_data); - - let options = proxmox_product_config::default_create_options(); - proxmox_sys::fs::replace_file( - UPDATE_CACHE, - &serde_json::to_vec(&cache_content)?, - options, - true, - )?; + remote_entry.nodes.insert(node, node_data); + cache.set(UPDATE_SUMMARY_CACHE_KEY, &entry)?; + } Ok(()) } @@ -275,8 +255,8 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er } } - let options = proxmox_product_config::default_create_options(); - proxmox_sys::fs::replace_file(UPDATE_CACHE, &serde_json::to_vec(&content)?, options, true)?; + let cache = pdm_cache::instance().write_global()?; + cache.set(UPDATE_SUMMARY_CACHE_KEY, &content)?; Ok(()) } -- 2.47.3 ^ permalink raw reply related [flat|nested] 13+ messages in thread
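Both the removed `proxmox_sys::fs::replace_file` call and the cache's `set` (whose documentation mentions atomic replacement) rest on the usual write-to-a-temporary-file-then-rename pattern. Below is a minimal std-only sketch of that pattern, not the actual proxmox_sys implementation:

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Write to a temporary file in the same directory, then rename it over
// the target. Readers either see the old or the new content, never a
// partially written file, since same-filesystem renames are atomic.
fn atomic_write(path: &Path, contents: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    let mut file = fs::File::create(&tmp)?;
    file.write_all(contents)?;
    file.sync_all()?; // flush data before making the rename visible
    fs::rename(&tmp, path)
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("atomic-write-demo");
    fs::create_dir_all(&dir)?;
    let target = dir.join("remote-updates.json");
    atomic_write(&target, b"{}")?;
    assert_eq!(fs::read(&target)?, b"{}");
    Ok(())
}
```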
* Re: [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses 2026-05-08 15:03 [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner ` (3 preceding siblings ...) 2026-05-08 15:03 ` [PATCH datacenter-manager 4/4] remote-updates: " Lukas Wagner @ 2026-05-11 12:37 ` Dominik Csapak 2026-05-12 8:39 ` Lukas Wagner 2026-05-13 13:56 ` superseded: " Lukas Wagner 5 siblings, 1 reply; 13+ messages in thread From: Dominik Csapak @ 2026-05-11 12:37 UTC (permalink / raw) To: Lukas Wagner, pdm-devel hi, IMO the code looks mostly good to me, as well as the approach, a few comments regardless (all here, as the patches themselves are not that big ;) ) * using the filesystem can be good, it can reduce the memory footprint since we're not automatically storing everything in memory this can also be a downside though, since now the IOPS requirements rise, and for frequent update cycles it could have a negative impact on the disks (ssd wearout) this could be mitigated by using e.g. /var/run instead of /var/cache but then it's back to using memory all the time, with less clear accounting. I'm actually not sure which is better, maybe having that configurable could make sense? (as in, what's the cache path? this way users can decide if they want it on disk or in memory) * currently you expire values if the timestamp is in the future, could it be better to return an error here? IMO there had to be one at some point, either the time was wrong in the past, or the time is wrong now (neither of which is good?) * not sure if the pdm_cache::instance() abstraction is gaining us anything here, we could simply have freestanding functions? * for the subscription info you chose to implement an async wrapper maybe we could/should have that directly in the interface (of pdm_cache for example) ? otherwise people might use it without a 'spawn_blocking'. 
having both an async and sync interface could be beneficial On 5/8/26 5:02 PM, Lukas Wagner wrote: > The main intention is to avoid a sprawl of different caching approaches by > establishing a simple, easy to use cache implementation that can be used to > persistently cache API responses from remotes (and derived aggregations). > > Open questions: > - is the per-namespace lock too coarse? Should we rather lock > per key? Anyways, one should not hold the lock during longer periods of time > (e.g. when doing API requests), so the namespace-level lock seemed fine to me. > A per-namespace (so, per-remote) lock is nicer when one wants to update several > keys in one go. > > - Base directory for cache, currently it is > /var/cache/proxmox-datacenter-manager/cache > But this seems both redundant and generic to me, so maybe > 'api-cache'? > > - Went with a max_age param on `get` instead of a `set` with an expiry time, > I think it's quite common to have cache readers with different requirements > to value freshness, so this might be a better fit. > Also, we use the max-age mechanism in the API already, so this > is a seamless fit then. Does this make sense? Or this we rather have > redis-style `set` with expiry time/TTL? > > > The `namespaced_cache` module is pretty generic and can be moved to proxmox.git > (maybe in proxmox-shared-cache) once it has sufficiently stabilized. 
> > > proxmox-datacenter-manager: > > Lukas Wagner (4): > add persistent, generic, namespaced key-value cache implementation > add pdm_cache cache as a specialized wrapper around the namespaced > cache > api: resources: subscriptions: switch over to pdm_cache > remote-updates: switch over to pdm_cache > > Cargo.toml | 1 + > server/Cargo.toml | 2 + > server/src/api/resources.rs | 82 ++--- > .../bin/proxmox-datacenter-privileged-api.rs | 7 + > server/src/lib.rs | 2 + > server/src/namespaced_cache.rs | 324 ++++++++++++++++++ > server/src/pdm_cache.rs | 69 ++++ > server/src/remote_updates.rs | 52 +-- > 8 files changed, 452 insertions(+), 87 deletions(-) > create mode 100644 server/src/namespaced_cache.rs > create mode 100644 server/src/pdm_cache.rs > > > Summary over all repositories: > 8 files changed, 452 insertions(+), 87 deletions(-) > ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses 2026-05-11 12:37 ` [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Dominik Csapak @ 2026-05-12 8:39 ` Lukas Wagner 0 siblings, 0 replies; 13+ messages in thread From: Lukas Wagner @ 2026-05-12 8:39 UTC (permalink / raw) To: Dominik Csapak, Lukas Wagner, pdm-devel Hey Dominik, thanks for your feedback. On Mon May 11, 2026 at 2:37 PM CEST, Dominik Csapak wrote: > hi, > > IMO the code looks mostly good to me, as well as the approach, a > few comments regardless (all here, as the patches themselves are > not that big ;) ) > > * using the filesystem can be good, it can reduce the memory footprint > since we're not automatically store everything in memory > > this can also be a downside though, since now the IOPS requirements > rise, and for frequent update cycles it could have a negative impact > on the disks (ssd wearout) > > this could be mitigated by using e.g. /var/run instead of /var/cache > but then it's back to using memory all the time, with less > clear accounting. > > I'm actually not sure which is better, maybe having that configurable > could make sense? (as in, whats the cache path? this way users can > decide if they want it on disk or in memory) FWIW, I did some back-on-the-envelope calculations. I assume that the cache for the cluster resources would likely be the one entry with the most writes by far, since that one scales linearly with the number of resources *and* has to be refreshed periodically (at the moment, we have a max-age of 30 seconds for the dashboard). A guest resource (e.g. a VM) is roughly 260 bytes of minified JSON. 
If we assume a huge setup with, say, 50000 guests, that would be: 260 bytes * 50000 = 13 MBytes 13 MBytes of writes every 30 seconds boils down to 13 * 2 * 60 * 24 = ~37.5 GB per day of writes (without accounting for write amplification effects) A PDM host for huge setups like these would (hopefully) use enterprise-grade SSDs, so in reality these amounts of writes *should* be fine, I think. The average PDM setup is likely going to be at least one order of magnitude smaller than this, I would assume. Also, it's worth mentioning that this is the worst-case scenario where the dashboard is opened 24/7; if that is not the case, we only request resource data every couple of minutes. That being said, since we only have 13 MB of data, we could, as you said, just put that into /var/run for now, and live with the data loss on reboot. > I'm actually not sure which is better, maybe having that configurable > could make sense? (as in, whats the cache path? this way users can > decide if they want it on disk or in memory) I think both options are fine. Storing it on disk would have the slight advantage that more expensive, rarely fetched API requests (e.g. update status, requesting a list of updates can, as of now, trigger an 'apt update' on PVE nodes) can survive reboots of the host. In practice, as reboots should hopefully be not that common, this really should not matter too much, I think. With regards to making it configurable: Yes, could make sense, but I'd only add this once we actually have users requesting it. I think I'd put it in /var/run for the next version, going from non-persistent to persistent seems to be the easier step, in case we want to change the location in the future. > > * currently you expire values if the timestamp is in the future, > could it be better to return an error here? IMO there had to be one > at some point, either the time was wrong in the past, or the time > is wrong now (neither of which is good?) 
The question is, do we want to put the burden on the caller to handle this error variant correctly? With the current implementation, a timestamp from the future would simply mean that the caller gets a `None`, which in practice means that the caller will retrieve fresh data from the remote, which will then be put into the cache, overwriting the entry with the bogus timestamp, resulting in a self-healing effect. If an error is returned, what would you do in the caller with it? Just bubble it up to the API level and return a failure? Or handle it in a way to heal the cache manually? > > * not sure if the pdm_cache::instance() abstraction is gaining us > anything here, we could simply have freestanding functions? I guess at the moment we do not gain anything here, true. I'm more leaning towards this style, since I'm always having my proposal from https://lore.proxmox.com/pdm-devel/20260129134418.307552-1-l.wagner@proxmox.com/T/#t in mind, which I will hopefully start working on again once I have the time for it. This cache would also be something that would need to be injected into the API handler via some application context object, to ensure that we can drive API handler code in integration tests. But since this is 100% pdm-internal code, I can just put it behind helpers for now and refactor it again once we actually need it. > > * for the subscription info you chose to implement an async wrapper > maybe we could/should have that directly in the interface > (of pdm_cache for example) ? otherwise people might use it > without a 'spawn_blocking'. having both an async and sync interface > could be beneficial > Yes, I wondered the same actually. Would you add the async interface to `pdm_cache`, or the generic implementation in `namespaced_cache`? If we do the former, we need to add wrappers for the read/write guard types as well, so that would be quite a bit more code. 
If we put it in `namespaced_cache`, we probably should feature-gate them as soon as we put the cache into a shared crate in proxmox.git ^ permalink raw reply [flat|nested] 13+ messages in thread
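The back-of-the-envelope numbers from this reply can be re-checked mechanically; the helper names below are purely illustrative:

```rust
// One refresh writes the serialized resources of all guests.
fn bytes_per_refresh(bytes_per_guest: u64, guests: u64) -> u64 {
    bytes_per_guest * guests
}

// Total daily write volume for a given refresh interval.
fn bytes_per_day(per_refresh: u64, refresh_interval_secs: u64) -> u64 {
    per_refresh * (86_400 / refresh_interval_secs)
}

fn main() {
    // ~260 bytes of minified JSON per guest, 50000 guests
    let per_refresh = bytes_per_refresh(260, 50_000);
    assert_eq!(per_refresh, 13_000_000); // 13 MB per refresh
    // refreshed every 30 seconds -> 2880 refreshes per day
    assert_eq!(bytes_per_day(per_refresh, 30), 37_440_000_000); // ~37.4 GB/day
}
```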
* superseded: [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses 2026-05-08 15:03 [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner ` (4 preceding siblings ...) 2026-05-11 12:37 ` [RFC datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses Dominik Csapak @ 2026-05-13 13:56 ` Lukas Wagner 5 siblings, 0 replies; 13+ messages in thread From: Lukas Wagner @ 2026-05-13 13:56 UTC (permalink / raw) To: Lukas Wagner, pdm-devel On Fri May 8, 2026 at 5:03 PM CEST, Lukas Wagner wrote: > The main intention is to avoid a sprawl of different caching approaches by > establishing a simple, easy to use cache implementation that can be used to > persistently cache API responses from remotes (and derived aggregations). superseded by: https://lore.proxmox.com/all/20260513135457.573414-1-l.wagner@proxmox.com/T/#u ^ permalink raw reply [flat|nested] 13+ messages in thread
* [PATCH datacenter-manager 0/4] add generic, per-remote (and global) cache for remote API responses @ 2026-05-13 13:54 Lukas Wagner 2026-05-13 13:54 ` [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Lukas Wagner 0 siblings, 1 reply; 13+ messages in thread From: Lukas Wagner @ 2026-05-13 13:54 UTC (permalink / raw) To: pdm-devel The main intention is to avoid a sprawl of different caching approaches by establishing a simple, easy to use cache implementation that can be used to persistently cache API responses from remotes (and derived aggregations). The `namespaced_cache` module is pretty generic and can be moved to proxmox.git (maybe in proxmox-shared-cache) once it has sufficiently stabilized. Changes since the RFC: - change storage location to /run/proxmox-datacenter-manager/api-cache - change name of pdm_cache to api_cache - use freestanding functions in api_cache - add async interface - minor code style improvements - improve test coverage and documentation - add basic sanity checks for namespaces and keys (e.g. prohibiting ../) - add cleanup code for the old remote-updates cachefile Thanks to Dominik C. and Wolfgang for their reviews of the RFC, highly appreciated! 
proxmox-datacenter-manager: Lukas Wagner (4): add persistent, generic, namespaced key-value cache implementation add api_cache as a specialized wrapper around the namespaced cache api: resources: subscriptions: switch over to api_cache remote-updates: switch over to new api_cache Cargo.toml | 1 + debian/control | 1 + server/Cargo.toml | 4 + server/src/api/pve/mod.rs | 4 +- server/src/api/remotes/updates.rs | 4 +- server/src/api/resources.rs | 81 +- server/src/api_cache.rs | 126 +++ .../bin/proxmox-datacenter-privileged-api.rs | 9 +- server/src/lib.rs | 2 + server/src/namespaced_cache.rs | 742 ++++++++++++++++++ server/src/remote_updates.rs | 87 +- 11 files changed, 962 insertions(+), 99 deletions(-) create mode 100644 server/src/api_cache.rs create mode 100644 server/src/namespaced_cache.rs Summary over all repositories: 11 files changed, 962 insertions(+), 99 deletions(-) -- Generated by murpp 0.12.0 ^ permalink raw reply [flat|nested] 13+ messages in thread
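The changelog mentions basic sanity checks for namespaces and keys (e.g. prohibiting "../"). The actual rules are in the patch itself; a hypothetical check of this kind, rejecting names that could escape the base directory, might look like:

```rust
// Hypothetical sketch (the patch's real validation may differ): reject
// empty names, "." / "..", and any path separator or NUL byte, so a
// namespace or key can never resolve outside its namespace directory.
fn is_valid_name(name: &str) -> bool {
    !name.is_empty()
        && name != "."
        && name != ".."
        && !name.contains('/')
        && !name.contains('\\')
        && !name.contains('\0')
}

fn main() {
    assert!(is_valid_name("remote-pve-a"));
    assert!(is_valid_name("subscription-state"));
    assert!(!is_valid_name("../etc/passwd")); // path traversal attempt
    assert!(!is_valid_name(""));
}
```

In the patch, violations surface as the `CacheError::InvalidKey` / `CacheError::InvalidNamespace` variants rather than a plain boolean.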
* [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation 2026-05-13 13:54 [PATCH " Lukas Wagner @ 2026-05-13 13:54 ` Lukas Wagner 2026-05-15 9:06 ` Thomas Lamprecht 0 siblings, 1 reply; 13+ messages in thread From: Lukas Wagner @ 2026-05-13 13:54 UTC (permalink / raw) To: pdm-devel Namespaces map to directories inside a base directory. Cache contents are stored as a single JSON-file per key inside the namespace directory. Value expiry is implemented via a max_age parameter when retrieving values. Callers might have different requirements to the freshness of entries, so this seemed a better fit than statically setting an expiry time when *setting* the value. The cache offers both a blocking as well as an async interface. Once we move this implementation to a shared crate, we should probably use feature flags to gate these. Signed-off-by: Lukas Wagner <l.wagner@proxmox.com> --- Notes: This module might be well suited to be moved to proxmox.git when it has stabilized enough. Changes since the RFC: - In NamespacedCache::new, use Into<PathBuf> instead of AsRef<Path> (thx @ Wolfgang) - Improve test coverage - Basic sanity checks for namespaces and keys, for instance prohibiting "../" - Fix path generation for cases where there is already a file extension in the key (before, it would have been replaced by .json) - add async interface and rename the old read/write calls to read_blocking and write_blocking. 
read/write now return a distinct wrapper type that wraps all blocking operations in a `spawn_blocking` Cargo.toml | 1 + debian/control | 1 + server/Cargo.toml | 4 + server/src/lib.rs | 1 + server/src/namespaced_cache.rs | 742 +++++++++++++++++++++++++++++++++ 5 files changed, 749 insertions(+) create mode 100644 server/src/namespaced_cache.rs diff --git a/Cargo.toml b/Cargo.toml index 9806a4f0..5cf05b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,6 +125,7 @@ serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.1" serde_json = "1.0" serde_plain = "1" +tempfile = "3.15" thiserror = "1.0" tokio = "1.6" tracing = "0.1" diff --git a/debian/control b/debian/control index 61b9cb36..5955faa1 100644 --- a/debian/control +++ b/debian/control @@ -104,6 +104,7 @@ Build-Depends: debhelper-compat (= 13), librust-proxmox-syslog-api-1+default-dev, librust-proxmox-syslog-api-1+impl-dev, librust-proxmox-systemd-1+default-dev, + librust-tempfile-3+default-dev (>= 3.15.0), librust-proxmox-tfa-6+api-dev, librust-proxmox-tfa-6+api-types-dev, librust-proxmox-tfa-6+types-dev, diff --git a/server/Cargo.toml b/server/Cargo.toml index 3f185bbc..663f21cb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -29,6 +29,7 @@ openssl.workspace = true percent-encoding.workspace = true serde.workspace = true serde_json.workspace = true +thiserror.workspace = true tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] } tracing.workspace = true url.workspace = true @@ -87,6 +88,9 @@ pdm-search.workspace = true pve-api-types = { workspace = true, features = [ "client" ] } pbs-api-types.workspace = true +[dev-dependencies] +tempfile.workspace = true + [lints.rust.unexpected_cfgs] level = "warn" check-cfg = ['cfg(remote_config, values("faked"))'] diff --git a/server/src/lib.rs b/server/src/lib.rs index 5ed10d69..0b7642ab 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ 
-7,6 +7,7 @@ pub mod context; pub mod env; pub mod jobstate; pub mod metric_collection; +pub mod namespaced_cache; pub mod parallel_fetcher; pub mod remote_cache; pub mod remote_tasks; diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs new file mode 100644 index 00000000..9807fc54 --- /dev/null +++ b/server/src/namespaced_cache.rs @@ -0,0 +1,742 @@ +//! Generic namespaced cache implementation with optional value expiry. +//! +//! ``` +//! use std::time::Duration; +//! +//! use proxmox_sys::fs::CreateOptions; +//! use server::namespaced_cache::NamespacedCache; +//! +//! let dir = tempfile::tempdir().unwrap(); +//! let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new()); +//! let write_guard = cache.write_blocking("remote-a", Duration::from_secs(1)).unwrap(); +//! write_guard.set("val1", 1).unwrap(); +//! assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1); +//! +//! ``` + +use std::fs::File; +use std::io::ErrorKind; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::task::JoinError; + +use proxmox_sys::fs::CreateOptions; + +/// Error type for [`NamespacedCache`]. +#[derive(thiserror::Error, Debug)] +pub enum CacheError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("error: {0}")] + Other(#[from] anyhow::Error), + + #[error("invalid key: '{0}'")] + InvalidKey(String), + + #[error("invalid namespace: '{0}'")] + InvalidNamespace(String), + + #[error("join error: {0}")] + JoinError(#[from] JoinError), +} + +/// A generic, namespaced cache with optional value expiration. +pub struct NamespacedCache { + base_directory: PathBuf, + file_options: CreateOptions, + dir_options: CreateOptions, +} + +impl NamespacedCache { + /// Create a new cache instance. 
+ /// + /// Cache entries will be persisted in the provided `base_directory`. + /// `dir_options` are the [`CreateOptions`] used the namespace directories, while `file_options` + /// are the ones for persisted cache entries. + pub fn new<P: Into<PathBuf>>( + base_directory: P, + dir_options: CreateOptions, + file_options: CreateOptions, + ) -> Self { + Self { + base_directory: base_directory.into(), + dir_options, + file_options, + } + } + + /// Lock a namespace for writing (blocking interface). + /// + /// This should *not* be called from async code. Use [`NamespacedCache::write`] instead. + pub fn write_blocking( + &self, + namespace: &str, + timeout: Duration, + ) -> Result<BlockingWritableCacheNamespace, CacheError> { + ensure_valid_namespace(namespace)?; + let lock = self.lock_namespace_impl_blocking(namespace, timeout, true)?; + + Ok(BlockingWritableCacheNamespace { + inner: WriteableInner { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_directory.clone(), + dir_options: self.dir_options, + file_options: self.file_options, + }, + }) + } + + /// Lock a namespace for writing (async interface). + pub async fn write( + &self, + namespace: &str, + timeout: Duration, + ) -> Result<WritableCacheNamespace, CacheError> { + ensure_valid_namespace(namespace)?; + let lock = self.lock_namespace_impl(namespace, timeout, true).await?; + + Ok(WritableCacheNamespace { + inner: Arc::new(WriteableInner { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_directory.clone(), + dir_options: self.dir_options, + file_options: self.file_options, + }), + }) + } + + /// Lock a namespace for reading (blocking interface). + /// + /// This should *not* be called from async code. Use [`NamespacedCache::read`] instead. 
+ pub fn read_blocking( + &self, + namespace: &str, + timeout: Duration, + ) -> Result<BlockingReadableCacheNamespace, CacheError> { + ensure_valid_namespace(namespace)?; + let lock = self.lock_namespace_impl_blocking(namespace, timeout, false)?; + + Ok(BlockingReadableCacheNamespace { + inner: ReadableInner { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_directory.clone(), + }, + }) + } + + /// Lock a namespace for reading (async interface). + pub async fn read( + &self, + namespace: &str, + timeout: Duration, + ) -> Result<ReadableCacheNamespace, CacheError> { + ensure_valid_namespace(namespace)?; + let lock = self.lock_namespace_impl(namespace, timeout, false).await?; + + Ok(ReadableCacheNamespace { + inner: Arc::new(ReadableInner { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_directory.clone(), + }), + }) + } + + async fn lock_namespace_impl( + &self, + namespace: &str, + timeout: Duration, + exclusive: bool, + ) -> Result<File, CacheError> { + let path = get_lockfile(&self.base_directory, namespace); + let file_options = self.file_options; + let lock = tokio::task::spawn_blocking(move || { + proxmox_sys::fs::open_file_locked(&path, timeout, exclusive, file_options) + }) + .await??; + + Ok(lock) + } + + fn lock_namespace_impl_blocking( + &self, + namespace: &str, + timeout: Duration, + exclusive: bool, + ) -> Result<File, CacheError> { + let path = get_lockfile(&self.base_directory, namespace); + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, exclusive, self.file_options)?; + + Ok(lock) + } +} + +/// A readable cache namespace (blocking interface). +pub struct BlockingReadableCacheNamespace { + inner: ReadableInner, +} + +/// A readable cache namespace (async interface). 
+pub struct ReadableCacheNamespace { + inner: Arc<ReadableInner>, +} + +struct ReadableInner { + _lock: File, + namespace: String, + base_path: PathBuf, +} + +impl BlockingReadableCacheNamespace { + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> { + get_impl(&self.inner.base_path, &self.inner.namespace, key, None) + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get_with_max_age<T: Serialize + DeserializeOwned>( + &self, + key: &str, + max_age: i64, + ) -> Result<Option<T>, CacheError> { + get_impl( + &self.inner.base_path, + &self.inner.namespace, + key, + Some(max_age), + ) + } +} + +impl ReadableCacheNamespace { + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub async fn get<T: Serialize + DeserializeOwned + Send + 'static>( + &self, + key: &str, + ) -> Result<Option<T>, CacheError> { + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + + tokio::task::spawn_blocking(move || { + get_impl(&inner.base_path, &inner.namespace, &key, None) + }) + .await? + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. 
invalid format) + pub async fn get_with_max_age<T: Serialize + DeserializeOwned + Send + 'static>( + &self, + key: &str, + max_age: i64, + ) -> Result<Option<T>, CacheError> { + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + + tokio::task::spawn_blocking(move || { + get_impl(&inner.base_path, &inner.namespace, &key, Some(max_age)) + }) + .await? + } +} + +/// A writable cache namespace (blocking interface). +pub struct BlockingWritableCacheNamespace { + inner: WriteableInner, +} + +/// A writable cache namespace (async interface). +pub struct WritableCacheNamespace { + inner: Arc<WriteableInner>, +} + +struct WriteableInner { + _lock: File, + namespace: String, + base_path: PathBuf, + dir_options: CreateOptions, + file_options: CreateOptions, +} + +impl BlockingWritableCacheNamespace { + /// Remote a cache entry. + /// + /// This returns `Ok(())` if the key does not exist. + /// + /// # Errors: + /// - The file could not be deleted due to insufficient privileges. + pub fn remove(&self, key: &str) -> Result<(), CacheError> { + remove_impl(&self.inner, key) + } + + /// Set a cache entry. + /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub fn set<T: Serialize + DeserializeOwned>( + &self, + key: &str, + value: T, + ) -> Result<(), CacheError> { + set_impl(&self.inner, key, value, proxmox_time::epoch_i64()) + } + + /// Set a cache entry with an explicitly provided timestamp. + /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub fn set_with_timestamp<T: Serialize + DeserializeOwned>( + &self, + key: &str, + value: T, + timestamp: i64, + ) -> Result<(), CacheError> { + set_impl(&self.inner, key, value, timestamp) + } + + /// Read a value from the cache. 
+ /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> { + get_impl(&self.inner.base_path, &self.inner.namespace, key, None) + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get_with_max_age<T: Serialize + DeserializeOwned>( + &self, + key: &str, + max_age: i64, + ) -> Result<Option<T>, CacheError> { + get_impl( + &self.inner.base_path, + &self.inner.namespace, + key, + Some(max_age), + ) + } +} + +impl WritableCacheNamespace { + /// Remote a cache entry. + /// + /// This returns `Ok(())` if the key does not exist. + /// + /// # Errors: + /// - The file could not be deleted due to insufficient privileges. + pub async fn remove(&self, key: &str) -> Result<(), CacheError> { + let inner = Arc::clone(&self.inner); + let key = key.to_string(); + + tokio::task::spawn_blocking(move || remove_impl(&inner, &key)).await? + } + + /// Set a cache entry. + /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub async fn set<T: Serialize + DeserializeOwned + Send + 'static>( + &self, + key: &str, + value: T, + ) -> Result<(), CacheError> { + let inner = Arc::clone(&self.inner); + let key = key.to_string(); + + tokio::task::spawn_blocking(move || { + set_impl(&inner, &key, value, proxmox_time::epoch_i64()) + }) + .await? + } + + /// Set a cache entry with an explicitly provided timestamp. 
+ /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub async fn set_with_timestamp<T: Serialize + DeserializeOwned + Send + 'static>( + &self, + key: &str, + value: T, + timestamp: i64, + ) -> Result<(), CacheError> { + let inner = Arc::clone(&self.inner); + let key = key.to_string(); + tokio::task::spawn_blocking(move || set_impl(&inner, &key, value, timestamp)).await? + } + + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub async fn get<T: Serialize + DeserializeOwned + Send + 'static>( + &self, + key: &str, + ) -> Result<Option<T>, CacheError> { + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + + tokio::task::spawn_blocking(move || { + get_impl(&inner.base_path, &inner.namespace, &key, None) + }) + .await? + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub async fn get_with_max_age<T: Serialize + DeserializeOwned + Send + 'static>( + &self, + key: &str, + max_age: i64, + ) -> Result<Option<T>, CacheError> { + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + + tokio::task::spawn_blocking(move || { + get_impl(&inner.base_path, &inner.namespace, &key, Some(max_age)) + }) + .await? 
+ } +} + +#[derive(Serialize, Deserialize, Debug)] +struct CacheEntry<T> { + timestamp: i64, + value: T, +} + +impl<T> CacheEntry<T> { + fn is_expired(&self, now: i64, max_age: i64) -> bool { + if max_age == 0 { + return true; + } + + let diff = now - self.timestamp; + diff >= max_age || diff < 0 + } +} + +fn get_impl<T: Serialize + DeserializeOwned>( + base: &Path, + namespace: &str, + key: &str, + max_age: Option<i64>, +) -> Result<Option<T>, CacheError> { + // Namespace should already be verified at this point, no point in checking it again. + ensure_valid_key(key)?; + + let path = get_path(base, namespace, key); + let content = proxmox_sys::fs::file_read_optional_string(path)?; + + if let Some(content) = content { + let val = serde_json::from_str::<CacheEntry<T>>(&content)?; + + if let Some(max_age) = max_age { + if val.is_expired(proxmox_time::epoch_i64(), max_age) { + return Ok(None); + } + } + return Ok(Some(val.value)); + } + + Ok(None) +} + +fn set_impl<T: Serialize + DeserializeOwned>( + inner: &WriteableInner, + key: &str, + value: T, + timestamp: i64, +) -> Result<(), CacheError> { + ensure_valid_key(key)?; + let path = get_path(&inner.base_path, &inner.namespace, key); + + proxmox_sys::fs::create_path( + path.parent().unwrap(), + Some(inner.dir_options), + Some(inner.dir_options), + )?; + + let entry = CacheEntry { timestamp, value }; + + let data = serde_json::to_vec(&entry)?; + proxmox_sys::fs::replace_file(path, &data, inner.file_options, true)?; + + Ok(()) +} + +fn remove_impl(inner: &WriteableInner, key: &str) -> Result<(), CacheError> { + ensure_valid_key(key)?; + let path = get_path(&inner.base_path, &inner.namespace, key); + + if let Err(err) = std::fs::remove_file(path) { + if err.kind() == ErrorKind::NotFound { + return Ok(()); + } + + return Err(err.into()); + } + + Ok(()) +} + +fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf { + let path = base.join(namespace).join(format!("{key}.json")); + path +} + +fn get_lockfile(base: 
&Path, namespace: &str) -> PathBuf { + let path = base.join(format!(".{namespace}.lock")); + path +} + +// Make sure that an identifier is safe to use as a cache key. +// +// At the moment, this only checks for '../', as to ensure that the base directory +// is not escaped. +fn ensure_valid_key(key: &str) -> Result<(), CacheError> { + if key.contains("../") { + return Err(CacheError::InvalidKey(key.into())); + } + + Ok(()) +} + +// Make sure that an identifier is safe to use as a namespace. +// +// At the moment, this ensures that there is no '../' in the namespace, as well as that the +// identifier does not start with '/'. +fn ensure_valid_namespace(namespace: &str) -> Result<(), CacheError> { + if namespace.contains("../") || namespace.starts_with("/") { + return Err(CacheError::InvalidNamespace(namespace.into())); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::*; + + const TIMEOUT: Duration = Duration::from_secs(1); + + fn make_cache() -> (TempDir, NamespacedCache) { + let dir = tempfile::tempdir().unwrap(); + + let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new()); + + (dir, cache) + } + + #[test] + fn test_cache() { + let (_dir, cache) = make_cache(); + + let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap(); + write_guard.set("val1", 1).unwrap(); + write_guard.set("val2", 1).unwrap(); + + assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1); + + write_guard.remove("val1").unwrap(); + assert!(write_guard.get::<String>("val1").unwrap().is_none()); + + drop(write_guard); + + let read_guard = cache.read_blocking("remote-a", TIMEOUT).unwrap(); + + assert_eq!(read_guard.get::<i32>("val2").unwrap().unwrap(), 1); + } + + #[test] + fn test_remove_nonexisting() { + let (_dir, cache) = make_cache(); + + let a = cache.write_blocking("remote-a", TIMEOUT).unwrap(); + + // Deleting a key that does not exist is okay and should not error. 
+ assert!(a.remove("val").is_ok()); + } + + #[test] + fn test_remove_failure() { + let (dir, cache) = make_cache(); + + let a = cache.write_blocking("remote-a", TIMEOUT).unwrap(); + // Triggering a general failure by generating a directory that conflicts with the cache key + std::fs::create_dir_all(dir.path().join("remote-a").join("val.json")).unwrap(); + assert!(a.remove("val").is_err()); + } + + #[test] + fn test_get_with_max_age() { + let (_dir, cache) = make_cache(); + + let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap(); + + let now = proxmox_time::epoch_i64(); + + write_guard + .set_with_timestamp("somekey", 1, now - 1000) + .unwrap(); + + assert!(write_guard + .get_with_max_age::<i32>("somekey", 999) + .unwrap() + .is_none()); + + drop(write_guard); + let read_guard = cache.read_blocking("remote-a", TIMEOUT).unwrap(); + assert!(read_guard + .get_with_max_age::<i32>("somekey", 999) + .unwrap() + .is_none()); + } + + #[test] + fn test_expiration() { + let entry = CacheEntry { + value: (), + timestamp: 1000, + }; + + assert!(!entry.is_expired(1000, 100)); + assert!(!entry.is_expired(1099, 100)); + assert!(entry.is_expired(1100, 100)); + assert!(entry.is_expired(1101, 100)); + + // if max-age is 0, the entry is never fresh + assert!(entry.is_expired(1000, 0)); + } + + #[test] + fn test_invalid_namespaces() { + let (_dir, cache) = make_cache(); + + for id in [ + "../remote-a", + "remote-a/../something", + "remote-a/../", + "../", + ] { + assert!(matches!( + cache.write_blocking(id, TIMEOUT), + Err(CacheError::InvalidNamespace(_)) + )); + assert!(matches!( + cache.read_blocking(id, TIMEOUT), + Err(CacheError::InvalidNamespace(_)) + )); + } + } + + #[test] + fn test_invalid_keys() { + let (_dir, cache) = make_cache(); + + let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap(); + let read_guard = cache.write_blocking("remote-b", TIMEOUT).unwrap(); + + for id in ["../somekey", "somekey/../something", "somekey/../", "../"] { + 
assert!(matches!( + write_guard.set(id, ()), + Err(CacheError::InvalidKey(_)) + )); + assert!(matches!( + write_guard.get::<()>(id), + Err(CacheError::InvalidKey(_)) + )); + assert!(matches!( + write_guard.get_with_max_age::<()>(id, 1000), + Err(CacheError::InvalidKey(_)) + )); + assert!(matches!( + write_guard.remove(id), + Err(CacheError::InvalidKey(_)) + )); + assert!(matches!( + read_guard.get::<()>(id), + Err(CacheError::InvalidKey(_)) + )); + assert!(matches!( + read_guard.get_with_max_age::<()>(id, 1000), + Err(CacheError::InvalidKey(_)) + )); + } + } + + #[tokio::test] + async fn test_async() { + let (_dir, cache) = make_cache(); + + let lock = cache.write("some-remote", TIMEOUT).await.unwrap(); + + lock.set("somekey", 1234).await.unwrap(); + assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234)); + lock.remove("somekey").await.unwrap(); + assert!(lock.get::<i32>("somekey").await.unwrap().is_none()); + + lock.set_with_timestamp("somekey", 1234, proxmox_time::epoch_i64() - 1000) + .await + .unwrap(); + + assert!(lock + .get_with_max_age::<i32>("somekey", 900) + .await + .unwrap() + .is_none()); + + drop(lock); + + let lock = cache.read("some-remote", TIMEOUT).await.unwrap(); + assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234)); + assert!(lock + .get_with_max_age::<i32>("somekey", 900) + .await + .unwrap() + .is_none()); + } +} -- 2.47.3 ^ permalink raw reply related [flat|nested] 13+ messages in thread
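The expiration rule exercised by `test_expiration` above can be restated as a standalone function, mirroring `CacheEntry::is_expired` from the patch: an entry is stale when `max_age` is 0, when it is at least `max_age` seconds old, or when its timestamp lies in the future.

```rust
// Standalone restatement of the patch's expiration rule.
fn is_expired(timestamp: i64, now: i64, max_age: i64) -> bool {
    // A max_age of 0 means the entry is never considered fresh.
    if max_age == 0 {
        return true;
    }
    let diff = now - timestamp;
    // Stale if at least max_age old, or if the timestamp is in the future.
    diff >= max_age || diff < 0
}

fn main() {
    assert!(!is_expired(1000, 1099, 100)); // 99s old, still fresh
    assert!(is_expired(1000, 1100, 100));  // exactly max_age old -> stale
    assert!(is_expired(1000, 900, 100));   // future timestamp -> stale
    assert!(is_expired(1000, 1000, 0));    // max_age 0 -> never fresh
}
```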
* Re: [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation 2026-05-13 13:54 ` [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Lukas Wagner @ 2026-05-15 9:06 ` Thomas Lamprecht 2026-05-15 9:19 ` Lukas Wagner 0 siblings, 1 reply; 13+ messages in thread From: Thomas Lamprecht @ 2026-05-15 9:06 UTC (permalink / raw) To: Lukas Wagner; +Cc: pdm-devel On Wed, 13 May 2026 15:54:54 +0200, Lukas Wagner wrote: > diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs > new file mode 100644 > --- /dev/null > +++ b/server/src/namespaced_cache.rs > @@ -0,0 +1,742 @@ [...] > +/// A generic, namespaced cache with optional value expiration. > +pub struct NamespacedCache { > + base_directory: PathBuf, > + file_options: CreateOptions, > + dir_options: CreateOptions, > +} > + > +impl NamespacedCache { > + /// Create a new cache instance. > + /// > + /// Cache entries will be persisted in the provided `base_directory`. > + /// `dir_options` are the [`CreateOptions`] used the namespace directories, while `file_options` nit: s/used the/used for the/ in the doc. > + /// are the ones for persisted cache entries. > + pub fn new<P: Into<PathBuf>>( > + base_directory: P, > + dir_options: CreateOptions, > + file_options: CreateOptions, > + ) -> Self { tiny nit: Struct and new constructr disagree on parameter order (file_options/dir_options vs. dir_options/file_options), but both are the same type. > +/// A writable cache namespace (blocking interface). > +pub struct BlockingWritableCacheNamespace { > + inner: WriteableInner, > +} > + > +/// A writable cache namespace (async interface). > +pub struct WritableCacheNamespace { > + inner: Arc<WriteableInner>, > +} > + > +struct WriteableInner { nit: public types are `Writable`, inner is `Writeable` - unify the spelling here (e.g. rename to `WritableInner`). [...] > +impl BlockingWritableCacheNamespace { > + /// Remote a cache entry. 
nit: s/Remote/Remove/ (and on `WritableCacheNamespace::remove`). [...] > +// Make sure that an identifier is safe to use as a cache key. > +// > +// At the moment, this only checks for '../', as to ensure that the base directory > +// is not escaped. > +fn ensure_valid_key(key: &str) -> Result<(), CacheError> { > + if key.contains("../") { > + return Err(CacheError::InvalidKey(key.into())); > + } > + > + Ok(()) > +} > + > +// Make sure that an identifier is safe to use as a namespace. > +// > +// At the moment, this ensures that there is no '../' in the namespace, as well as that the > +// identifier does not start with '/'. > +fn ensure_valid_namespace(namespace: &str) -> Result<(), CacheError> { > + if namespace.contains("../") || namespace.starts_with("/") { > + return Err(CacheError::InvalidNamespace(namespace.into())); > + } > + > + Ok(()) > +} Something that I missed for the RRD fix back then w.r.t. "ensure that the base directory is not escaped", but it's not enough to only reject the `../` substring, as with that a couple of inputs still slip through: - a key like `/etc/foo` passes; joining an absolute path replaces the prefix, so the write ends up at `/etc/foo.json` outside the cache. - a namespace of just `".."` passes too (no `../` substring, no leading `/`), and then `base.join("..")` walks one level up. No caller hits either today (keys are hardcoded and namespaces always go through `format_remote_namespace`), but allowing only a known-good character set (like the SAFE_ID regex) would match the stated intent more directly than blocking specific substrings. ^ permalink raw reply [flat|nested] 13+ messages in thread
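The two escapes described in the review follow directly from `Path::join` semantics: joining an absolute path replaces the existing prefix, and `".."` is a single component that neither contains the `../` substring nor starts with `/`. A small demonstration (the paths here are illustrative, not from the patch):

```rust
use std::path::{Path, PathBuf};

fn main() {
    let base = Path::new("/var/cache/pdm/cache");

    // An absolute key replaces the whole prefix when joined, so the
    // write would land at /etc/foo.json instead of inside the cache:
    let escaped = base.join("remote-a").join("/etc/foo");
    assert_eq!(escaped, PathBuf::from("/etc/foo"));

    // A namespace of just ".." passes both checks (no "../" substring,
    // no leading '/'), yet walks one level above the base directory:
    assert!(!"..".contains("../") && !"..".starts_with('/'));
    assert_eq!(base.join(".."), PathBuf::from("/var/cache/pdm/cache/.."));
}
```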
* Re: [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation 2026-05-15 9:06 ` Thomas Lamprecht @ 2026-05-15 9:19 ` Lukas Wagner 0 siblings, 0 replies; 13+ messages in thread From: Lukas Wagner @ 2026-05-15 9:19 UTC (permalink / raw) To: Thomas Lamprecht, Lukas Wagner; +Cc: pdm-devel On Fri May 15, 2026 at 11:06 AM CEST, Thomas Lamprecht wrote: > On Wed, 13 May 2026 15:54:54 +0200, Lukas Wagner wrote: >> diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs >> new file mode 100644 >> --- /dev/null >> +++ b/server/src/namespaced_cache.rs >> @@ -0,0 +1,742 @@ > [...] >> +/// A generic, namespaced cache with optional value expiration. >> +pub struct NamespacedCache { >> + base_directory: PathBuf, >> + file_options: CreateOptions, >> + dir_options: CreateOptions, >> +} >> + >> +impl NamespacedCache { >> + /// Create a new cache instance. >> + /// >> + /// Cache entries will be persisted in the provided `base_directory`. >> + /// `dir_options` are the [`CreateOptions`] used the namespace directories, while `file_options` > > nit: s/used the/used for the/ in the doc. > Fixed for the next revision, thanks! >> + /// are the ones for persisted cache entries. >> + pub fn new<P: Into<PathBuf>>( >> + base_directory: P, >> + dir_options: CreateOptions, >> + file_options: CreateOptions, >> + ) -> Self { > > tiny nit: Struct and new constructr disagree on parameter order > (file_options/dir_options vs. dir_options/file_options), but both are > the same type. > Fixed for the next revision, thanks! >> +/// A writable cache namespace (blocking interface). >> +pub struct BlockingWritableCacheNamespace { >> + inner: WriteableInner, >> +} >> + >> +/// A writable cache namespace (async interface). >> +pub struct WritableCacheNamespace { >> + inner: Arc<WriteableInner>, >> +} >> + >> +struct WriteableInner { > > nit: public types are `Writable`, inner is `Writeable` - unify the > spelling here (e.g. 
rename to `WritableInner`). > Fixed for the next revision, thanks! > [...] >> +impl BlockingWritableCacheNamespace { >> + /// Remote a cache entry. > > nit: s/Remote/Remove/ (and on `WritableCacheNamespace::remove`). Fixed for the next revision, thanks! > > [...] >> +// Make sure that an identifier is safe to use as a cache key. >> +// >> +// At the moment, this only checks for '../', as to ensure that the base directory >> +// is not escaped. >> +fn ensure_valid_key(key: &str) -> Result<(), CacheError> { >> + if key.contains("../") { >> + return Err(CacheError::InvalidKey(key.into())); >> + } >> + >> + Ok(()) >> +} >> + >> +// Make sure that an identifier is safe to use as a namespace. >> +// >> +// At the moment, this ensures that there is no '../' in the namespace, as well as that the >> +// identifier does not start with '/'. >> +fn ensure_valid_namespace(namespace: &str) -> Result<(), CacheError> { >> + if namespace.contains("../") || namespace.starts_with("/") { >> + return Err(CacheError::InvalidNamespace(namespace.into())); >> + } >> + >> + Ok(()) >> +} > > Something that I missed for the RRD fix back then w.r.t. "ensure that > the base directory is not escaped", but it's not enough to only reject > the `../` substring, as with that a couple of inputs still slip through: > > - a key like `/etc/foo` passes; joining an absolute path replaces the > prefix, so the write ends up at `/etc/foo.json` outside the cache. > - a namespace of just `".."` passes too (no `../` substring, no leading > `/`), and then `base.join("..")` walks one level up. > > No caller hits either today (keys are hardcoded and namespaces always go > through `format_remote_namespace`), but allowing only a known-good > character set (like the SAFE_ID regex) would match the stated intent > more directly than blocking specific substrings. Ack, will switch to a regex-based verification in the next revision! Thanks for the feedback! ^ permalink raw reply [flat|nested] 13+ messages in thread
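A minimal sketch of what such an allowlist-based check could look like; the exact character set below is an assumption for illustration, not the actual SAFE_ID regex from the proxmox codebase:

```rust
// Illustrative allowlist check: accept only identifiers starting with an
// ASCII alphanumeric character, followed by alphanumerics, '-', '_' or '.'.
// This rejects absolute paths, "..", and anything containing '/'.
fn is_safe_identifier(id: &str) -> bool {
    let mut chars = id.chars();
    matches!(chars.next(), Some(c) if c.is_ascii_alphanumeric())
        && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.'))
}

fn main() {
    assert!(is_safe_identifier("remote-a"));
    assert!(is_safe_identifier("val1"));
    assert!(!is_safe_identifier("/etc/foo")); // absolute path rejected
    assert!(!is_safe_identifier(".."));       // parent component rejected
    assert!(!is_safe_identifier("a/../b"));   // '/' rejected anywhere
    assert!(!is_safe_identifier(""));         // empty rejected
}
```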