From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 235B41FF17F
	for <inbox@lore.proxmox.com>; Mon, 19 May 2025 13:55:06 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 991868FE6;
	Mon, 19 May 2025 13:55:06 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Mon, 19 May 2025 13:46:35 +0200
Message-Id: <20250519114640.303640-35-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250519114640.303640-1-c.ebner@proxmox.com>
References: <20250519114640.303640-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.030 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to
 Validity was blocked. See
 https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more
 information.
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [datastore.rs, lib.rs]
Subject: [pbs-devel] [RFC proxmox-backup 34/39] datastore: add local
 datastore cache for network attached storages
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

Use a local datastore as a cache, using an LRU cache replacement policy,
for operations on a datastore backed by a network, e.g. by an S3 object
store backend. The goal is to reduce the number of requests to the
backend and thereby save costs (monetary as well as time).

The cacher allows fetching cache items on cache misses via the access
method.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/datastore.rs                |  46 ++++++-
 pbs-datastore/src/lib.rs                      |   3 +
 .../src/local_datastore_lru_cache.rs          | 116 ++++++++++++++++++
 3 files changed, 164 insertions(+), 1 deletion(-)
 create mode 100644 pbs-datastore/src/local_datastore_lru_cache.rs

diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index 22ad566ca..19d4ca02c 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -37,8 +37,9 @@ use crate::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
 use crate::fixed_index::{FixedIndexReader, FixedIndexWriter};
 use crate::hierarchy::{ListGroups, ListGroupsType, ListNamespaces, ListNamespacesRecursive};
 use crate::index::IndexFile;
+use crate::local_datastore_lru_cache::S3Cacher;
 use crate::task_tracking::{self, update_active_operations};
-use crate::DataBlob;
+use crate::{DataBlob, LocalDatastoreLruCache};
 
 static DATASTORE_MAP: LazyLock<Mutex<HashMap<String, Arc<DataStoreImpl>>>> =
     LazyLock::new(|| Mutex::new(HashMap::new()));
@@ -129,6 +130,7 @@ pub struct DataStoreImpl {
     last_digest: Option<[u8; 32]>,
     sync_level: DatastoreFSyncLevel,
     backend_config: DatastoreBackendConfig,
+    lru_store_caching: Option<LocalDatastoreLruCache>,
 }
 
 impl DataStoreImpl {
@@ -144,6 +146,7 @@ impl DataStoreImpl {
             last_digest: None,
             sync_level: Default::default(),
             backend_config: Default::default(),
+            lru_store_caching: None,
         })
     }
 }
@@ -241,6 +244,37 @@ impl DataStore {
         Ok(backend_type)
     }
 
