public inbox for pdm-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Lukas Wagner <l.wagner@proxmox.com>
To: pdm-devel@lists.proxmox.com
Subject: [PATCH datacenter-manager v3 1/4] add persistent, generic, namespaced key-value cache implementation
Date: Fri, 15 May 2026 16:49:24 +0200	[thread overview]
Message-ID: <20260515144927.427114-2-l.wagner@proxmox.com> (raw)
In-Reply-To: <20260515144927.427114-1-l.wagner@proxmox.com>

Namespaces map to directories inside a base directory. Cache contents
are stored as a single JSON-file per key inside the namespace directory.

Value expiry is implemented via a max_age parameter when retrieving
values. Callers might have different requirements to the freshness of
entries, so this seemed a better fit than statically setting an expiry
time when *setting* the value.

The cache offers both a blocking as well as an async interface. Once we
move this implementation to a shared crate, we should probably use
feature flags to gate these.

Signed-off-by: Lukas Wagner <l.wagner@proxmox.com>
---

Notes:
    This module might be well suited to be moved to proxmox.git when it has
    stabilized enough.
    
    Changes since the RFC:
      - In NamespacedCache::new, use Into<PathBuf> instead of AsRef<Path>
        (thx @ Wolfgang)
      - Improve test coverage
      - Basic sanity checks for namespaces and keys, for instance
        prohibiting "../"
      - Fix path generation for cases where there is already a
        file extension in the key (before, it would have been replaced by
        .json)
      - add async interface and rename the old read/write calls to
        read_blocking and write_blocking.
        read/write now return a distinct wrapper type that wraps all
        blocking operations in a `spawn_blocking`
    
    Changes since v2:
      - Add set_if_newer{,with_timestamp} functions
        These ones will only set the new cache entry, the current time
        or the provided timestamp is higher than the timestamp
        stored in the cache.
        These functions return Ok(Some(existing_entry)) if
        the data already in the cache is more recent, so that
        the caller can directly use is instead for further processing
    
      - Verify keys and namespaces with a regex (SAFE_ID_REGEX)
      - Fixed some typos

 Cargo.toml                     |   1 +
 debian/control                 |   1 +
 server/Cargo.toml              |   4 +
 server/src/lib.rs              |   1 +
 server/src/namespaced_cache.rs | 923 +++++++++++++++++++++++++++++++++
 5 files changed, 930 insertions(+)
 create mode 100644 server/src/namespaced_cache.rs

diff --git a/Cargo.toml b/Cargo.toml
index 9806a4f0..5cf05b41 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -125,6 +125,7 @@ serde = { version = "1.0", features = ["derive"] }
 serde_cbor = "0.11.1"
 serde_json = "1.0"
 serde_plain = "1"
+tempfile = "3.15"
 thiserror = "1.0"
 tokio = "1.6"
 tracing = "0.1"
diff --git a/debian/control b/debian/control
index 61b9cb36..5955faa1 100644
--- a/debian/control
+++ b/debian/control
@@ -104,6 +104,7 @@ Build-Depends: debhelper-compat (= 13),
                librust-proxmox-syslog-api-1+default-dev,
                librust-proxmox-syslog-api-1+impl-dev,
                librust-proxmox-systemd-1+default-dev,
+               librust-tempfile-3+default-dev (>= 3.15.0),
                librust-proxmox-tfa-6+api-dev,
                librust-proxmox-tfa-6+api-types-dev,
                librust-proxmox-tfa-6+types-dev,
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 3f185bbc..663f21cb 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -29,6 +29,7 @@ openssl.workspace = true
 percent-encoding.workspace = true
 serde.workspace = true
 serde_json.workspace = true
+thiserror.workspace = true
 tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] }
 tracing.workspace = true
 url.workspace = true
@@ -87,6 +88,9 @@ pdm-search.workspace = true
 pve-api-types = { workspace = true, features = [ "client" ] }
 pbs-api-types.workspace = true
 
+[dev-dependencies]
+tempfile.workspace = true
+
 [lints.rust.unexpected_cfgs]
 level = "warn"
 check-cfg = ['cfg(remote_config, values("faked"))']
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 5ed10d69..0b7642ab 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -7,6 +7,7 @@ pub mod context;
 pub mod env;
 pub mod jobstate;
 pub mod metric_collection;
+pub mod namespaced_cache;
 pub mod parallel_fetcher;
 pub mod remote_cache;
 pub mod remote_tasks;
diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs
new file mode 100644
index 00000000..e8e79847
--- /dev/null
+++ b/server/src/namespaced_cache.rs
@@ -0,0 +1,923 @@
+//! Generic namespaced cache implementation with optional value expiry.
+//!
+//! ```
+//! use std::time::Duration;
+//!
+//! use proxmox_sys::fs::CreateOptions;
+//! use server::namespaced_cache::NamespacedCache;
+//!
+//! let dir = tempfile::tempdir().unwrap();
+//! let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new());
+//! let write_guard = cache.write_blocking("remote-a", Duration::from_secs(1)).unwrap();
+//! write_guard.set("val1", 1).unwrap();
+//! assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1);
+//!
+//! ```
+
+use std::fs::File;
+use std::io::ErrorKind;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::Duration;
+
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use tokio::task::JoinError;
+
+use proxmox_schema::api_types::SAFE_ID_REGEX;
+use proxmox_sys::fs::CreateOptions;
+
+/// Error type for [`NamespacedCache`].
+#[derive(thiserror::Error, Debug)]
+pub enum CacheError {
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+
+    #[error("serialization error: {0}")]
+    Serde(#[from] serde_json::Error),
+
+    #[error("error: {0}")]
+    Other(#[from] anyhow::Error),
+
+    #[error("invalid key: '{0}'")]
+    InvalidKey(String),
+
+    #[error("invalid namespace: '{0}'")]
+    InvalidNamespace(String),
+
+    #[error("join error: {0}")]
+    JoinError(#[from] JoinError),
+}
+
+/// A generic, namespaced cache with optional value expiration.
+pub struct NamespacedCache {
+    base_directory: PathBuf,
+    dir_options: CreateOptions,
+    file_options: CreateOptions,
+}
+
+impl NamespacedCache {
+    /// Create a new cache instance.
+    ///
+    /// Cache entries will be persisted in the provided `base_directory`.
+    /// `dir_options` are the [`CreateOptions`] used for the namespace directories, while `file_options`
+    /// are the ones for persisted cache entries.
+    pub fn new<P: Into<PathBuf>>(
+        base_directory: P,
+        dir_options: CreateOptions,
+        file_options: CreateOptions,
+    ) -> Self {
+        Self {
+            base_directory: base_directory.into(),
+            dir_options,
+            file_options,
+        }
+    }
+
+    /// Lock a namespace for writing (blocking interface).
+    ///
+    /// This should *not* be called from async code. Use [`NamespacedCache::write`] instead.
+    pub fn write_blocking(
+        &self,
+        namespace: &str,
+        timeout: Duration,
+    ) -> Result<BlockingWritableCacheNamespace, CacheError> {
+        ensure_valid_namespace(namespace)?;
+        let lock = self.lock_namespace_impl_blocking(namespace, timeout, true)?;
+
+        Ok(BlockingWritableCacheNamespace {
+            inner: WritableInner {
+                _lock: lock,
+                namespace: namespace.to_string(),
+                base_path: self.base_directory.clone(),
+                dir_options: self.dir_options,
+                file_options: self.file_options,
+            },
+        })
+    }
+
+    /// Lock a namespace for writing (async interface).
+    pub async fn write(
+        &self,
+        namespace: &str,
+        timeout: Duration,
+    ) -> Result<WritableCacheNamespace, CacheError> {
+        ensure_valid_namespace(namespace)?;
+        let lock = self.lock_namespace_impl(namespace, timeout, true).await?;
+
+        Ok(WritableCacheNamespace {
+            inner: Arc::new(WritableInner {
+                _lock: lock,
+                namespace: namespace.to_string(),
+                base_path: self.base_directory.clone(),
+                dir_options: self.dir_options,
+                file_options: self.file_options,
+            }),
+        })
+    }
+
+    /// Lock a namespace for reading (blocking interface).
+    ///
+    /// This should *not* be called from async code. Use [`NamespacedCache::read`] instead.
+    pub fn read_blocking(
+        &self,
+        namespace: &str,
+        timeout: Duration,
+    ) -> Result<BlockingReadableCacheNamespace, CacheError> {
+        ensure_valid_namespace(namespace)?;
+        let lock = self.lock_namespace_impl_blocking(namespace, timeout, false)?;
+
+        Ok(BlockingReadableCacheNamespace {
+            inner: ReadableInner {
+                _lock: lock,
+                namespace: namespace.to_string(),
+                base_path: self.base_directory.clone(),
+            },
+        })
+    }
+
+    /// Lock a namespace for reading (async interface).
+    pub async fn read(
+        &self,
+        namespace: &str,
+        timeout: Duration,
+    ) -> Result<ReadableCacheNamespace, CacheError> {
+        ensure_valid_namespace(namespace)?;
+        let lock = self.lock_namespace_impl(namespace, timeout, false).await?;
+
+        Ok(ReadableCacheNamespace {
+            inner: Arc::new(ReadableInner {
+                _lock: lock,
+                namespace: namespace.to_string(),
+                base_path: self.base_directory.clone(),
+            }),
+        })
+    }
+
+    async fn lock_namespace_impl(
+        &self,
+        namespace: &str,
+        timeout: Duration,
+        exclusive: bool,
+    ) -> Result<File, CacheError> {
+        let path = get_lockfile(&self.base_directory, namespace);
+        let file_options = self.file_options;
+        let lock = tokio::task::spawn_blocking(move || {
+            proxmox_sys::fs::open_file_locked(&path, timeout, exclusive, file_options)
+        })
+        .await??;
+
+        Ok(lock)
+    }
+
+    fn lock_namespace_impl_blocking(
+        &self,
+        namespace: &str,
+        timeout: Duration,
+        exclusive: bool,
+    ) -> Result<File, CacheError> {
+        let path = get_lockfile(&self.base_directory, namespace);
+        let lock = proxmox_sys::fs::open_file_locked(&path, timeout, exclusive, self.file_options)?;
+
+        Ok(lock)
+    }
+}
+
+/// A readable cache namespace (blocking interface).
+pub struct BlockingReadableCacheNamespace {
+    inner: ReadableInner,
+}
+
+/// A readable cache namespace (async interface).
+pub struct ReadableCacheNamespace {
+    inner: Arc<ReadableInner>,
+}
+
+struct ReadableInner {
+    _lock: File,
+    namespace: String,
+    base_path: PathBuf,
+}
+
+impl BlockingReadableCacheNamespace {
+    /// Read a value from the cache.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> {
+        get_impl(&self.inner.base_path, &self.inner.namespace, key, None)
+    }
+
+    /// Read a value from the cache, given a maximum age of the cache entry.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub fn get_with_max_age<T: Serialize + DeserializeOwned>(
+        &self,
+        key: &str,
+        max_age: i64,
+    ) -> Result<Option<T>, CacheError> {
+        get_impl(
+            &self.inner.base_path,
+            &self.inner.namespace,
+            key,
+            Some(max_age),
+        )
+    }
+}
+
+impl ReadableCacheNamespace {
+    /// Read a value from the cache.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub async fn get<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+    ) -> Result<Option<T>, CacheError> {
+        let key = key.to_string();
+        let inner = Arc::clone(&self.inner);
+
+        tokio::task::spawn_blocking(move || {
+            get_impl(&inner.base_path, &inner.namespace, &key, None)
+        })
+        .await?
+    }
+
+    /// Read a value from the cache, given a maximum age of the cache entry.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub async fn get_with_max_age<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+        max_age: i64,
+    ) -> Result<Option<T>, CacheError> {
+        let key = key.to_string();
+        let inner = Arc::clone(&self.inner);
+
+        tokio::task::spawn_blocking(move || {
+            get_impl(&inner.base_path, &inner.namespace, &key, Some(max_age))
+        })
+        .await?
+    }
+}
+
+/// A writable cache namespace (blocking interface).
+pub struct BlockingWritableCacheNamespace {
+    inner: WritableInner,
+}
+
+/// A writable cache namespace (async interface).
+pub struct WritableCacheNamespace {
+    inner: Arc<WritableInner>,
+}
+
+struct WritableInner {
+    _lock: File,
+    namespace: String,
+    base_path: PathBuf,
+    dir_options: CreateOptions,
+    file_options: CreateOptions,
+}
+
+impl BlockingWritableCacheNamespace {
+    /// Remove a cache entry.
+    ///
+    /// This returns `Ok(())` if the key does not exist.
+    ///
+    /// # Errors:
+    ///   - The file could not be deleted due to insufficient privileges.
+    pub fn remove(&self, key: &str) -> Result<(), CacheError> {
+        remove_impl(&self.inner, key)
+    }
+
+    /// Set a cache entry.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub fn set<T: Serialize + DeserializeOwned>(
+        &self,
+        key: &str,
+        value: T,
+    ) -> Result<(), CacheError> {
+        set_impl(&self.inner, key, value, proxmox_time::epoch_i64())
+    }
+
+    /// Set a cache entry, but only if the timestamp is more recent than the already existing
+    /// entry.
+    ///
+    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
+    /// If the entry does not exist yet, the entry is always set.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub fn set_if_newer<T: Serialize + DeserializeOwned>(
+        &self,
+        key: &str,
+        value: T,
+    ) -> Result<Option<T>, CacheError> {
+        set_if_newer_impl(&self.inner, key, value, proxmox_time::epoch_i64())
+    }
+
+    /// Set a cache entry with an explicitly provided timestamp.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub fn set_with_timestamp<T: Serialize + DeserializeOwned>(
+        &self,
+        key: &str,
+        value: T,
+        timestamp: i64,
+    ) -> Result<(), CacheError> {
+        set_impl(&self.inner, key, value, timestamp)
+    }
+
+    /// Set a cache entry with an explicitly provided timestamp, but only if the timestamp is more
+    /// recent than the already existing entry.
+    ///
+    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
+    /// If the entry does not exist yet, the entry is always set.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub fn set_if_newer_with_timestamp<T: Serialize + DeserializeOwned>(
+        &self,
+        key: &str,
+        value: T,
+        timestamp: i64,
+    ) -> Result<Option<T>, CacheError> {
+        set_if_newer_impl(&self.inner, key, value, timestamp)
+    }
+
+    /// Read a value from the cache.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub fn get<T: Serialize + DeserializeOwned>(&self, key: &str) -> Result<Option<T>, CacheError> {
+        get_impl(&self.inner.base_path, &self.inner.namespace, key, None)
+    }
+
+    /// Read a value from the cache, given a maximum age of the cache entry.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub fn get_with_max_age<T: Serialize + DeserializeOwned>(
+        &self,
+        key: &str,
+        max_age: i64,
+    ) -> Result<Option<T>, CacheError> {
+        get_impl(
+            &self.inner.base_path,
+            &self.inner.namespace,
+            key,
+            Some(max_age),
+        )
+    }
+}
+
+impl WritableCacheNamespace {
+    /// Remove a cache entry.
+    ///
+    /// This returns `Ok(())` if the key does not exist.
+    ///
+    /// # Errors:
+    ///   - The file could not be deleted due to insufficient privileges.
+    pub async fn remove(&self, key: &str) -> Result<(), CacheError> {
+        let inner = Arc::clone(&self.inner);
+        let key = key.to_string();
+
+        tokio::task::spawn_blocking(move || remove_impl(&inner, &key)).await?
+    }
+
+    /// Set a cache entry.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub async fn set<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+        value: T,
+    ) -> Result<(), CacheError> {
+        let inner = Arc::clone(&self.inner);
+        let key = key.to_string();
+
+        tokio::task::spawn_blocking(move || {
+            set_impl(&inner, &key, value, proxmox_time::epoch_i64())
+        })
+        .await?
+    }
+
+    /// Set a cache entry, but only if the timestamp is more recent than the already existing
+    /// entry.
+    ///
+    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
+    /// If the entry does not exist yet, the entry is always set.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub async fn set_if_newer<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+        value: T,
+    ) -> Result<Option<T>, CacheError> {
+        let inner = Arc::clone(&self.inner);
+        let key = key.to_string();
+
+        tokio::task::spawn_blocking(move || {
+            set_if_newer_impl(&inner, &key, value, proxmox_time::epoch_i64())
+        })
+        .await?
+    }
+
+    /// Set a cache entry with an explicitly provided timestamp.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub async fn set_with_timestamp<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+        value: T,
+        timestamp: i64,
+    ) -> Result<(), CacheError> {
+        let inner = Arc::clone(&self.inner);
+        let key = key.to_string();
+        tokio::task::spawn_blocking(move || set_impl(&inner, &key, value, timestamp)).await?
+    }
+
+    /// Set a cache entry with an explicitly provided timestamp, but only if the timestamp is more
+    /// recent than the already existing entry.
+    ///
+    /// If the existing entry is newer, it will be returned as `Ok(Some(existing_entry))`.
+    /// If the entry does not exist yet, the entry is always set.
+    ///
+    /// # Errors
+    ///   - `value` could not be serialized
+    ///   - The namespace directory could not be created
+    ///   - The cache file could not be written to or atomically replaced
+    pub async fn set_if_newer_with_timestamp<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+        value: T,
+        timestamp: i64,
+    ) -> Result<Option<T>, CacheError> {
+        let inner = Arc::clone(&self.inner);
+        let key = key.to_string();
+        tokio::task::spawn_blocking(move || set_if_newer_impl(&inner, &key, value, timestamp))
+            .await?
+    }
+
+    /// Read a value from the cache.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub async fn get<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+    ) -> Result<Option<T>, CacheError> {
+        let key = key.to_string();
+        let inner = Arc::clone(&self.inner);
+
+        tokio::task::spawn_blocking(move || {
+            get_impl(&inner.base_path, &inner.namespace, &key, None)
+        })
+        .await?
+    }
+
+    /// Read a value from the cache, given a maximum age of the cache entry.
+    ///
+    /// # Errors:
+    ///   - The file associated with this key could not be read
+    ///   - The file could not be deserialized (e.g. invalid format)
+    pub async fn get_with_max_age<T: Serialize + DeserializeOwned + Send + 'static>(
+        &self,
+        key: &str,
+        max_age: i64,
+    ) -> Result<Option<T>, CacheError> {
+        let key = key.to_string();
+        let inner = Arc::clone(&self.inner);
+
+        tokio::task::spawn_blocking(move || {
+            get_impl(&inner.base_path, &inner.namespace, &key, Some(max_age))
+        })
+        .await?
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct CacheEntry<T> {
+    timestamp: i64,
+    value: T,
+}
+
+impl<T> CacheEntry<T> {
+    fn is_expired(&self, now: i64, max_age: i64) -> bool {
+        if max_age == 0 {
+            return true;
+        }
+
+        let diff = now - self.timestamp;
+        diff >= max_age || diff < 0
+    }
+}
+
+fn get_impl<T: Serialize + DeserializeOwned>(
+    base: &Path,
+    namespace: &str,
+    key: &str,
+    max_age: Option<i64>,
+) -> Result<Option<T>, CacheError> {
+    // Namespace should already be verified at this point, no point in checking it again.
+    ensure_valid_key(key)?;
+
+    let path = get_path(base, namespace, key);
+
+    Ok(get_from_path(&path, max_age)?.map(|a| a.value))
+}
+
+fn get_from_path<T: Serialize + DeserializeOwned>(
+    path: &Path,
+    max_age: Option<i64>,
+) -> Result<Option<CacheEntry<T>>, CacheError> {
+    let content = proxmox_sys::fs::file_read_optional_string(path)?;
+
+    if let Some(content) = content {
+        let val = serde_json::from_str::<CacheEntry<T>>(&content)?;
+
+        if let Some(max_age) = max_age {
+            if val.is_expired(proxmox_time::epoch_i64(), max_age) {
+                return Ok(None);
+            }
+        }
+        return Ok(Some(val));
+    }
+
+    Ok(None)
+}
+
+fn set_if_newer_impl<T: Serialize + DeserializeOwned>(
+    inner: &WritableInner,
+    key: &str,
+    value: T,
+    timestamp: i64,
+) -> Result<Option<T>, CacheError> {
+    ensure_valid_key(key)?;
+    let path = get_path(&inner.base_path, &inner.namespace, key);
+
+    match get_from_path(&path, None) {
+        Ok(Some(existing)) => {
+            if existing.timestamp > timestamp {
+                return Ok(Some(existing.value));
+            }
+        }
+        Ok(None) => {}
+        Err(CacheError::Serde(err)) => {
+            // Special case, only log deserialization errors, in that case we want to override
+            // the cache file anyways.
+            log::error!("could not deserialize existing cache file in set_if_newer, overwriting anyways: {err}");
+        }
+        Err(err) => {
+            // Any other error will be bubbled up
+            return Err(err);
+        }
+    }
+
+    proxmox_sys::fs::create_path(
+        path.parent().unwrap(),
+        Some(inner.dir_options),
+        Some(inner.dir_options),
+    )?;
+
+    let entry = CacheEntry { timestamp, value };
+
+    let data = serde_json::to_vec(&entry)?;
+    proxmox_sys::fs::replace_file(path, &data, inner.file_options, true)?;
+
+    Ok(None)
+}
+
+fn set_impl<T: Serialize + DeserializeOwned>(
+    inner: &WritableInner,
+    key: &str,
+    value: T,
+    timestamp: i64,
+) -> Result<(), CacheError> {
+    ensure_valid_key(key)?;
+    let path = get_path(&inner.base_path, &inner.namespace, key);
+
+    proxmox_sys::fs::create_path(
+        path.parent().unwrap(),
+        Some(inner.dir_options),
+        Some(inner.dir_options),
+    )?;
+
+    let entry = CacheEntry { timestamp, value };
+
+    let data = serde_json::to_vec(&entry)?;
+    proxmox_sys::fs::replace_file(path, &data, inner.file_options, true)?;
+
+    Ok(())
+}
+
+fn remove_impl(inner: &WritableInner, key: &str) -> Result<(), CacheError> {
+    ensure_valid_key(key)?;
+    let path = get_path(&inner.base_path, &inner.namespace, key);
+
+    if let Err(err) = std::fs::remove_file(path) {
+        if err.kind() == ErrorKind::NotFound {
+            return Ok(());
+        }
+
+        return Err(err.into());
+    }
+
+    Ok(())
+}
+
+fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf {
+    let path = base.join(namespace).join(format!("{key}.json"));
+    path
+}
+
+fn get_lockfile(base: &Path, namespace: &str) -> PathBuf {
+    let path = base.join(format!(".{namespace}.lock"));
+    path
+}
+
+/// Make sure that an identifier is safe to use as a cache key.
+fn ensure_valid_key(key: &str) -> Result<(), CacheError> {
+    if !SAFE_ID_REGEX.is_match(key) {
+        return Err(CacheError::InvalidKey(key.into()));
+    }
+
+    Ok(())
+}
+
+/// Make sure that an identifier is safe to use as a namespace.
+fn ensure_valid_namespace(namespace: &str) -> Result<(), CacheError> {
+    if !SAFE_ID_REGEX.is_match(namespace) {
+        return Err(CacheError::InvalidNamespace(namespace.into()));
+    }
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use tempfile::TempDir;
+
+    use super::*;
+
+    const TIMEOUT: Duration = Duration::from_secs(1);
+
+    fn make_cache() -> (TempDir, NamespacedCache) {
+        let dir = tempfile::tempdir().unwrap();
+
+        let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new());
+
+        (dir, cache)
+    }
+
+    #[test]
+    fn test_cache() {
+        let (_dir, cache) = make_cache();
+
+        let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+        write_guard.set("val1", 1).unwrap();
+        write_guard.set("val2", 1).unwrap();
+
+        assert_eq!(write_guard.get::<i32>("val1").unwrap().unwrap(), 1);
+
+        write_guard.remove("val1").unwrap();
+        assert!(write_guard.get::<String>("val1").unwrap().is_none());
+
+        drop(write_guard);
+
+        let read_guard = cache.read_blocking("remote-a", TIMEOUT).unwrap();
+
+        assert_eq!(read_guard.get::<i32>("val2").unwrap().unwrap(), 1);
+    }
+
+    #[test]
+    fn test_remove_nonexisting() {
+        let (_dir, cache) = make_cache();
+
+        let a = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+
+        // Deleting a key that does not exist is okay and should not error.
+        assert!(a.remove("val").is_ok());
+    }
+
+    #[test]
+    fn test_remove_failure() {
+        let (dir, cache) = make_cache();
+
+        let a = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+        // Triggering a general failure by generating a directory that conflicts with the cache key
+        std::fs::create_dir_all(dir.path().join("remote-a").join("val.json")).unwrap();
+        assert!(a.remove("val").is_err());
+    }
+
+    #[test]
+    fn test_get_with_max_age() {
+        let (_dir, cache) = make_cache();
+
+        let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+
+        let now = proxmox_time::epoch_i64();
+
+        write_guard
+            .set_with_timestamp("somekey", 1, now - 1000)
+            .unwrap();
+
+        assert!(write_guard
+            .get_with_max_age::<i32>("somekey", 999)
+            .unwrap()
+            .is_none());
+
+        drop(write_guard);
+        let read_guard = cache.read_blocking("remote-a", TIMEOUT).unwrap();
+        assert!(read_guard
+            .get_with_max_age::<i32>("somekey", 999)
+            .unwrap()
+            .is_none());
+    }
+
+    #[test]
+    fn test_set_if_newer_with_timestamp() {
+        let (_dir, cache) = make_cache();
+
+        let guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+
+        let now = proxmox_time::epoch_i64() + 200;
+
+        assert!(guard
+            .set_if_newer_with_timestamp("somekey", 1, now)
+            .unwrap()
+            .is_none());
+
+        assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 1);
+        assert!(guard
+            .set_if_newer_with_timestamp("somekey", 2, now + 1)
+            .unwrap()
+            .is_none());
+        assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 2);
+        assert!(matches!(
+            guard
+                .set_if_newer_with_timestamp("somekey", 3, now)
+                .unwrap(),
+            Some(2)
+        ));
+        // This should still contain the old value.
+        assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 2);
+
+        assert!(matches!(guard.set_if_newer("somekey", 3).unwrap(), Some(2)));
+        // This should still contain the old value.
+        assert_eq!(guard.get::<i32>("somekey").unwrap().unwrap(), 2);
+    }
+
+    #[test]
+    fn test_expiration() {
+        let entry = CacheEntry {
+            value: (),
+            timestamp: 1000,
+        };
+
+        assert!(!entry.is_expired(1000, 100));
+        assert!(!entry.is_expired(1099, 100));
+        assert!(entry.is_expired(1100, 100));
+        assert!(entry.is_expired(1101, 100));
+
+        // if max-age is 0, the entry is never fresh
+        assert!(entry.is_expired(1000, 0));
+    }
+
+    #[test]
+    fn test_invalid_namespaces() {
+        let (_dir, cache) = make_cache();
+
+        for id in [
+            "../remote-a",
+            "remote-a/../something",
+            "remote-a/../",
+            "../",
+        ] {
+            assert!(matches!(
+                cache.write_blocking(id, TIMEOUT),
+                Err(CacheError::InvalidNamespace(_))
+            ));
+            assert!(matches!(
+                cache.read_blocking(id, TIMEOUT),
+                Err(CacheError::InvalidNamespace(_))
+            ));
+        }
+    }
+
+    #[test]
+    fn test_invalid_keys() {
+        let (_dir, cache) = make_cache();
+
+        let write_guard = cache.write_blocking("remote-a", TIMEOUT).unwrap();
+        let read_guard = cache.write_blocking("remote-b", TIMEOUT).unwrap();
+
+        for id in ["../somekey", "somekey/../something", "somekey/../", "../"] {
+            assert!(matches!(
+                write_guard.set(id, ()),
+                Err(CacheError::InvalidKey(_))
+            ));
+            assert!(matches!(
+                write_guard.get::<()>(id),
+                Err(CacheError::InvalidKey(_))
+            ));
+            assert!(matches!(
+                write_guard.get_with_max_age::<()>(id, 1000),
+                Err(CacheError::InvalidKey(_))
+            ));
+            assert!(matches!(
+                write_guard.remove(id),
+                Err(CacheError::InvalidKey(_))
+            ));
+            assert!(matches!(
+                read_guard.get::<()>(id),
+                Err(CacheError::InvalidKey(_))
+            ));
+            assert!(matches!(
+                read_guard.get_with_max_age::<()>(id, 1000),
+                Err(CacheError::InvalidKey(_))
+            ));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_async() {
+        let (_dir, cache) = make_cache();
+
+        let lock = cache.write("some-remote", TIMEOUT).await.unwrap();
+
+        lock.set("somekey", 1234).await.unwrap();
+        assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+        lock.remove("somekey").await.unwrap();
+        assert!(lock.get::<i32>("somekey").await.unwrap().is_none());
+
+        let now = proxmox_time::epoch_i64() - 1000;
+
+        lock.set_with_timestamp("somekey", 1234, now).await.unwrap();
+
+        assert!(lock
+            .get_with_max_age::<i32>("somekey", 900)
+            .await
+            .unwrap()
+            .is_none());
+
+        drop(lock);
+
+        let lock = cache.read("some-remote", TIMEOUT).await.unwrap();
+        assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+        assert!(lock
+            .get_with_max_age::<i32>("somekey", 900)
+            .await
+            .unwrap()
+            .is_none());
+    }
+
+    #[tokio::test]
+    async fn test_async_set_if_newer() {
+        let (_dir, cache) = make_cache();
+
+        let lock = cache.write("some-remote", TIMEOUT).await.unwrap();
+
+        let now = proxmox_time::epoch_i64() + 1000;
+        lock.set_with_timestamp("somekey", 1234, now).await.unwrap();
+
+        // This one should not set the entry, the existing timestamp is more recent
+        lock.set_if_newer_with_timestamp("somekey", 1235, now - 1)
+            .await
+            .unwrap();
+
+        assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+
+        // this should not change the entry
+        lock.set_if_newer("otherkey", 1235).await.unwrap();
+        assert_eq!(lock.get::<i32>("somekey").await.unwrap(), Some(1234));
+    }
+}
-- 
2.47.3





  reply	other threads:[~2026-05-15 14:49 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-15 14:49 [PATCH datacenter-manager v3 0/4] add generic, per-remote (and global) cache for remote API responses Lukas Wagner
2026-05-15 14:49 ` Lukas Wagner [this message]
2026-05-15 14:49 ` [PATCH datacenter-manager v3 2/4] add api_cache as a specialized wrapper around the namespaced cache Lukas Wagner
2026-05-15 14:49 ` [PATCH datacenter-manager v3 3/4] api: resources: subscriptions: switch over to api_cache Lukas Wagner
2026-05-15 14:49 ` [PATCH datacenter-manager v3 4/4] remote-updates: switch over to new api_cache Lukas Wagner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260515144927.427114-2-l.wagner@proxmox.com \
    --to=l.wagner@proxmox.com \
    --cc=pdm-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal