From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9])
	by lore.proxmox.com (Postfix) with ESMTPS id 204A21FF17F
	for <inbox@lore.proxmox.com>; Mon, 19 May 2025 13:55:07 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 711939187;
	Mon, 19 May 2025 13:55:07 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Mon, 19 May 2025 13:46:33 +0200
Message-Id: <20250519114640.303640-33-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250519114640.303640-1-c.ebner@proxmox.com>
References: <20250519114640.303640-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.030 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [datastore.rs]
Subject: [pbs-devel] [RFC proxmox-backup 32/39] tools: lru cache: add
 removed callback for evicted nodes
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

Add a callback function to be executed on evicted cache nodes. The
callback receives the key of the removed node, allowing callers to act
on that value externally.

Since the callback might fail, extend the current LRU cache API to
return an error on insert, propagating any error returned by the
`removed` callback.

The async LRU cache, call sites and tests are adapted accordingly to
pass the additional callback parameter.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/cached_chunk_reader.rs |  6 +++-
 pbs-datastore/src/datastore.rs           |  2 +-
 pbs-datastore/src/dynamic_index.rs       |  1 +
 pbs-tools/src/async_lru_cache.rs         | 23 +++++++++----
 pbs-tools/src/lru_cache.rs               | 42 +++++++++++++++---------
 5 files changed, 50 insertions(+), 24 deletions(-)

diff --git a/pbs-datastore/src/cached_chunk_reader.rs b/pbs-datastore/src/cached_chunk_reader.rs
index be7f2a1e2..95ac23a54 100644
--- a/pbs-datastore/src/cached_chunk_reader.rs
+++ b/pbs-datastore/src/cached_chunk_reader.rs
@@ -81,7 +81,11 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
                 let info = self.index.chunk_info(chunk.0).unwrap();
 
                 // will never be None, see AsyncChunkCacher
