public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v11 15/46] datastore: local chunk reader: read chunks based on backend
Date: Tue, 22 Jul 2025 12:10:35 +0200	[thread overview]
Message-ID: <20250722101106.526438-20-c.ebner@proxmox.com> (raw)
In-Reply-To: <20250722101106.526438-1-c.ebner@proxmox.com>

Get and store the datastore's backend on local chunk reader
instantiation and fetch chunks based on the variant from either the
filesystem or the s3 object store.

By storing the backend variant, the s3 client is instantiated only
once and reused until the local chunk reader instance is dropped.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Hannes Laimer <h.laimer@proxmox.com>
---
changes since version 10:
 - no changes

 pbs-datastore/Cargo.toml                |  1 +
 pbs-datastore/src/local_chunk_reader.rs | 43 +++++++++++++++++++------
 src/api2/admin/datastore.rs             | 12 ++++---
 src/server/pull.rs                      |  8 +++--
 src/server/push.rs                      |  8 +++--
 src/server/sync.rs                      | 22 +++++--------
 6 files changed, 63 insertions(+), 31 deletions(-)

diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
index 7e56dbd31..8ce930a94 100644
--- a/pbs-datastore/Cargo.toml
+++ b/pbs-datastore/Cargo.toml
@@ -13,6 +13,7 @@ crc32fast.workspace = true
 endian_trait.workspace = true
 futures.workspace = true
 hex = { workspace = true, features = [ "serde" ] }
+http-body-util.workspace = true
 hyper.workspace = true
 libc.workspace = true
 log.workspace = true
diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs
index 05a70c068..58a2fee8d 100644
--- a/pbs-datastore/src/local_chunk_reader.rs
+++ b/pbs-datastore/src/local_chunk_reader.rs
@@ -3,17 +3,21 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use anyhow::{bail, Error};
+use http_body_util::BodyExt;
 
 use pbs_api_types::CryptMode;
 use pbs_tools::crypt_config::CryptConfig;
+use proxmox_s3_client::S3Client;
 
 use crate::data_blob::DataBlob;
+use crate::datastore::DatastoreBackend;
 use crate::read_chunk::{AsyncReadChunk, ReadChunk};
 use crate::DataStore;
 
 #[derive(Clone)]
 pub struct LocalChunkReader {
     store: Arc<DataStore>,
+    backend: DatastoreBackend,
     crypt_config: Option<Arc<CryptConfig>>,
     crypt_mode: CryptMode,
 }
@@ -23,12 +27,14 @@ impl LocalChunkReader {
         store: Arc<DataStore>,
         crypt_config: Option<Arc<CryptConfig>>,
         crypt_mode: CryptMode,
-    ) -> Self {
-        Self {
+    ) -> Result<Self, Error> {
+        let backend = store.backend()?;
+        Ok(Self {
             store,
+            backend,
             crypt_config,
             crypt_mode,
-        }
+        })
     }
 
     fn ensure_crypt_mode(&self, chunk_mode: CryptMode) -> Result<(), Error> {
@@ -47,10 +53,26 @@ impl LocalChunkReader {
     }
 }
 
+async fn fetch(s3_client: Arc<S3Client>, digest: &[u8; 32]) -> Result<DataBlob, Error> {
+    let object_key = crate::s3::object_key_from_digest(digest)?;
+    if let Some(response) = s3_client.get_object(object_key).await? {
+        let bytes = response.content.collect().await?.to_bytes();
+        DataBlob::from_raw(bytes.to_vec())
+    } else {
+        bail!("no object with digest {}", hex::encode(digest));
+    }
+}
+
 impl ReadChunk for LocalChunkReader {
     fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
-        let chunk = self.store.load_chunk(digest)?;
+        let chunk = match &self.backend {
+            DatastoreBackend::Filesystem => self.store.load_chunk(digest)?,
+            DatastoreBackend::S3(s3_client) => {
+                proxmox_async::runtime::block_on(fetch(Arc::clone(s3_client), digest))?
+            }
+        };
         self.ensure_crypt_mode(chunk.crypt_mode()?)?;
+
         Ok(chunk)
     }
 
@@ -69,11 +91,14 @@ impl AsyncReadChunk for LocalChunkReader {
         digest: &'a [u8; 32],
     ) -> Pin<Box<dyn Future<Output = Result<DataBlob, Error>> + Send + 'a>> {
         Box::pin(async move {
-            let (path, _) = self.store.chunk_path(digest);
-
-            let raw_data = tokio::fs::read(&path).await?;
-
-            let chunk = DataBlob::load_from_reader(&mut &raw_data[..])?;
+            let chunk = match &self.backend {
+                DatastoreBackend::Filesystem => {
+                    let (path, _) = self.store.chunk_path(digest);
+                    let raw_data = tokio::fs::read(&path).await?;
+                    DataBlob::load_from_reader(&mut &raw_data[..])?
+                }
+                DatastoreBackend::S3(s3_client) => fetch(Arc::clone(s3_client), digest).await?,
+            };
             self.ensure_crypt_mode(chunk.crypt_mode()?)?;
 
             Ok(chunk)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index e6b8449d2..d742633cf 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -1520,7 +1520,8 @@ pub fn download_file_decoded(
                 let (csum, size) = index.compute_csum();
                 manifest.verify_file(&file_name, &csum, size)?;
 
-                let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+                let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+                    .context("creating local chunk reader failed")?;
                 let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
                 Body::wrap_stream(AsyncReaderStream::new(reader).map_err(move |err| {
                     eprintln!("error during streaming of '{:?}' - {}", path, err);
@@ -1535,7 +1536,8 @@ pub fn download_file_decoded(
                 let (csum, size) = index.compute_csum();
                 manifest.verify_file(&file_name, &csum, size)?;
 
-                let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+                let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+                    .context("creating local chunk reader failed")?;
                 let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
                 Body::wrap_stream(
                     AsyncReaderStream::with_buffer_size(reader, 4 * 1024 * 1024).map_err(
@@ -1739,7 +1741,8 @@ pub async fn catalog(
             let (csum, size) = index.compute_csum();
             manifest.verify_file(&file_name, &csum, size)?;
 
-            let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+            let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+                .context("creating local chunk reader failed")?;
             let reader = BufferedDynamicReader::new(index, chunk_reader);
 
             let mut catalog_reader = CatalogReader::new(reader);
@@ -1808,7 +1811,8 @@ fn get_local_pxar_reader(
     let (csum, size) = index.compute_csum();
     manifest.verify_file(pxar_name, &csum, size)?;
 
-    let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+    let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None)
+        .context("creating local chunk reader failed")?;
     let reader = BufferedDynamicReader::new(index, chunk_reader);
     let archive_size = reader.archive_size();
 
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 775ed0c59..a4402b720 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -306,7 +306,9 @@ async fn pull_single_archive<'a>(
                 info!("skipping chunk sync for same datastore");
             } else {
                 let stats = pull_index_chunks(
-                    reader.chunk_reader(archive_info.crypt_mode),
+                    reader
+                        .chunk_reader(archive_info.crypt_mode)
+                        .context("failed to get chunk reader")?,
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
@@ -326,7 +328,9 @@ async fn pull_single_archive<'a>(
                 info!("skipping chunk sync for same datastore");
             } else {
                 let stats = pull_index_chunks(
-                    reader.chunk_reader(archive_info.crypt_mode),
+                    reader
+                        .chunk_reader(archive_info.crypt_mode)
+                        .context("failed to get chunk reader")?,
                     snapshot.datastore().clone(),
                     index,
                     downloaded_chunks,
diff --git a/src/server/push.rs b/src/server/push.rs
index e71012ed8..528eed9ff 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -890,7 +890,9 @@ pub(crate) async fn push_snapshot(
                             .await;
                     }
                     let index = DynamicIndexReader::open(&path)?;
-                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+                    let chunk_reader = reader
+                        .chunk_reader(entry.chunk_crypt_mode())
+                        .context("failed to get chunk reader")?;
                     let sync_stats = push_index(
                         &archive_name,
                         index,
@@ -914,7 +916,9 @@ pub(crate) async fn push_snapshot(
                             .await;
                     }
                     let index = FixedIndexReader::open(&path)?;
-                    let chunk_reader = reader.chunk_reader(entry.chunk_crypt_mode());
+                    let chunk_reader = reader
+                        .chunk_reader(entry.chunk_crypt_mode())
+                        .context("failed to get chunk reader")?;
                     let size = index.index_bytes();
                     let sync_stats = push_index(
                         &archive_name,
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 09814ef0c..9238a8626 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -87,7 +87,7 @@ impl SyncStats {
 /// and checking whether chunk sync should be skipped.
 pub(crate) trait SyncSourceReader: Send + Sync {
     /// Returns a chunk reader with the specified encryption mode.
-    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Result<Arc<dyn AsyncReadChunk>, Error>;
 
     /// Asynchronously loads a file from the source into a local file.
     /// `filename` is the name of the file to load from the source.
@@ -113,13 +113,10 @@ pub(crate) struct LocalSourceReader {
 
 #[async_trait::async_trait]
 impl SyncSourceReader for RemoteSourceReader {
-    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
-        Arc::new(RemoteChunkReader::new(
-            self.backup_reader.clone(),
-            None,
-            crypt_mode,
-            HashMap::new(),
-        ))
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+        let chunk_reader =
+            RemoteChunkReader::new(self.backup_reader.clone(), None, crypt_mode, HashMap::new());
+        Ok(Arc::new(chunk_reader))
     }
 
     async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
@@ -190,12 +187,9 @@ impl SyncSourceReader for RemoteSourceReader {
 
 #[async_trait::async_trait]
 impl SyncSourceReader for LocalSourceReader {
-    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
-        Arc::new(LocalChunkReader::new(
-            self.datastore.clone(),
-            None,
-            crypt_mode,
-        ))
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+        let chunk_reader = LocalChunkReader::new(self.datastore.clone(), None, crypt_mode)?;
+        Ok(Arc::new(chunk_reader))
     }
 
     async fn load_file_into(&self, filename: &str, into: &Path) -> Result<Option<DataBlob>, Error> {
-- 
2.47.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2025-07-22 10:11 UTC|newest]

Thread overview: 54+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-07-22 10:10 [pbs-devel] [PATCH proxmox{, -backup} v11 00/50] fix #2943: S3 storage backend for datastores Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox v11 1/4] pbs-api-types: extend datastore config by backend config enum Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox v11 2/4] pbs-api-types: maintenance: add new maintenance mode S3 refresh Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox v11 3/4] s3 client: Add missing S3 object key max length check Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox v11 4/4] s3 client: merge secrets config with client config Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 01/46] datastore: add helpers for path/digest to s3 object key conversion Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 02/46] config: introduce s3 object store client configuration Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 03/46] api: config: implement endpoints to manipulate and list s3 configs Christian Ebner
2025-07-22 12:18   ` Lukas Wagner
2025-07-22 12:32     ` Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 04/46] api: datastore: check s3 backend bucket access on datastore create Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 05/46] api/cli: add endpoint and command to check s3 client connection Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 06/46] datastore: allow to get the backend for a datastore Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 07/46] api: backup: store datastore backend in runtime environment Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 08/46] api: backup: conditionally upload chunks to s3 object store backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 09/46] api: backup: conditionally upload blobs " Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 10/46] api: backup: conditionally upload indices " Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 11/46] api: backup: conditionally upload manifest " Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 12/46] api: datastore: conditionally upload client log to s3 backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 13/46] sync: pull: conditionally upload content " Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 14/46] api: reader: fetch chunks based on datastore backend Christian Ebner
2025-07-22 10:10 ` Christian Ebner [this message]
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 16/46] verify worker: add datastore backed to verify worker Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 17/46] verify: implement chunk verification for stores with s3 backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 18/46] datastore: create namespace marker in " Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 19/46] datastore: create/delete protected marker file on s3 storage backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 20/46] datastore: prune groups/snapshots from s3 object store backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 21/46] datastore: get and set owner for s3 " Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 22/46] datastore: implement garbage collection for s3 backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 23/46] ui: add datastore type selector and reorganize component layout Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 24/46] ui: add s3 client edit window for configuration create/edit Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 25/46] ui: add s3 client view for configuration Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 26/46] ui: expose the s3 client view in the navigation tree Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 27/46] ui: add s3 client selector and bucket field for s3 backend setup Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 28/46] tools: lru cache: add removed callback for evicted cache nodes Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 29/46] tools: async lru cache: implement insert, remove and contains methods Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 30/46] datastore: add local datastore cache for network attached storages Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 31/46] api: backup: use local datastore cache on s3 backend chunk upload Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 32/46] api: reader: use local datastore cache on s3 backend chunk fetching Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 33/46] datastore: local chunk reader: get cached chunk from local cache store Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 34/46] backup writer: refactor parameters into backup writer options struct Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 35/46] api: backup: add no-cache flag to bypass local datastore cache Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 36/46] api/datastore: implement refresh endpoint for stores with s3 backend Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 37/46] cli: add dedicated subcommand for datastore s3 refresh Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 38/46] ui: render s3 refresh as valid maintenance type and task description Christian Ebner
2025-07-22 10:10 ` [pbs-devel] [PATCH proxmox-backup v11 39/46] ui: expose s3 refresh button for datastores backed by object store Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 40/46] datastore: conditionally upload atime marker chunk to s3 backend Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 41/46] bin: implement client subcommands for s3 configuration manipulation Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 42/46] bin: expose reuse-datastore flag for proxmox-backup-manager Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 43/46] datastore: mark store as in-use by setting marker on s3 backend Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 44/46] datastore: run s3-refresh when reusing a datastore with " Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 45/46] api/ui: add flag to allow overwriting in-use marker for " Christian Ebner
2025-07-22 10:11 ` [pbs-devel] [PATCH proxmox-backup v11 46/46] docs: Add section describing how to setup s3 backed datastore Christian Ebner
2025-07-22 20:25 ` [pbs-devel] applied: [PATCH proxmox{, -backup} v11 00/50] fix #2943: S3 storage backend for datastores Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20250722101106.526438-20-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal