From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 7010F1FF14F for ; Fri, 08 May 2026 17:03:46 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B71711C1BB; Fri, 8 May 2026 17:03:43 +0200 (CEST) From: Lukas Wagner To: pdm-devel@lists.proxmox.com Subject: [PATCH datacenter-manager 1/4] add persistent, generic, namespaced key-value cache implementation Date: Fri, 8 May 2026 17:03:27 +0200 Message-ID: <20260508150330.363622-2-l.wagner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260508150330.363622-1-l.wagner@proxmox.com> References: <20260508150330.363622-1-l.wagner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1778252506964 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.054 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs] Message-ID-Hash: PY5DNGIXVOBQT5FFKOUQJGIH5522XNTS X-Message-ID-Hash: PY5DNGIXVOBQT5FFKOUQJGIH5522XNTS X-MailFrom: l.wagner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Datacenter Manager development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Namespaces map to directories inside a base directory. Cache contents are stored as a single JSON file per key inside the namespace directory. Value expiry is implemented via a max_age parameter when retrieving values. Callers might have different requirements regarding the freshness of entries, so this seemed a better fit than statically setting an expiry time when *setting* the value. The current implementation is pretty naive about namespace names and key names; these must come from verified, path-safe identifiers, as they are used directly as components of the path of the resulting directories and JSON files. This should be fixed before using this implementation elsewhere. Signed-off-by: Lukas Wagner --- Notes: This module might be well suited to be moved to proxmox.git when it has stabilized enough. Kept here for now to ease development. 
Cargo.toml | 1 + server/Cargo.toml | 2 + server/src/lib.rs | 1 + server/src/namespaced_cache.rs | 324 +++++++++++++++++++++++++++++++++ 4 files changed, 328 insertions(+) create mode 100644 server/src/namespaced_cache.rs diff --git a/Cargo.toml b/Cargo.toml index 9806a4f0..5cf05b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,6 +125,7 @@ serde = { version = "1.0", features = ["derive"] } serde_cbor = "0.11.1" serde_json = "1.0" serde_plain = "1" +tempfile = "3.15" thiserror = "1.0" tokio = "1.6" tracing = "0.1" diff --git a/server/Cargo.toml b/server/Cargo.toml index 3f185bbc..6d6dd583 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -29,6 +29,8 @@ openssl.workspace = true percent-encoding.workspace = true serde.workspace = true serde_json.workspace = true +tempfile.workspace = true +thiserror.workspace = true tokio = { workspace = true, features = [ "fs", "io-util", "io-std", "macros", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "time" ] } tracing.workspace = true url.workspace = true diff --git a/server/src/lib.rs b/server/src/lib.rs index 5ed10d69..0b7642ab 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,6 +7,7 @@ pub mod context; pub mod env; pub mod jobstate; pub mod metric_collection; +pub mod namespaced_cache; pub mod parallel_fetcher; pub mod remote_cache; pub mod remote_tasks; diff --git a/server/src/namespaced_cache.rs b/server/src/namespaced_cache.rs new file mode 100644 index 00000000..89ed5b7f --- /dev/null +++ b/server/src/namespaced_cache.rs @@ -0,0 +1,324 @@ +//! Generic namespaced cache implementation with optional value expiry. +//! +//! NOTE: +//! The current implementation is pretty naive about namespace names and key +//! names; these must come from verified, path-safe identifiers, as they are +//! used directly as components of the path of the resulting directories and +//! JSON files. This should be fixed before using this implementation +//! elsewhere. 
+ +use std::fs::File; +use std::io::ErrorKind; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use proxmox_sys::fs::CreateOptions; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +#[derive(thiserror::Error, Debug)] +pub enum CacheError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("error: {0}")] + Other(#[from] anyhow::Error), +} + +#[derive(Serialize, Deserialize, Debug)] +struct CacheEntry { + timestamp: i64, + value: T, +} + +impl CacheEntry { + fn is_expired(&self, now: i64, max_age: i64) -> bool { + if max_age == 0 { + return true; + } + + let diff = now - self.timestamp; + diff >= max_age || diff < 0 + } +} + +pub struct NamespacedCache { + base_path: PathBuf, + file_options: CreateOptions, + dir_options: CreateOptions, +} + +impl NamespacedCache { + /// Create a new cache instance. + pub fn new>( + base_path: P, + dir_options: CreateOptions, + file_options: CreateOptions, + ) -> Self { + Self { + base_path: base_path.as_ref().into(), + dir_options, + file_options, + } + } + + /// Lock a namespace for writing. + pub fn write( + &self, + namespace: &str, + timeout: Duration, + ) -> Result { + let path = get_lockfile(&self.base_path, namespace); + + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, true, self.file_options)?; + + Ok(WritableCacheNamespace { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_path.clone(), + dir_options: self.dir_options, + file_options: self.file_options, + }) + } + + /// Lock a namespace for reading. 
+ pub fn read( + &self, + namespace: &str, + timeout: Duration, + ) -> Result { + let path = get_lockfile(&self.base_path, namespace); + + let lock = proxmox_sys::fs::open_file_locked(&path, timeout, false, self.file_options)?; + + Ok(ReadableCacheNamespace { + _lock: lock, + namespace: namespace.to_string(), + base_path: self.base_path.clone(), + }) + } +} + +/// A readable cache namespace. +pub struct ReadableCacheNamespace { + _lock: File, + namespace: String, + base_path: PathBuf, +} + +impl ReadableCacheNamespace { + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get(&self, key: &str) -> Result, CacheError> { + get_impl(&self.base_path, &self.namespace, key, None) + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get_with_max_age( + &self, + key: &str, + max_age: i64, + ) -> Result, CacheError> { + get_impl(&self.base_path, &self.namespace, key, Some(max_age)) + } +} + +/// A writable cache namespace. +pub struct WritableCacheNamespace { + _lock: File, + namespace: String, + base_path: PathBuf, + dir_options: CreateOptions, + file_options: CreateOptions, +} + +impl WritableCacheNamespace { + /// Remote a cache entry. + /// + /// This returns `Ok(())` if the key does not exist. + /// + /// # Errors: + /// - The file could not be deleted due to insufficient privileges. + pub fn remove(&self, key: &str) -> Result<(), CacheError> { + let path = get_path(&self.base_path, &self.namespace, key); + + if let Err(err) = std::fs::remove_file(path) { + if err.kind() == ErrorKind::NotFound { + return Ok(()); + } + + return Err(err.into()); + } + + Ok(()) + } + + /// Set a cache entry. 
+ /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub fn set( + &self, + key: &str, + entry: &T, + ) -> Result<(), CacheError> { + self.set_with_timestamp(key, entry, proxmox_time::epoch_i64()) + } + + /// Set a cache entry with an explicitly provided timestamp. + /// + /// # Errors + /// - `value` could not be serialized + /// - The namespace directory could not be created + /// - The cache file could not be written to or atomically replaced + pub fn set_with_timestamp( + &self, + key: &str, + value: &T, + timestamp: i64, + ) -> Result<(), CacheError> { + let path = get_path(&self.base_path, &self.namespace, key); + + proxmox_sys::fs::create_path( + path.parent().unwrap(), + Some(self.dir_options), + Some(self.dir_options), + )?; + + let entry = CacheEntry { timestamp, value }; + + let data = serde_json::to_vec(&entry)?; + proxmox_sys::fs::replace_file(path, &data, self.file_options, true)?; + + Ok(()) + } + + /// Read a value from the cache. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. invalid format) + pub fn get(&self, key: &str) -> Result, CacheError> { + get_impl(&self.base_path, &self.namespace, key, None) + } + + /// Read a value from the cache, given a maximum age of the cache entry. + /// + /// # Errors: + /// - The file associated with this key could not be read + /// - The file could not be deserialized (e.g. 
invalid format) + pub fn get_with_max_age( + &self, + key: &str, + max_age: i64, + ) -> Result, CacheError> { + get_impl(&self.base_path, &self.namespace, key, Some(max_age)) + } +} + +fn get_impl( + base: &Path, + namespace: &str, + key: &str, + max_age: Option, +) -> Result, CacheError> { + let path = get_path(base, namespace, key); + let content = proxmox_sys::fs::file_read_optional_string(path)?; + + if let Some(content) = content { + let val = serde_json::from_str::>(&content)?; + + if let Some(max_age) = max_age { + if val.is_expired(proxmox_time::epoch_i64(), max_age) { + return Ok(None); + } + } + return Ok(Some(val.value)); + } + + return Ok(None); +} + +fn get_path(base: &Path, namespace: &str, key: &str) -> PathBuf { + let mut path = base.join(namespace).join(key); + path.set_extension("json"); + path +} + +fn get_lockfile(base: &Path, namespace: &str) -> PathBuf { + let path = base.join(format!(".{namespace}.lock")); + path +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::*; + + fn make_cache() -> (TempDir, NamespacedCache) { + let dir = tempfile::tempdir().unwrap(); + + let cache = NamespacedCache::new(dir.as_ref(), CreateOptions::new(), CreateOptions::new()); + + (dir, cache) + } + + #[test] + fn test_cache() { + let (_dir, cache) = make_cache(); + + let write_guard = cache.write("remote-a", Duration::from_secs(1)).unwrap(); + write_guard.set("val1", &1).unwrap(); + write_guard.set("val2", &1).unwrap(); + + assert_eq!(write_guard.get::("val1").unwrap().unwrap(), 1); + + write_guard.remove("val1").unwrap(); + assert!(write_guard.get::("val1").unwrap().is_none()); + + drop(write_guard); + + let read_guard = cache.read("remote-a", Duration::from_secs(1)).unwrap(); + + assert_eq!(read_guard.get::("val2").unwrap().unwrap(), 1); + } + + #[test] + fn test_delete_nonexisting() { + let (_dir, cache) = make_cache(); + + let a = cache.write("remote-a", Duration::from_secs(1)).unwrap(); + a.set("val", &1).unwrap(); + + // Deleting a key 
that does not exist is okay and should not error. + assert!(a.remove("val").is_ok()); + } + + #[test] + fn test_expiration() { + let entry = CacheEntry { + value: (), + timestamp: 1000, + }; + + assert!(!entry.is_expired(1000, 100)); + assert!(!entry.is_expired(1099, 100)); + assert!(entry.is_expired(1100, 100)); + assert!(entry.is_expired(1101, 100)); + + // if max-age is 0, the entry is never fresh + assert!(entry.is_expired(1000, 0)); + } +} -- 2.47.3