-                let data = self.cache.access(info.digest, &self.cacher).await?.unwrap();
+                let data = self
+                    .cache
+                    .access(info.digest, &self.cacher, |_| Ok(()))
+                    .await?
+                    .unwrap();
 
                 let want_bytes = ((info.range.end - cur_offset) as usize).min(size - read);
                 let slice = &mut buf[read..(read + want_bytes)];
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index a15f82b5b..22ad566ca 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -1156,7 +1156,7 @@ impl DataStore {
             let digest = index.index_digest(pos).unwrap();
 
             // Avoid multiple expensive atime updates by utimensat
-            if chunk_lru_cache.insert(*digest, ()) {
+            if chunk_lru_cache.insert(*digest, (), |_| Ok(()))? {
                 continue;
             }
 
diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs
index 8e9cb1163..e9d28c7de 100644
--- a/pbs-datastore/src/dynamic_index.rs
+++ b/pbs-datastore/src/dynamic_index.rs
@@ -599,6 +599,7 @@ impl<S: ReadChunk> BufferedDynamicReader<S> {
                     store: &mut self.store,
                     index: &self.index,
                 },
+                |_| Ok(()),
             )?
             .ok_or_else(|| format_err!("chunk not found by cacher"))?;
 
diff --git a/pbs-tools/src/async_lru_cache.rs b/pbs-tools/src/async_lru_cache.rs
index c43b87717..141114933 100644
--- a/pbs-tools/src/async_lru_cache.rs
+++ b/pbs-tools/src/async_lru_cache.rs
@@ -42,7 +42,16 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V: Clone + Send + 'static> AsyncL
     /// Access an item either via the cache or by calling cacher.fetch. A return value of Ok(None)
     /// means the item requested has no representation, Err(_) means a call to fetch() failed,
     /// regardless of whether it was initiated by this call or a previous one.
-    pub async fn access(&self, key: K, cacher: &dyn AsyncCacher<K, V>) -> Result<Option<V>, Error> {
+    /// Calls the `removed` callback with the key of the evicted item, if any.
+    pub async fn access<F>(
+        &self,
+        key: K,
+        cacher: &dyn AsyncCacher<K, V>,
+        removed: F,
+    ) -> Result<Option<V>, Error>
+    where
+        F: Fn(K) -> Result<(), Error>,
+    {
         let (owner, result_fut) = {
             // check if already requested
             let mut maps = self.maps.lock().unwrap();
@@ -71,7 +80,7 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V: Clone + Send + 'static> AsyncL
             // this call was the one initiating the request, put into LRU and remove from map
             let mut maps = self.maps.lock().unwrap();
             if let Ok(Some(ref value)) = result {
-                maps.0.insert(key, value.clone());
+                maps.0.insert(key, value.clone(), removed)?;
             }
             maps.1.remove(&key);
         }
@@ -106,15 +115,15 @@ mod test {
             let cache: AsyncLruCache<i32, String> = AsyncLruCache::new(2);
 
             assert_eq!(
-                cache.access(10, &cacher).await.unwrap(),
+                cache.access(10, &cacher, |_| Ok(())).await.unwrap(),
                 Some("x10".to_string())
             );
             assert_eq!(
-                cache.access(20, &cacher).await.unwrap(),
+                cache.access(20, &cacher, |_| Ok(())).await.unwrap(),
                 Some("x20".to_string())
             );
             assert_eq!(
-                cache.access(30, &cacher).await.unwrap(),
+                cache.access(30, &cacher, |_| Ok(())).await.unwrap(),
                 Some("x30".to_string())
             );
 
@@ -123,14 +132,14 @@ mod test {
                 tokio::spawn(async move {
                     let cacher = TestAsyncCacher { prefix: "y" };
                     assert_eq!(
-                        c.access(40, &cacher).await.unwrap(),
+                        c.access(40, &cacher, |_| Ok(())).await.unwrap(),
                         Some("y40".to_string())
                     );
                 });
             }
 
             assert_eq!(
-                cache.access(20, &cacher).await.unwrap(),
+                cache.access(20, &cacher, |_| Ok(())).await.unwrap(),
                 Some("x20".to_string())
             );
         });
diff --git a/pbs-tools/src/lru_cache.rs b/pbs-tools/src/lru_cache.rs
index 9e0112647..53b84ec41 100644
--- a/pbs-tools/src/lru_cache.rs
+++ b/pbs-tools/src/lru_cache.rs
@@ -60,10 +60,10 @@ impl<K, V> CacheNode<K, V> {
 /// assert_eq!(cache.get_mut(1), None);
 /// assert_eq!(cache.len(), 0);
 ///
-/// cache.insert(1, 1);
-/// cache.insert(2, 2);
-/// cache.insert(3, 3);
-/// cache.insert(4, 4);
+/// cache.insert(1, 1, |_| Ok(()));
+/// cache.insert(2, 2, |_| Ok(()));
+/// cache.insert(3, 3, |_| Ok(()));
+/// cache.insert(4, 4, |_| Ok(()));
 /// assert_eq!(cache.len(), 3);
 ///
 /// assert_eq!(cache.get_mut(1), None);
@@ -77,9 +77,9 @@ impl<K, V> CacheNode<K, V> {
 /// assert_eq!(cache.len(), 0);
 /// assert_eq!(cache.get_mut(2), None);
 /// // access will fill in missing cache entry by fetching from LruCacher
-/// assert_eq!(cache.access(2, &mut LruCacher {}).unwrap(), Some(&mut 2));
+/// assert_eq!(cache.access(2, &mut LruCacher {}, |_| Ok(())).unwrap(), Some(&mut 2));
 ///
-/// cache.insert(1, 1);
+/// cache.insert(1, 1, |_| Ok(()));
 /// assert_eq!(cache.get_mut(1), Some(&mut 1));
 ///
 /// cache.clear();
@@ -133,7 +133,10 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
 
     /// Insert or update an entry identified by `key` with the given `value`.
     /// This entry is placed as the most recently used node at the head.
-    pub fn insert(&mut self, key: K, value: V) -> bool {
+    pub fn insert<F>(&mut self, key: K, value: V, removed: F) -> Result<bool, anyhow::Error>
+    where
+        F: Fn(K) -> Result<(), anyhow::Error>,
+    {
         match self.map.entry(key) {
             Entry::Occupied(mut o) => {
                 // Node present, update value
@@ -142,7 +145,7 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
                 let mut node = unsafe { Box::from_raw(node_ptr) };
                 node.value = value;
                 let _node_ptr = Box::into_raw(node);
-                true
+                Ok(true)
             }
             Entry::Vacant(v) => {
                 // Node not present, insert a new one
@@ -158,9 +161,11 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
                 // avoid borrow conflict. This means there are temporarily
                 // self.capacity + 1 cache nodes.
                 if self.map.len() > self.capacity {
-                    self.pop_tail();
+                    if let Some(removed_node) = self.pop_tail() {
+                        removed(removed_node)?;
+                    }
                 }
-                false
+                Ok(false)
             }
         }
     }
@@ -174,11 +179,12 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
     }
 
     /// Remove the least recently used node from the cache.
-    fn pop_tail(&mut self) {
+    fn pop_tail(&mut self) -> Option<K> {
         if let Some(old_tail) = self.list.pop_tail() {
             // Remove HashMap entry for old tail
-            self.map.remove(&old_tail.key);
+            return self.map.remove(&old_tail.key).map(|_| old_tail.key);
         }
+        None
     }
 
     /// Get a mutable reference to the value identified by `key`.
@@ -206,11 +212,15 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
     /// value.
     /// If fetch returns a value, it is inserted as the most recently used entry
     /// in the cache.
-    pub fn access<'a>(
+    pub fn access<'a, F>(
         &'a mut self,
         key: K,
         cacher: &mut dyn Cacher<K, V>,
-    ) -> Result<Option<&'a mut V>, anyhow::Error> {
+        removed: F,
+    ) -> Result<Option<&'a mut V>, anyhow::Error>
+    where
+        F: Fn(K) -> Result<(), anyhow::Error>,
+    {
         match self.map.entry(key) {
             Entry::Occupied(mut o) => {
                 // Cache hit, birng node to front of list
@@ -234,7 +244,9 @@ impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
                         // avoid borrow conflict. This means there are temporarily
                         // self.capacity + 1 cache nodes.
                         if self.map.len() > self.capacity {
-                            self.pop_tail();
+                            if let Some(removed_node) = self.pop_tail() {
+                                removed(removed_node)?;
+                            }
                         }
                     }
                 }
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel