* [PATCH datacenter-manager v3 1/4] add persistent, generic, namespaced key-value cache implementation
2026-05-15 14:49 [PATCH datacenter-manager v3 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner
@ 2026-05-15 14:49 ` Lukas Wagner
2026-05-15 14:49 ` [PATCH datacenter-manager v3 2/4] add api_cache as a specialized wrapper around the namespaced cache Lukas Wagner
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Lukas Wagner @ 2026-05-15 14:49 UTC (permalink / raw)
To: pdm-devel
Namespaces map to directories inside a base directory. Cache contents
are stored as a single JSON-file per key inside the namespace directory.
Value expiry is implemented via a max_age parameter when retrieving
values. Callers might have different requirements regarding the freshness of
entries, so this seemed a better fit than statically setting an expiry
time when *setting* the value.
The cache offers both a blocking as well as an async interface. Once we
move this implementation to a shared crate, we should probably use
feature flags to gate these.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
This module might be well suited to be moved to proxmox.git when it has
stabilized enough.
Changes since the RFC:
- In NamespacedCache::new, use Into<PathBuf> instead of AsRef<Path>
(thx @ Wolfgang)
- Improve test coverage
- Basic sanity checks for namespaces and keys, for instance
prohibiting "../"
- Fix path generation for cases where there is already a
file extension in the key (before, it would have been replaced by
.json)
- add async interface and rename the old read/write calls to
read_blocking and write_blocking.
read/write now return a distinct wrapper type that wraps all
blocking operations in a `spawn_blocking`
Changes since v2:
- Add set_if_newer{,_with_timestamp} functions
These ones will only set the new cache entry if the current time
or the provided timestamp is higher than the timestamp
stored in the cache.
These functions return Ok(Some(existing_entry)) if
the data already in the cache is more recent, so that
the caller can directly use it instead for further processing
- Verify keys and namespaces with a regex (SAFE_ID_REGEX)
- Fixed some typos
Cargo.toml | 1 +
debian/control | 1 +
server/Cargo.toml | 4 +
server/src/lib.rs | 1 +
server/src/namespaced_cache.rs | 923 +++++++++++++++++++++++++++++++++
5 files changed, 930 insertions(+)
create mode 100644 server/src/namespaced_cache.rs
diff --git a/Cargo.toml b/Cargo.toml
index 9806a4f0..5cf05b41 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,6 +125,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.1"
serde_json = "1.0"
serde_plain = "1"
+tempfile = "3.15"
thiserror = "1.0"
tokio = "1.6"
tracing = "0.1"
diff --git a/debian/control b/debian/control
index 61b9cb36..5955faa1 100644
--- a/debian/control
+++ b/debian/control
@@ -104,6 +104,7 @@ Build-Depends: debhelper-compat (= 13),
librust-proxmox-syslog-api-1+default-dev,
librust-proxmox-syslog-api-1+impl-dev,
librust-proxmox-systemd-1+default-dev,
+ librust-tempfile-3+default-dev (>= 3.15.0),
librust-proxmox-tfa-6+api-dev,
librust-proxmox-tfa-6+api-types-dev,
librust-proxmox-tfa-6+types-dev,
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 3f185bbc..663f21cb 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -29,6 +29,7 @@ openssl.workspace = true
percent-encoding.workspace = true
serde.workspace = true
serde_json.workspace = true
+thiserror.workspace = true
tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] }
tracing.workspace = true
url.workspace = true
@@ -87,6 +88,9 @@ pdm-search.workspace = true
pve-api-types = { workspace = true, features = [ "client" ] }
pbs-api-types.workspace = true
+[dev-dependencies]
+tempfile.workspace = true
+
[lints.rust.unexpected_cfgs]
level = "warn"
check-cfg = ['cfg(remote_config, values("faked"))']
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 5ed10d69..0b7642ab 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -7,6 +7,7 @@ pub mod context;
pub mod env;
pub mod jobstate;
pub mod metric_collection;
+pub mod namespaced_cache;
pub mod parallel_fetcher;
pub mod remote_cache;
pub mod remote_tasks;
diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs
new file mode 100644
index 00000000..e8e79847
--- /dev/null
+++ b/server/src/namespaced_cache.rs
@@ -0,0 +1,923 @@
+//! Generic namespaced cache implementation with optional value expiry.
+//!
+//! ```
+//! use std::time::Duration;
+//!
+//! use proxmox_sys::fs::CreateOptions;
+//! use server::namespaced_cache::NamespacedCache;
+//!
+//! let dir = tempfile::tempdir().unwrap();
+//! let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new());
+//! let write_guard = cache.write_blocking("remote-a", Duration::from_secs(1)).unwrap();
+//! write_guard.set("val1", 1).unwrap();
+//! assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1);
+//!
+//! ```
+
+use std::fs::File;
+use std::io::ErrorKind;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use tokio::task::JoinError;
+
+use proxmox_schema::api_types::SAFE_ID_REGEX;
+use proxmox_sys::fs::CreateOptions;
+
/// Error type for [`NamespacedCache`].
#[derive(thiserror::Error, Debug)]
pub enum CacheError {
    /// An underlying filesystem operation (read, write, lock, remove) failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A cache entry could not be serialized to or deserialized from JSON.
    #[error("serialization error: {0}")]
    Serde(#[from] serde_json::Error),

    /// Catch-all for errors bubbled up from helper crates (e.g. file locking).
    #[error("error: {0}")]
    Other(#[from] anyhow::Error),

    /// The key did not match `SAFE_ID_REGEX` (e.g. it contained path separators).
    #[error("invalid key: '{0}'")]
    InvalidKey(String),

    /// The namespace did not match `SAFE_ID_REGEX` (e.g. it contained path separators).
    #[error("invalid namespace: '{0}'")]
    InvalidNamespace(String),

    /// A `spawn_blocking` task used by the async interface panicked or was cancelled.
    #[error("join error: {0}")]
    JoinError(#[from] JoinError),
}
+
/// A generic, namespaced cache with optional value expiration.
pub struct NamespacedCache {
    /// Directory under which one subdirectory per namespace is created.
    base_directory: PathBuf,
    /// Create options (permissions/ownership) for namespace directories.
    dir_options: CreateOptions,
    /// Create options (permissions/ownership) for entry and lock files.
    file_options: CreateOptions,
}
+
impl NamespacedCache {
    /// Create a new cache instance.
    ///
    /// Cache entries will be persisted in the provided `base_directory`.
    /// `dir_options` are the [`CreateOptions`] used for the namespace directories, while `file_options`
    /// are the ones for persisted cache entries.
    ///
    /// Note: this does not touch the filesystem; directories and files are
    /// created lazily when a namespace is first locked/written.
    pub fn new<P: Into<PathBuf>>(
        base_directory: P,
        dir_options: CreateOptions,
        file_options: CreateOptions,
    ) -> Self {
        Self {
            base_directory: base_directory.into(),
            dir_options,
            file_options,
        }
    }

    /// Lock a namespace for writing (blocking interface).
    ///
    /// This should *not* be called from async code. Use [`NamespacedCache::write`] instead.
    pub fn write_blocking(
        &self,
        namespace: &str,
        timeout: Duration,
    ) -> Result<BlockingWritableCacheNamespace, CacheError> {
        ensure_valid_namespace(namespace)?;
        // Writers take the per-namespace lock exclusively.
        let lock = self.lock_namespace_impl_blocking(namespace, timeout, true)?;

        Ok(BlockingWritableCacheNamespace {
            inner: WritableInner {
                _lock: lock,
                namespace: namespace.to_string(),
                base_path: self.base_directory.clone(),
                dir_options: self.dir_options,
                file_options: self.file_options,
            },
        })
    }

    /// Lock a namespace for writing (async interface).
    pub async fn write(
        &self,
        namespace: &str,
        timeout: Duration,
    ) -> Result<WritableCacheNamespace, CacheError> {
        ensure_valid_namespace(namespace)?;
        // Writers take the per-namespace lock exclusively.
        let lock = self.lock_namespace_impl(namespace, timeout, true).await?;

        Ok(WritableCacheNamespace {
            // Arc so the guard's state can later be moved into `spawn_blocking` closures.
            inner: Arc::new(WritableInner {
                _lock: lock,
                namespace: namespace.to_string(),
                base_path: self.base_directory.clone(),
                dir_options: self.dir_options,
                file_options: self.file_options,
            }),
        })
    }

    /// Lock a namespace for reading (blocking interface).
    ///
    /// This should *not* be called from async code. Use [`NamespacedCache::read`] instead.
    pub fn read_blocking(
        &self,
        namespace: &str,
        timeout: Duration,
    ) -> Result<BlockingReadableCacheNamespace, CacheError> {
        ensure_valid_namespace(namespace)?;
        // Readers take the lock shared (non-exclusive), so multiple readers may coexist.
        let lock = self.lock_namespace_impl_blocking(namespace, timeout, false)?;

        Ok(BlockingReadableCacheNamespace {
            inner: ReadableInner {
                _lock: lock,
                namespace: namespace.to_string(),
                base_path: self.base_directory.clone(),
            },
        })
    }

    /// Lock a namespace for reading (async interface).
    pub async fn read(
        &self,
        namespace: &str,
        timeout: Duration,
    ) -> Result<ReadableCacheNamespace, CacheError> {
        ensure_valid_namespace(namespace)?;
        // Readers take the lock shared (non-exclusive), so multiple readers may coexist.
        let lock = self.lock_namespace_impl(namespace, timeout, false).await?;

        Ok(ReadableCacheNamespace {
            inner: Arc::new(ReadableInner {
                _lock: lock,
                namespace: namespace.to_string(),
                base_path: self.base_directory.clone(),
            }),
        })
    }

    /// Acquire the namespace lock file (async). Waiting for the lock can block up to
    /// `timeout`, so the acquisition runs on the blocking thread pool.
    async fn lock_namespace_impl(
        &self,
        namespace: &str,
        timeout: Duration,
        exclusive: bool,
    ) -> Result<File, CacheError> {
        let path = get_lockfile(&self.base_directory, namespace);
        // Copy the options so the closure is 'static and can be moved into the task.
        let file_options = self.file_options;
        let lock = tokio::task::spawn_blocking(move || {
            proxmox_sys::fs::open_file_locked(&path, timeout, exclusive, file_options)
        })
        .await??;

        Ok(lock)
    }

    /// Acquire the namespace lock file, blocking the current thread for up to `timeout`.
    fn lock_namespace_impl_blocking(
        &self,
        namespace: &str,
        timeout: Duration,
        exclusive: bool,
    ) -> Result<File, CacheError> {
        let path = get_lockfile(&self.base_directory, namespace);
        let lock = proxmox_sys::fs::open_file_locked(&path, timeout, exclusive, self.file_options)?;

        Ok(lock)
    }
}
+
/// A readable cache namespace (blocking interface).
pub struct BlockingReadableCacheNamespace {
    inner: ReadableInner,
}

/// A readable cache namespace (async interface).
pub struct ReadableCacheNamespace {
    // Arc so the state can be moved into `spawn_blocking` closures.
    inner: Arc<ReadableInner>,
}

/// State shared by both readable guard flavors.
struct ReadableInner {
    // Shared (non-exclusive) lock on the namespace, held for the guard's lifetime (RAII).
    _lock: File,
    /// Name of the locked namespace (validated before the guard was created).
    namespace: String,
    /// Base directory of the owning cache.
    base_path: PathBuf,
}
+
impl BlockingReadableCacheNamespace {
    /// Read a value from the cache.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`.
    ///
    /// # Errors
    /// - The key is not a valid identifier
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> {
        get_impl(&self.inner.base_path, &self.inner.namespace, key, None)
    }

    /// Read a value from the cache, given a maximum age of the cache entry.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`, or if the entry is older
    /// than `max_age` seconds.
    ///
    /// # Errors
    /// - The key is not a valid identifier
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub fn get_with_max_age<T: Serialize + DeserializeOwned>(
        &self,
        key: &str,
        max_age: i64,
    ) -> Result<Option<T>, CacheError> {
        get_impl(
            &self.inner.base_path,
            &self.inner.namespace,
            key,
            Some(max_age),
        )
    }
}
+
impl ReadableCacheNamespace {
    /// Read a value from the cache.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`.
    ///
    /// # Errors
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub async fn get<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
    ) -> Result<Option<T>, CacheError> {
        // Clone key and shared state so the blocking task owns its data.
        let key = key.to_string();
        let inner = Arc::clone(&self.inner);

        tokio::task::spawn_blocking(move || {
            get_impl(&inner.base_path, &inner.namespace, &key, None)
        })
        .await?
    }

    /// Read a value from the cache, given a maximum age of the cache entry.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`, or if the entry is older
    /// than `max_age` seconds.
    ///
    /// # Errors
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub async fn get_with_max_age<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
        max_age: i64,
    ) -> Result<Option<T>, CacheError> {
        let key = key.to_string();
        let inner = Arc::clone(&self.inner);

        tokio::task::spawn_blocking(move || {
            get_impl(&inner.base_path, &inner.namespace, &key, Some(max_age))
        })
        .await?
    }
}
+
/// A writable cache namespace (blocking interface).
pub struct BlockingWritableCacheNamespace {
    inner: WritableInner,
}

/// A writable cache namespace (async interface).
pub struct WritableCacheNamespace {
    // Arc so the state can be moved into `spawn_blocking` closures.
    inner: Arc<WritableInner>,
}

/// State shared by both writable guard flavors.
struct WritableInner {
    // Exclusive lock on the namespace, held for the guard's lifetime (RAII).
    _lock: File,
    /// Name of the locked namespace (validated before the guard was created).
    namespace: String,
    /// Base directory of the owning cache.
    base_path: PathBuf,
    /// Create options for the namespace directory (created lazily on first write).
    dir_options: CreateOptions,
    /// Create options for entry files.
    file_options: CreateOptions,
}
+
impl BlockingWritableCacheNamespace {
    /// Remove a cache entry.
    ///
    /// This returns `Ok(())` if the key does not exist.
    ///
    /// # Errors
    /// - The file could not be deleted due to insufficient privileges.
    pub fn remove(&self, key: &str) -> Result<(), CacheError> {
        remove_impl(&self.inner, key)
    }

    /// Set a cache entry.
    ///
    /// The entry's timestamp is set to the current time.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub fn set<T: Serialize + DeserializeOwned>(
        &self,
        key: &str,
        value: T,
    ) -> Result<(), CacheError> {
        set_impl(&self.inner, key, value, proxmox_time::epoch_i64())
    }

    /// Set a cache entry, but only if the timestamp is more recent than the already existing
    /// entry.
    ///
    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
    /// If the entry does not exist yet, the entry is always set.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub fn set_if_newer<T: Serialize + DeserializeOwned>(
        &self,
        key: &str,
        value: T,
    ) -> Result<Option<T>, CacheError> {
        set_if_newer_impl(&self.inner, key, value, proxmox_time::epoch_i64())
    }

    /// Set a cache entry with an explicitly provided timestamp.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub fn set_with_timestamp<T: Serialize + DeserializeOwned>(
        &self,
        key: &str,
        value: T,
        timestamp: i64,
    ) -> Result<(), CacheError> {
        set_impl(&self.inner, key, value, timestamp)
    }

    /// Set a cache entry with an explicitly provided timestamp, but only if the timestamp is more
    /// recent than the already existing entry.
    ///
    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
    /// If the entry does not exist yet, the entry is always set.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub fn set_if_newer_with_timestamp<T: Serialize + DeserializeOwned>(
        &self,
        key: &str,
        value: T,
        timestamp: i64,
    ) -> Result<Option<T>, CacheError> {
        set_if_newer_impl(&self.inner, key, value, timestamp)
    }

    /// Read a value from the cache.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`.
    ///
    /// # Errors
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> {
        get_impl(&self.inner.base_path, &self.inner.namespace, key, None)
    }

    /// Read a value from the cache, given a maximum age of the cache entry.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`, or if the entry is older
    /// than `max_age` seconds.
    ///
    /// # Errors
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub fn get_with_max_age<T: Serialize + DeserializeOwned>(
        &self,
        key: &str,
        max_age: i64,
    ) -> Result<Option<T>, CacheError> {
        get_impl(
            &self.inner.base_path,
            &self.inner.namespace,
            key,
            Some(max_age),
        )
    }
}
+
impl WritableCacheNamespace {
    /// Remove a cache entry.
    ///
    /// This returns `Ok(())` if the key does not exist.
    ///
    /// # Errors
    /// - The file could not be deleted due to insufficient privileges.
    pub async fn remove(&self, key: &str) -> Result<(), CacheError> {
        // Clone key and shared state so the blocking task owns its data.
        let inner = Arc::clone(&self.inner);
        let key = key.to_string();

        tokio::task::spawn_blocking(move || remove_impl(&inner, &key)).await?
    }

    /// Set a cache entry.
    ///
    /// The entry's timestamp is set to the current time.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub async fn set<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
        value: T,
    ) -> Result<(), CacheError> {
        let inner = Arc::clone(&self.inner);
        let key = key.to_string();

        tokio::task::spawn_blocking(move || {
            set_impl(&inner, &key, value, proxmox_time::epoch_i64())
        })
        .await?
    }

    /// Set a cache entry, but only if the timestamp is more recent than the already existing
    /// entry.
    ///
    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
    /// If the entry does not exist yet, the entry is always set.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub async fn set_if_newer<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
        value: T,
    ) -> Result<Option<T>, CacheError> {
        let inner = Arc::clone(&self.inner);
        let key = key.to_string();

        tokio::task::spawn_blocking(move || {
            set_if_newer_impl(&inner, &key, value, proxmox_time::epoch_i64())
        })
        .await?
    }

    /// Set a cache entry with an explicitly provided timestamp.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub async fn set_with_timestamp<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
        value: T,
        timestamp: i64,
    ) -> Result<(), CacheError> {
        let inner = Arc::clone(&self.inner);
        let key = key.to_string();
        tokio::task::spawn_blocking(move || set_impl(&inner, &key, value, timestamp)).await?
    }

    /// Set a cache entry with an explicitly provided timestamp, but only if the timestamp is more
    /// recent than the already existing entry.
    ///
    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
    /// If the entry does not exist yet, the entry is always set.
    ///
    /// # Errors
    /// - `value` could not be serialized
    /// - The namespace directory could not be created
    /// - The cache file could not be written to or atomically replaced
    pub async fn set_if_newer_with_timestamp<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
        value: T,
        timestamp: i64,
    ) -> Result<Option<T>, CacheError> {
        let inner = Arc::clone(&self.inner);
        let key = key.to_string();
        tokio::task::spawn_blocking(move || set_if_newer_impl(&inner, &key, value, timestamp))
            .await?
    }

    /// Read a value from the cache.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`.
    ///
    /// # Errors
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub async fn get<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
    ) -> Result<Option<T>, CacheError> {
        let key = key.to_string();
        let inner = Arc::clone(&self.inner);

        tokio::task::spawn_blocking(move || {
            get_impl(&inner.base_path, &inner.namespace, &key, None)
        })
        .await?
    }

    /// Read a value from the cache, given a maximum age of the cache entry.
    ///
    /// Returns `Ok(None)` if no entry exists for `key`, or if the entry is older
    /// than `max_age` seconds.
    ///
    /// # Errors
    /// - The file associated with this key could not be read
    /// - The file could not be deserialized (e.g. invalid format)
    pub async fn get_with_max_age<T: Serialize + DeserializeOwned + Send + 'static>(
        &self,
        key: &str,
        max_age: i64,
    ) -> Result<Option<T>, CacheError> {
        let key = key.to_string();
        let inner = Arc::clone(&self.inner);

        tokio::task::spawn_blocking(move || {
            get_impl(&inner.base_path, &inner.namespace, &key, Some(max_age))
        })
        .await?
    }
}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct CacheEntry<T> {
+ timestamp: i64,
+ value: T,
+}
+
+impl<T> CacheEntry<T> {
+ fn is_expired(&self, now: i64, max_age: i64) -> bool {
+ if max_age == 0 {
+ return true;
+ }
+
+ let diff = now - self.timestamp;
+ diff >= max_age || diff < 0
+ }
+}
+
+fn get_impl<T: Serialize + DeserializeOwned>(
+ base: &Path,
+ namespace: &str,
+ key: &str,
+ max_age: Option<i64>,
+) -> Result<Option<T>, CacheError> {
+ // Namespace should already be verified at this point, no point in checking it again.
+ ensure_valid_key(key)?;
+
+ let path = get_path(base, namespace, key);
+
+ Ok(get_from_path(&path, max_age)?.map(|a| a.value))
+}
+
+fn get_from_path<T: Serialize + DeserializeOwned>(
+ path: &Path,
+ max_age: Option<i64>,
+) -> Result<Option<CacheEntry<T>>, CacheError> {
+ let content = proxmox_sys::fs::file_read_optional_string(path)?;
+
+ if let Some(content) = content {
+ let val = serde_json::from_str::<CacheEntry<T>>(&content)?;
+
+ if let Some(max_age) = max_age {
+ if val.is_expired(proxmox_time::epoch_i64(), max_age) {
+ return Ok(None);
+ }
+ }
+ return Ok(Some(val));
+ }
+
+ Ok(None)
+}
+
/// Store `value` under `key` unless the existing entry carries a strictly newer timestamp.
///
/// Returns `Ok(Some(existing_value))` without writing if the entry on disk is newer than
/// `timestamp`; otherwise writes the new entry and returns `Ok(None)`. The
/// read-check-write sequence is not racy because the caller holds the exclusive
/// namespace lock for the lifetime of the writable guard.
fn set_if_newer_impl<T: Serialize + DeserializeOwned>(
    inner: &WritableInner,
    key: &str,
    value: T,
    timestamp: i64,
) -> Result<Option<T>, CacheError> {
    // The namespace was validated when the guard was created; only the key needs checking.
    ensure_valid_key(key)?;
    let path = get_path(&inner.base_path, &inner.namespace, key);

    // No max-age filter here: even a stale entry still has a timestamp to compare against.
    match get_from_path(&path, None) {
        Ok(Some(existing)) => {
            if existing.timestamp > timestamp {
                // The existing entry is strictly newer; keep it and hand it back.
                return Ok(Some(existing.value));
            }
        }
        Ok(None) => {}
        Err(CacheError::Serde(err)) => {
            // Special case, only log deserialization errors, in that case we want to override
            // the cache file anyways.
            log::error!("could not deserialize existing cache file in set_if_newer, overwriting anyways: {err}");
        }
        Err(err) => {
            // Any other error will be bubbled up
            return Err(err);
        }
    }

    // `path` always has a parent (base/namespace/key.json), so unwrap cannot panic.
    proxmox_sys::fs::create_path(
        path.parent().unwrap(),
        Some(inner.dir_options),
        Some(inner.dir_options),
    )?;

    let entry = CacheEntry { timestamp, value };

    let data = serde_json::to_vec(&entry)?;
    proxmox_sys::fs::replace_file(path, &data, inner.file_options, true)?;

    Ok(None)
}
+
+fn set_impl<T: Serialize + DeserializeOwned>(
+ inner: &WritableInner,
+ key: &str,
+ value: T,
+ timestamp: i64,
+) -> Result<(), CacheError> {
+ ensure_valid_key(key)?;
+ let path = get_path(&inner.base_path, &inner.namespace, key);
+
+ proxmox_sys::fs::create_path(
+ path.parent().unwrap(),
+ Some(inner.dir_options),
+ Some(inner.dir_options),
+ )?;
+
+ let entry = CacheEntry { timestamp, value };
+
+ let data = serde_json::to_vec(&entry)?;
+ proxmox_sys::fs::replace_file(path, &data, inner.file_options, true)?;
+
+ Ok(())
+}
+
+fn remove_impl(inner: &WritableInner, key: &str) -> Result<(), CacheError> {
+ ensure_valid_key(key)?;
+ let path = get_path(&inner.base_path, &inner.namespace, key);
+
+ if let Err(err) = std::fs::remove_file(path) {
+ if err.kind() == ErrorKind::NotFound {
+ return Ok(());
+ }
+
+ return Err(err.into());
+ }
+
+ Ok(())
+}
+
/// Build the path of the entry file for `key` inside `namespace`.
///
/// `.json` is always appended (instead of using `with_extension`), so a dot in
/// the key can never cause two distinct keys to map to the same file.
fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf {
    base.join(namespace).join(format!("{key}.json"))
}
+
/// Build the path of the lock file protecting `namespace`.
///
/// The lock file lives next to (not inside) the namespace directory; the
/// leading dot keeps it from colliding with namespace directory names.
fn get_lockfile(base: &Path, namespace: &str) -> PathBuf {
    base.join(format!(".{namespace}.lock"))
}
+
+/// Make sure that an identifier is safe to use as a cache key.
+fn ensure_valid_key(key: &str) -> Result<(), CacheError> {
+ if !SAFE_ID_REGEX.is_match(key) {
+ return Err(CacheError::InvalidKey(key.into()));
+ }
+
+ Ok(())
+}
+
+/// Make sure that an identifier is safe to use as a namespace.
+fn ensure_valid_namespace(namespace: &str) -> Result<(), CacheError> {
+ if !SAFE_ID_REGEX.is_match(namespace) {
+ return Err(CacheError::InvalidNamespace(namespace.into()));
+ }
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use tempfile::TempDir;
+
+ use super::*;
+
+ const TIMEOUT: Duration = Duration::from_secs(1);
+
+ fn make_cache() -> (TempDir, NamespacedCache) {
+ let dir = tempfile::tempdir().unwrap();
+
+ let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new());
+
+ (dir, cache)
+ }
+
+ #[test]
+ fn test_cache() {
+ let (_dir, cache) = make_cache();
+
+ let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+ write_guard.set("val1", 1).unwrap();
+ write_guard.set("val2", 1).unwrap();
+
+ assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1);
+
+ write_guard.remove("val1").unwrap();
+ assert!(write_guard.get::<String>("val1").unwrap().is_none());
+
+ drop(write_guard);
+
+ let read_guard = cache.read_blocking("remote-a", TIMEOUT).unwrap();
+
+ assert_eq!(read_guard.get::<i32>("val2").unwrap().unwrap(), 1);
+ }
+
+ #[test]
+ fn test_remove_nonexisting() {
+ let (_dir, cache) = make_cache();
+
+ let a = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+
+ // Deleting a key that does not exist is okay and should not error.
+ assert!(a.remove("val").is_ok());
+ }
+
+ #[test]
+ fn test_remove_failure() {
+ let (dir, cache) = make_cache();
+
+ let a = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+ // Triggering a general failure by generating a directory that conflicts with the cache key
+ std::fs::create_dir_all(dir.path().join("remote-a").join("val.json")).unwrap();
+ assert!(a.remove("val").is_err());
+ }
+
+ #[test]
+ fn test_get_with_max_age() {
+ let (_dir, cache) = make_cache();
+
+ let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+
+ let now = proxmox_time::epoch_i64();
+
+ write_guard
+ .set_with_timestamp("somekey", 1, now - 1000)
+ .unwrap();
+
+ assert!(write_guard
+ .get_with_max_age::<i32>("somekey", 999)
+ .unwrap()
+ .is_none());
+
+ drop(write_guard);
+ let read_guard = cache.read_blocking("remote-a", TIMEOUT).unwrap();
+ assert!(read_guard
+ .get_with_max_age::<i32>("somekey", 999)
+ .unwrap()
+ .is_none());
+ }
+
+ #[test]
+ fn test_set_if_newer_with_timestamp() {
+ let (_dir, cache) = make_cache();
+
+ let guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+
+ let now = proxmox_time::epoch_i64() + 200;
+
+ assert!(guard
+ .set_if_newer_with_timestamp("somekey", 1, now)
+ .unwrap()
+ .is_none());
+
+ assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 1);
+ assert!(guard
+ .set_if_newer_with_timestamp("somekey", 2, now + 1)
+ .unwrap()
+ .is_none());
+ assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 2);
+ assert!(matches!(
+ guard
+ .set_if_newer_with_timestamp("somekey", 3, now)
+ .unwrap(),
+ Some(2)
+ ));
+ // This should still contain the old value.
+ assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 2);
+
+ assert!(matches!(guard.set_if_newer("somekey", 3).unwrap(), Some(2)));
+ // This should still contain the old value.
+ assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 2);
+ }
+
+ #[test]
+ fn test_expiration() {
+ let entry = CacheEntry {
+ value: (),
+ timestamp: 1000,
+ };
+
+ assert!(!entry.is_expired(1000, 100));
+ assert!(!entry.is_expired(1099, 100));
+ assert!(entry.is_expired(1100, 100));
+ assert!(entry.is_expired(1101, 100));
+
+ // if max-age is 0, the entry is never fresh
+ assert!(entry.is_expired(1000, 0));
+ }
+
+ #[test]
+ fn test_invalid_namespaces() {
+ let (_dir, cache) = make_cache();
+
+ for id in [
+ "../remote-a",
+ "remote-a/../something",
+ "remote-a/../",
+ "../",
+ ] {
+ assert!(matches!(
+ cache.write_blocking(id, TIMEOUT),
+ Err(CacheError::InvalidNamespace(_))
+ ));
+ assert!(matches!(
+ cache.read_blocking(id, TIMEOUT),
+ Err(CacheError::InvalidNamespace(_))
+ ));
+ }
+ }
+
+ #[test]
+ fn test_invalid_keys() {
+ let (_dir, cache) = make_cache();
+
+ let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+ let read_guard = cache.write_blocking("remote-b", TIMEOUT).unwrap();
+
+ for id in ["../somekey", "somekey/../something", "somekey/../", "../"] {
+ assert!(matches!(
+ write_guard.set(id, ()),
+ Err(CacheError::InvalidKey(_))
+ ));
+ assert!(matches!(
+ write_guard.get::<()>(id),
+ Err(CacheError::InvalidKey(_))
+ ));
+ assert!(matches!(
+ write_guard.get_with_max_age::<()>(id, 1000),
+ Err(CacheError::InvalidKey(_))
+ ));
+ assert!(matches!(
+ write_guard.remove(id),
+ Err(CacheError::InvalidKey(_))
+ ));
+ assert!(matches!(
+ read_guard.get::<()>(id),
+ Err(CacheError::InvalidKey(_))
+ ));
+ assert!(matches!(
+ read_guard.get_with_max_age::<()>(id, 1000),
+ Err(CacheError::InvalidKey(_))
+ ));
+ }
+ }
+
+ #[tokio::test]
+ async fn test_async() {
+ let (_dir, cache) = make_cache();
+
+ let lock = cache.write("some-remote", TIMEOUT).await.unwrap();
+
+ lock.set("somekey", 1234).await.unwrap();
+ assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+ lock.remove("somekey").await.unwrap();
+ assert!(lock.get::<i32>("somekey").await.unwrap().is_none());
+
+ let now = proxmox_time::epoch_i64() - 1000;
+
+ lock.set_with_timestamp("somekey", 1234, now).await.unwrap();
+
+ assert!(lock
+ .get_with_max_age::<i32>("somekey", 900)
+ .await
+ .unwrap()
+ .is_none());
+
+ drop(lock);
+
+ let lock = cache.read("some-remote", TIMEOUT).await.unwrap();
+ assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+ assert!(lock
+ .get_with_max_age::<i32>("somekey", 900)
+ .await
+ .unwrap()
+ .is_none());
+ }
+
+ #[tokio::test]
+ async fn test_async_set_if_newer() {
+ let (_dir, cache) = make_cache();
+
+ let lock = cache.write("some-remote", TIMEOUT).await.unwrap();
+
+ let now = proxmox_time::epoch_i64() + 1000;
+ lock.set_with_timestamp("somekey", 1234, now).await.unwrap();
+
+ // This one should not set the entry, the existing timestamp is more recent
+ lock.set_if_newer_with_timestamp("somekey", 1235, now - 1)
+ .await
+ .unwrap();
+
+ assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+
+ // this should not change the entry
+ lock.set_if_newer("otherkey", 1235).await.unwrap();
+ assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+ }
+}
--
2.47.3
^ permalink raw reply related [flat|nested] 5+ messages in thread* [PATCH datacenter-manager v3 4/4] remote-updates: switch over to new api_cache
2026-05-15 14:49 [PATCH datacenter-manager v3 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner
` (2 preceding siblings ...)
2026-05-15 14:49 ` [PATCH datacenter-manager v3 3/4] api: resources: subscriptions: switch over to api_cache Lukas Wagner
@ 2026-05-15 14:49 ` Lukas Wagner
3 siblings, 0 replies; 5+ messages in thread
From: Lukas Wagner @ 2026-05-15 14:49 UTC (permalink / raw)
To: pdm-devel
Use the new, centralized API cache for caching remote update summaries.
Add some cleanup logic to remove the old cachefile, which can be removed
at some point in the future.
Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---
Notes:
Changes since the RFC:
- use new async cache interface
- clean up old cachefile automatically
Changes since v1:
- Make cache self-healing in case an entry could not be read or
deserialized. This ensures resiliency when the cached data type
changes unexpectedly. In this case, we just log an error and
return a default, empty summary.
- Avoid some .clone() calls
- When requesting the list of updates for a single remote, update the
cache asynchronously
Changes since v2 (thx @ Thomas)
- Avoid race condition when updating update cache
- Don't silently swallow updates to the update cache if
it does not exist yet
server/src/api/pve/mod.rs | 4 +-
server/src/api/remotes/updates.rs | 6 +-
server/src/remote_updates.rs | 123 +++++++++++++++++++-----------
3 files changed, 83 insertions(+), 50 deletions(-)
diff --git a/server/src/api/pve/mod.rs b/server/src/api/pve/mod.rs
index 20892f38..649ab624 100644
--- a/server/src/api/pve/mod.rs
+++ b/server/src/api/pve/mod.rs
@@ -588,8 +588,8 @@ pub async fn get_options(remote: String) -> Result<serde_json::Value, Error> {
},
)]
/// Return the cached update information about a remote.
-pub fn get_updates(remote: String) -> Result<RemoteUpdateSummary, Error> {
- let update_summary = get_available_updates_for_remote(&remote)?;
+pub async fn get_updates(remote: String) -> Result<RemoteUpdateSummary, Error> {
+ let update_summary = get_available_updates_for_remote(&remote).await?;
Ok(update_summary)
}
diff --git a/server/src/api/remotes/updates.rs b/server/src/api/remotes/updates.rs
index 365ffc19..ea46ba0d 100644
--- a/server/src/api/remotes/updates.rs
+++ b/server/src/api/remotes/updates.rs
@@ -42,7 +42,7 @@ const SUBDIRS: SubdirMap = &sorted!([
returns: { type: UpdateSummary }
)]
/// Return available update summary for managed remote nodes.
-pub fn update_summary(rpcenv: &mut dyn RpcEnvironment) -> Result<UpdateSummary, Error> {
+pub async fn update_summary(rpcenv: &mut dyn RpcEnvironment) -> Result<UpdateSummary, Error> {
let auth_id = rpcenv
.get_auth_id()
.context("no authid available")?
@@ -53,7 +53,7 @@ pub fn update_summary(rpcenv: &mut dyn RpcEnvironment) -> Result<UpdateSummary,
http_bail!(FORBIDDEN, "user has no access to resources");
}
- let mut update_summary = remote_updates::get_available_updates_summary()?;
+ let mut update_summary = remote_updates::get_available_updates_summary().await?;
update_summary.remotes.retain(|remote_name, _| {
user_info
@@ -136,7 +136,7 @@ async fn apt_update_available(remote: String, node: String) -> Result<Vec<APTUpd
let (config, _digest) = pdm_config::remotes::config()?;
let remote = get_remote(&config, &remote)?;
- let updates = remote_updates::list_available_updates(remote.clone(), &node).await?;
+ let updates = remote_updates::list_available_updates(remote.clone(), node).await?;
Ok(updates)
}
diff --git a/server/src/remote_updates.rs b/server/src/remote_updates.rs
index 7aaacc46..80264a57 100644
--- a/server/src/remote_updates.rs
+++ b/server/src/remote_updates.rs
@@ -1,6 +1,3 @@
-use std::fs::File;
-use std::io::ErrorKind;
-
use anyhow::{bail, Error};
use serde::{Deserialize, Serialize};
@@ -12,12 +9,13 @@ use pdm_api_types::remote_updates::{
};
use pdm_api_types::remotes::{Remote, RemoteType};
use pdm_api_types::RemoteUpid;
-use pdm_buildcfg::PDM_CACHE_DIR_M;
-use crate::connection;
use crate::parallel_fetcher::ParallelFetcher;
+use crate::{api_cache, connection};
-pub const UPDATE_CACHE: &str = concat!(PDM_CACHE_DIR_M!(), "/remote-updates.json");
+const OLD_CACHEFILE: &str = concat!(pdm_buildcfg::PDM_CACHE_DIR_M!(), "/remote-updates.json");
+
+const UPDATE_SUMMARY_CACHE_KEY: &str = "remote-updates";
#[derive(Clone, Default, Debug, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
@@ -28,14 +26,14 @@ struct NodeUpdateInfo {
repository_status: ProductRepositoryStatus,
}
-impl From<NodeUpdateInfo> for NodeUpdateSummary {
- fn from(value: NodeUpdateInfo) -> Self {
+impl From<&NodeUpdateInfo> for NodeUpdateSummary {
+ fn from(value: &NodeUpdateInfo) -> Self {
Self {
number_of_updates: value.updates.len() as u32,
last_refresh: value.last_refresh,
status: NodeUpdateStatus::Success,
status_message: None,
- versions: value.versions,
+ versions: value.versions.clone(),
repository_status: value.repository_status,
}
}
@@ -44,11 +42,20 @@ impl From<NodeUpdateInfo> for NodeUpdateSummary {
/// Return a list of available updates for a given remote node.
pub async fn list_available_updates(
remote: Remote,
- node: &str,
+ node: String,
) -> Result<Vec<APTUpdateInfo>, Error> {
- let updates = fetch_available_updates((), remote.clone(), node.to_string()).await?;
+ let updates = fetch_available_updates((), remote.clone(), node.clone()).await?;
- update_cached_summary_for_node(remote, node.into(), updates.clone().into()).await?;
+ let summary = (&updates).into();
+
+ // Update cache entry asynchronously, no need to wait for it.
+ tokio::task::spawn({
+ async move {
+ if let Err(err) = update_cached_summary_for_node(remote, node, summary).await {
+ log::error!("could not update 'remote-updates' API cache entry: {err}");
+ }
+ }
+ });
Ok(updates.updates)
}
@@ -106,10 +113,10 @@ pub async fn get_changelog(remote: &Remote, node: &str, package: String) -> Resu
}
/// Get update summary for all managed remotes.
-pub fn get_available_updates_summary() -> Result<UpdateSummary, Error> {
+pub async fn get_available_updates_summary() -> Result<UpdateSummary, Error> {
let (config, _digest) = pdm_config::remotes::config()?;
- let cache_content = get_cached_summary_or_default()?;
+ let cache_content = get_cached_summary_or_default().await?;
let mut summary = UpdateSummary::default();
@@ -137,11 +144,11 @@ pub fn get_available_updates_summary() -> Result<UpdateSummary, Error> {
}
/// Return cached update information from specific remote
-pub fn get_available_updates_for_remote(remote: &str) -> Result<RemoteUpdateSummary, Error> {
+pub async fn get_available_updates_for_remote(remote: &str) -> Result<RemoteUpdateSummary, Error> {
let (config, _digest) = pdm_config::remotes::config()?;
if let Some(remote) = config.get(remote) {
- let cache_content = get_cached_summary_or_default()?;
+ let cache_content = get_cached_summary_or_default().await?;
Ok(cache_content
.remotes
.get(&remote.id)
@@ -156,22 +163,24 @@ pub fn get_available_updates_for_remote(remote: &str) -> Result<RemoteUpdateSumm
}
}
-fn get_cached_summary_or_default() -> Result<UpdateSummary, Error> {
- match File::open(UPDATE_CACHE) {
- Ok(file) => {
- let content = match serde_json::from_reader(file) {
- Ok(cache_content) => cache_content,
- Err(err) => {
- log::error!("failed to deserialize remote update cache: {err:#}");
- Default::default()
- }
- };
+/// Read the cached summary from the API cache, or return a default, empty summary.
+///
+/// Note: This does not return an error if the cache entry could not be read (e.g. due to
+/// a deserialization error), but also returns the default, empty summary.
+/// This ensures that the cache self-heals if an entry got corrupted for some reason.
+async fn get_cached_summary_or_default() -> Result<UpdateSummary, Error> {
+ let guard = api_cache::read_global().await?;
- Ok(content)
- }
- Err(err) if err.kind() == ErrorKind::NotFound => Ok(Default::default()),
- Err(err) => Err(err.into()),
- }
+ let summary = guard
+ .get::<UpdateSummary>(UPDATE_SUMMARY_CACHE_KEY)
+ .await
+ .inspect_err(|err| {
+ log::error!("could not read 'remote-updates' entry from API cache: {err}")
+ })
+ .unwrap_or_default()
+ .unwrap_or_default();
+
+ Ok(summary)
}
async fn update_cached_summary_for_node(
@@ -179,8 +188,12 @@ async fn update_cached_summary_for_node(
node: String,
node_data: NodeUpdateSummary,
) -> Result<(), Error> {
- let mut file = File::open(UPDATE_CACHE)?;
- let mut cache_content: UpdateSummary = serde_json::from_reader(&mut file)?;
+ let cache = api_cache::write_global().await?;
+ let mut cache_content = cache
+ .get::<UpdateSummary>(UPDATE_SUMMARY_CACHE_KEY)
+ .await?
+ .unwrap_or_default();
+
let remote_entry =
cache_content
.remotes
@@ -192,14 +205,7 @@ async fn update_cached_summary_for_node(
});
remote_entry.nodes.insert(node, node_data);
-
- let options = proxmox_product_config::default_create_options();
- proxmox_sys::fs::replace_file(
- UPDATE_CACHE,
- &serde_json::to_vec(&cache_content)?,
- options,
- true,
- )?;
+ cache.set(UPDATE_SUMMARY_CACHE_KEY, cache_content).await?;
Ok(())
}
@@ -212,7 +218,15 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
.do_for_all_remote_nodes(remotes.clone().into_iter(), fetch_available_updates)
.await;
- let mut content = get_cached_summary_or_default()?;
+ let cache = api_cache::write_global().await?;
+ let mut content = cache
+ .get::<UpdateSummary>(UPDATE_SUMMARY_CACHE_KEY)
+ .await
+ .inspect_err(|err| {
+ log::error!("could not read 'remote-updates' entry from API cache: {err}")
+ })
+ .unwrap_or_default()
+ .unwrap_or_default();
// Clean out any remotes that might have been removed from the remote config in the meanwhile.
content
@@ -245,7 +259,7 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
match node_response.data() {
Ok(update_info) => {
- entry.nodes.insert(node_name, update_info.clone().into());
+ entry.nodes.insert(node_name, update_info.into());
}
Err(err) => {
// Could not fetch updates from node
@@ -275,8 +289,27 @@ pub async fn refresh_update_summary_cache(remotes: Vec<Remote>) -> Result<(), Er
}
}
- let options = proxmox_product_config::default_create_options();
- proxmox_sys::fs::replace_file(UPDATE_CACHE, &serde_json::to_vec(&content)?, options, true)?;
+ cache.set(UPDATE_SUMMARY_CACHE_KEY, content).await?;
+
+ cleanup_old_cachefile().await?;
+
+ Ok(())
+}
+
+// FIXME: We can remove this pretty soon.
+async fn cleanup_old_cachefile() -> Result<(), Error> {
+ tokio::task::spawn_blocking(|| {
+ if let Err(err) = std::fs::remove_file(OLD_CACHEFILE) {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ log::error!(
+ "could not clean up old remote update cache file {OLD_CACHEFILE}: {err}"
+ );
+ }
+ } else {
+ log::info!("removed obsolete remote update cachefile {OLD_CACHEFILE}")
+ }
+ })
+ .await?;
Ok(())
}
--
2.47.3
^ permalink raw reply related [flat|nested] 5+ messages in thread