+    pub fn cache(&self) -> Option<&LocalDatastoreLruCache> {
+        self.inner.lru_store_caching.as_ref()
+    }
+
+    /// Check if the digest is present in the local datastore cache.
+    /// Always returns false if there is no cache configured for this datastore.
+    pub fn cache_contains(&self, digest: &[u8; 32]) -> bool {
+        if let Some(cache) = self.inner.lru_store_caching.as_ref() {
+            return cache.contains(digest);
+        }
+        false
+    }
+
+    /// Insert the chunk with the given digest as most recently used item in the cache.
+    /// Returns with success if there is no cache configured for this datastore.
+    pub fn cache_insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
+        if let Some(cache) = self.inner.lru_store_caching.as_ref() {
+            return cache.insert(digest, chunk);
+        }
+        Ok(())
+    }
+
+    pub fn cacher(&self) -> Result<Option<S3Cacher>, Error> {
+        self.backend().map(|backend| match backend {
+            DatastoreBackend::S3(s3_client) => {
+                Some(S3Cacher::new(s3_client, self.inner.chunk_store.clone()))
+            }
+            DatastoreBackend::Filesystem => None,
+        })
+    }
+
     pub fn lookup_datastore(
         name: &str,
         operation: Option<Operation>,
@@ -423,6 +457,15 @@ impl DataStore {
             None => Default::default(),
         };
 
+        const LOCAL_DATASTORE_CACHE_SIZE: usize = 10_000_000;
+        let lru_store_caching = if let DatastoreBackendConfig::S3(_) = backend_config {
+            let cache =
+                LocalDatastoreLruCache::new(LOCAL_DATASTORE_CACHE_SIZE, chunk_store.clone());
+            Some(cache)
+        } else {
+            None
+        };
+
         Ok(DataStoreImpl {
             chunk_store,
             gc_mutex: Mutex::new(()),
@@ -432,6 +475,7 @@ impl DataStore {
             last_digest,
             sync_level: tuning.sync_level.unwrap_or_default(),
             backend_config,
+            lru_store_caching,
         })
     }
 
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index e6f65575b..f1ad3d4c2 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -216,3 +216,6 @@ pub use snapshot_reader::SnapshotReader;
 
 mod local_chunk_reader;
 pub use local_chunk_reader::LocalChunkReader;
+
+mod local_datastore_lru_cache;
+pub use local_datastore_lru_cache::LocalDatastoreLruCache;
diff --git a/pbs-datastore/src/local_datastore_lru_cache.rs b/pbs-datastore/src/local_datastore_lru_cache.rs
new file mode 100644
index 000000000..c711c5208
--- /dev/null
+++ b/pbs-datastore/src/local_datastore_lru_cache.rs
@@ -0,0 +1,116 @@
+//! Use a local datastore as cache for operations on a datastore attached via
+//! a network layer (e.g. via the S3 backend).
+
+use std::future::Future;
+use std::sync::Arc;
+
+use anyhow::{bail, Error};
+use hyper::body::HttpBody;
+
+use pbs_s3_client::S3Client;
+use pbs_tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+
+use crate::ChunkStore;
+use crate::DataBlob;
+
+#[derive(Clone)]
+pub struct S3Cacher {
+    client: Arc<S3Client>,
+    store: Arc<ChunkStore>,
+}
+
+impl AsyncCacher<[u8; 32], ()> for S3Cacher {
+    fn fetch(
+        &self,
+        key: [u8; 32],
+    ) -> Box<dyn Future<Output = Result<Option<()>, Error>> + Send + 'static> {
+        let client = self.client.clone();
+        let store = self.store.clone();
+        Box::new(async move {
+            match client.get_object(key.into()).await? {
+                None => bail!("could not fetch object with key {}", hex::encode(key)),
+                Some(response) => {
+                    let bytes = response.content.collect().await?.to_bytes();
+                    let chunk = DataBlob::from_raw(bytes.to_vec())?;
+                    store.insert_chunk(&chunk, &key)?;
+                    Ok(Some(()))
+                }
+            }
+        })
+    }
+}
+
+impl S3Cacher {
+    pub fn new(client: Arc<S3Client>, store: Arc<ChunkStore>) -> Self {
+        Self { client, store }
+    }
+}
+
+/// LRU cache using local datastore for caching chunks
+///
+/// Uses an LRU cache, but stores the values on the filesystem rather than
+/// in memory.
+pub struct LocalDatastoreLruCache {
+    cache: AsyncLruCache<[u8; 32], ()>,
+    store: Arc<ChunkStore>,
+}
+
+impl LocalDatastoreLruCache {
+    pub fn new(capacity: usize, store: Arc<ChunkStore>) -> Self {
+        Self {
+            cache: AsyncLruCache::new(capacity),
+            store,
+        }
+    }
+
+    /// Insert a new chunk into the local datastore cache.
+    ///
+    /// Fails if the chunk cannot be inserted successfully.
+    pub fn insert(&self, digest: &[u8; 32], chunk: &DataBlob) -> Result<(), Error> {
+        self.store.insert_chunk(chunk, digest)?;
+        self.cache.insert(*digest, (), |digest| {
+            let (path, _digest_str) = self.store.chunk_path(&digest);
+            // Truncate to free up space but keep the inode around, since that
+            // is used as marker for chunks in use by garbage collection.
+            nix::unistd::truncate(&path, 0).map_err(Error::from)
+        })
+    }
+
+    /// Remove a chunk from the local datastore cache.
+    ///
+    /// Fails if the chunk cannot be deleted successfully.
+    pub fn remove(&self, digest: &[u8; 32]) -> Result<(), Error> {
+        self.cache.remove(*digest);
+        let (path, _digest_str) = self.store.chunk_path(digest);
+        std::fs::remove_file(path).map_err(Error::from)
+    }
+
+    pub async fn access(
+        &self,
+        digest: &[u8; 32],
+        cacher: &mut S3Cacher,
+    ) -> Result<Option<DataBlob>, Error> {
+        if self
+            .cache
+            .access(*digest, cacher, |digest| {
+                let (path, _digest_str) = self.store.chunk_path(&digest);
+                // Truncate to free up space but keep the inode around, since that
+                // is used as marker for chunks in use by garbage collection.
+                nix::unistd::truncate(&path, 0).map_err(Error::from)
+            })
+            .await?
+            .is_some()
+        {
+            let (path, _digest_str) = self.store.chunk_path(digest);
+            let mut file = std::fs::File::open(&path)?;
+            let chunk = DataBlob::load_from_reader(&mut file)?;
+            Ok(Some(chunk))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn contains(&self, digest: &[u8; 32]) -> bool {
+        self.cache.contains(*digest)
+    }
+}
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel