From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 06/17] datastore: refactor chunk insert based on backend
Date: Mon,  3 Nov 2025 12:31:09 +0100	[thread overview]
Message-ID: <20251103113120.239455-7-c.ebner@proxmox.com> (raw)
In-Reply-To: <20251103113120.239455-1-c.ebner@proxmox.com>

Refactor chunk insertion with the goal of moving all backend related
implementation details into the datastore's methods and deduplicating
code as much as possible.

The backend has to be passed in by the caller on chunk insert, as it
stores the s3 client which lives for the whole backup session
lifetime. The same is true for the pull sync job and the tape restore,
although for the latter the s3 functionality is not exposed yet, as it
is still incomplete.
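
As a caller-side sketch (mirroring the tape restore and pull hunks
below; the chunk and digest variables are illustrative placeholders),
the backend handle is obtained once and then reused for every insert:

    // Instantiate the backend once per job so an active connection,
    // e.g. an s3 client, is reused for all chunk inserts of that job.
    let backend = datastore.backend()?;

    // The insert now dispatches on the backend internally: filesystem
    // stores write directly to the chunk store, s3 stores upload the
    // chunk and fill the local cache.
    let (is_duplicate, compressed_size) =
        datastore.insert_chunk(&chunk, &digest, &backend)?;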

For datastores with a cache there are cases where chunks should
nevertheless bypass the cache, so add a dedicated method for that.
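
A condensed sketch of the upload_chunk.rs hunk below shows how the two
methods are selected based on the client's no_cache request (the env
fields come from the backup environment):

    // Bypass the local datastore cache entirely if the client asked
    // for it, otherwise go through the cached insert path.
    let (is_duplicate, chunk_size) = if env.no_cache {
        env.datastore
            .insert_chunk_no_cache(&chunk, &digest, &env.backend)?
    } else {
        env.datastore
            .insert_chunk(&chunk, &digest, &env.backend)?
    };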

This will facilitate the implementation of per-chunk file locking for
cache and backend consistency on s3-backed stores.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/datastore.rs  | 85 ++++++++++++++++++++++++++++++++-
 src/api2/backup/upload_chunk.rs | 64 ++++---------------------
 src/api2/tape/restore.rs        |  6 ++-
 src/server/pull.rs              | 19 +-------
 4 files changed, 98 insertions(+), 76 deletions(-)

diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index ae85be76d..9008d8fc6 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
 
 use anyhow::{bail, format_err, Context, Error};
 use http_body_util::BodyExt;
+use hyper::body::Bytes;
 use nix::unistd::{unlinkat, UnlinkatFlags};
 use pbs_tools::lru_cache::LruCache;
 use tokio::io::AsyncWriteExt;
@@ -1855,8 +1856,88 @@ impl DataStore {
             .cond_touch_chunk(digest, assert_exists)
     }
 
-    pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
-        self.inner.chunk_store.insert_chunk(chunk, digest)
+    /// Inserts the chunk with the given digest into the chunk store, based on the given backend.
+    ///
+    /// The backend is passed along in order to reuse an active connection to the backend, created
+    /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
+    /// This calls into async code, so callers must ensure they never hold a Mutex lock.
+    pub fn insert_chunk(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+        backend: &DatastoreBackend,
+    ) -> Result<(bool, u64), Error> {
+        match backend {
+            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+            DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, &s3_client),
+        }
+    }
+
+    /// Inserts the chunk with the given digest into the chunk store based on the given backend,
+    /// but always bypasses the local datastore cache.
+    ///
+    /// The backend is passed along in order to reuse an active connection to the backend, created
+    /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
+    /// This calls into async code, so callers must ensure they never hold a Mutex lock.
+    ///
+    /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore
+    /// itself.
+    pub fn insert_chunk_no_cache(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+        backend: &DatastoreBackend,
+    ) -> Result<(bool, u64), Error> {
+        match backend {
+            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+            DatastoreBackend::S3(s3_client) => {
+                let chunk_data: Bytes = chunk.raw_data().to_vec().into();
+                let chunk_size = chunk_data.len() as u64;
+                let object_key = crate::s3::object_key_from_digest(digest)?;
+                let is_duplicate = proxmox_async::runtime::block_on(
+                    s3_client.upload_no_replace_with_retry(object_key, chunk_data),
+                )
+                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+                Ok((is_duplicate, chunk_size))
+            }
+        }
+    }
+
+    fn insert_chunk_cached(
+        &self,
+        chunk: &DataBlob,
+        digest: &[u8; 32],
+        s3_client: &Arc<S3Client>,
+    ) -> Result<(bool, u64), Error> {
+        let chunk_data = chunk.raw_data();
+        let chunk_size = chunk_data.len() as u64;
+
+        // Avoid re-uploading to S3 if the chunk is either present in the in-memory LRU cache
+        // or the chunk marker file exists on the filesystem. The latter means the chunk was
+        // uploaded in the past and has since been evicted from the LRU cache, but was not yet
+        // cleaned up by garbage collection, so it is still contained in the S3 object store.
+        if self.cache_contains(&digest) {
+            tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
+            return Ok((true, chunk_size));
+        }
+        if let Ok(true) = self.cond_touch_chunk(digest, false) {
+            tracing::info!(
+                "Skip upload of already encountered chunk {}",
+                hex::encode(digest),
+            );
+            return Ok((true, chunk_size));
+        }
+
+        tracing::info!("Upload new chunk {}", hex::encode(digest));
+        let object_key = crate::s3::object_key_from_digest(digest)?;
+        let chunk_data: Bytes = chunk_data.to_vec().into();
+        let is_duplicate = proxmox_async::runtime::block_on(
+            s3_client.upload_no_replace_with_retry(object_key, chunk_data),
+        )
+        .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+        tracing::info!("Caching of chunk {}", hex::encode(digest));
+        self.cache_insert(&digest, &chunk)?;
+        Ok((is_duplicate, chunk_size))
     }
 
     pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 297f23b12..f4a50002d 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -5,7 +5,7 @@ use anyhow::{bail, format_err, Error};
 use futures::*;
 use hex::FromHex;
 use http_body_util::{BodyDataStream, BodyExt};
-use hyper::body::{Bytes, Incoming};
+use hyper::body::Incoming;
 use hyper::http::request::Parts;
 use serde_json::{json, Value};
 
@@ -15,7 +15,7 @@ use proxmox_sortable_macro::sortable;
 
 use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
 use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
-use pbs_datastore::{DataBlob, DatastoreBackend};
+use pbs_datastore::DataBlob;
 use pbs_tools::json::{required_integer_param, required_string_param};
 
 use super::environment::*;
@@ -232,59 +232,15 @@ async fn upload_to_backend(
     let (digest, size, chunk) =
         UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?;
 
-    match &env.backend {
-        DatastoreBackend::Filesystem => {
-            let (is_duplicate, compressed_size) = proxmox_async::runtime::block_in_place(|| {
-                env.datastore.insert_chunk(&chunk, &digest)
-            })?;
-            Ok((digest, size, compressed_size as u32, is_duplicate))
-        }
-        DatastoreBackend::S3(s3_client) => {
-            let chunk_data: Bytes = chunk.raw_data().to_vec().into();
-
-            if env.no_cache {
-                let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
-                let is_duplicate = s3_client
-                    .upload_no_replace_with_retry(object_key, chunk_data)
-                    .await
-                    .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
-                return Ok((digest, size, encoded_size, is_duplicate));
-            }
-
-            // Avoid re-upload to S3 if the chunk is either present in the LRU cache or the chunk
-            // file exists on filesystem. The latter means that the chunk has been present in the
-            // past an was not cleaned up by garbage collection, so contained in the S3 object store.
-            if env.datastore.cache_contains(&digest) {
-                tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
-                return Ok((digest, size, encoded_size, true));
-            }
-            if let Ok(true) = env.datastore.cond_touch_chunk(&digest, false) {
-                tracing::info!(
-                    "Skip upload of already encountered chunk {}",
-                    hex::encode(digest)
-                );
-                return Ok((digest, size, encoded_size, true));
-            }
-
-            tracing::info!("Upload of new chunk {}", hex::encode(digest));
-            let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
-            let is_duplicate = s3_client
-                .upload_no_replace_with_retry(object_key, chunk_data)
-                .await
-                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
-
-            // Only insert the chunk into the cache after it has been successufuly uploaded.
-            // Although less performant than doing this in parallel, it is required for consisency
-            // since chunks are considered as present on the backend if the file exists in the local
-            // cache store.
-            let datastore = env.datastore.clone();
-            tracing::info!("Caching of chunk {}", hex::encode(digest));
-            let _ = tokio::task::spawn_blocking(move || datastore.cache_insert(&digest, &chunk))
-                .await?;
-
-            Ok((digest, size, encoded_size, is_duplicate))
-        }
+    if env.no_cache {
+        let (is_duplicate, chunk_size) =
+            env.datastore
+                .insert_chunk_no_cache(&chunk, &digest, &env.backend)?;
+        return Ok((digest, size, chunk_size as u32, is_duplicate));
     }
+
+    let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
+    Ok((digest, size, chunk_size as u32, is_duplicate))
 }
 
 pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 8b6979017..4f2ee3db6 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1151,6 +1151,7 @@ fn restore_partial_chunk_archive<'a>(
     let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let bytes2 = bytes.clone();
 
+    let backend = datastore.backend()?;
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
@@ -1162,7 +1163,7 @@ fn restore_partial_chunk_archive<'a>(
                     chunk.decode(None, Some(&digest))?; // verify digest
                 }
 
-                datastore.insert_chunk(&chunk, &digest)?;
+                datastore.insert_chunk(&chunk, &digest, &backend)?;
             }
             Ok(())
         },
@@ -1544,6 +1545,7 @@ fn restore_chunk_archive<'a>(
     let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
     let bytes2 = bytes.clone();
 
+    let backend = datastore.backend()?;
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
@@ -1560,7 +1562,7 @@ fn restore_chunk_archive<'a>(
                     chunk.decode(None, Some(&digest))?; // verify digest
                 }
 
-                datastore.insert_chunk(&chunk, &digest)?;
+                datastore.insert_chunk(&chunk, &digest, &backend)?;
             } else if verbose {
                 info!("Found existing chunk: {}", hex::encode(digest));
             }
diff --git a/src/server/pull.rs b/src/server/pull.rs
index de8b140bc..db7084f3e 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -171,24 +171,7 @@ async fn pull_index_chunks<I: IndexFile>(
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
-            match &backend {
-                DatastoreBackend::Filesystem => {
-                    target2.insert_chunk(&chunk, &digest)?;
-                }
-                DatastoreBackend::S3(s3_client) => {
-                    if target2.cache_contains(&digest) {
-                        return Ok(());
-                    }
-                    target2.cache_insert(&digest, &chunk)?;
-                    let data = chunk.raw_data().to_vec();
-                    let upload_data = hyper::body::Bytes::from(data);
-                    let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
-                    let _is_duplicate = proxmox_async::runtime::block_on(
-                        s3_client.upload_no_replace_with_retry(object_key, upload_data),
-                    )
-                    .context("failed to upload chunk to s3 backend")?;
-                }
-            }
+            target2.insert_chunk(&chunk, &digest, &backend)?;
             Ok(())
         },
     );
-- 
2.47.3



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


Thread overview: 30+ messages
2025-11-03 11:31 [pbs-devel] [PATCH proxmox-backup 00/17] fix chunk upload/insert, rename corrupt chunks and GC race conditions for s3 backend Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 01/17] sync: pull: instantiate backend only once per sync job Christian Ebner
2025-11-03 14:51   ` Fabian Grünbichler
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 02/17] api/datastore: move group notes setting to the datastore Christian Ebner
2025-11-03 14:51   ` Fabian Grünbichler
2025-11-04  8:51     ` Christian Ebner
2025-11-04  9:13       ` Fabian Grünbichler
2025-11-04  9:37         ` Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 03/17] api/datastore: move snapshot deletion into dedicated datastore helper Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 04/17] api/datastore: move backup log upload by implementing " Christian Ebner
2025-11-03 14:51   ` Fabian Grünbichler
2025-11-04  8:47     ` Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 05/17] api/datastore: add dedicated datastore helper to set snapshot notes Christian Ebner
2025-11-03 11:31 ` Christian Ebner [this message]
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 07/17] verify: rename corrupted to corrupt in log output and function names Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 08/17] verify/datastore: make rename corrupt chunk a datastore helper method Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 09/17] datastore: refactor rename_corrupt_chunk error handling Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 10/17] datastore: implement per-chunk file locking helper for s3 backend Christian Ebner
2025-11-03 14:51   ` Fabian Grünbichler
2025-11-04  8:45     ` Christian Ebner
2025-11-04  9:01       ` Fabian Grünbichler
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 11/17] datastore: acquire chunk store mutex lock when renaming corrupt chunk Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 12/17] datastore: get per-chunk file lock for chunk rename on s3 backend Christian Ebner
2025-11-03 14:51   ` Fabian Grünbichler
2025-11-04  8:33     ` Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 13/17] fix #6961: datastore: verify: evict corrupt chunks from in-memory LRU cache Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 14/17] datastore: add locking to protect against races on chunk insert for s3 Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 15/17] GC: fix race with chunk upload/insert on s3 backends Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 16/17] GC: lock chunk marker before cleanup in phase 3 " Christian Ebner
2025-11-03 11:31 ` [pbs-devel] [PATCH proxmox-backup 17/17] datastore: GC: drop overly verbose info message during s3 chunk sweep Christian Ebner
