From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v2 08/19] datastore: refactor chunk insert based on backend
Date: Tue, 4 Nov 2025 14:06:48 +0100 [thread overview]
Message-ID: <20251104130659.435139-9-c.ebner@proxmox.com> (raw)
In-Reply-To: <20251104130659.435139-1-c.ebner@proxmox.com>
With the goal to move all the backend related implementation details
into the datastore's method and deduplicate code as much as possible.
The backend has to be passed in from the caller on chunk insert, as
this stores the s3-client which lives for the whole backup session
lifetime. Same is true for the pull sync job and the tape restore,
although for latter the s3 functionality is not exposed yet as still
incomplete.
To distinguish between inserts in datastors with cache, for cases
where chunks should bypass the cache, add a dedicated method for that.
This will facilitate the implementation of the per-chunk file locking
for cache and backend consistency on s3 backed stores.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since version 1:
- no changes
pbs-datastore/src/datastore.rs | 85 ++++++++++++++++++++++++++++++++-
src/api2/backup/upload_chunk.rs | 64 ++++---------------------
src/api2/tape/restore.rs | 6 ++-
src/server/pull.rs | 19 +-------
4 files changed, 98 insertions(+), 76 deletions(-)
diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index 343f49f36..b3c591edb 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{bail, format_err, Context, Error};
use http_body_util::BodyExt;
+use hyper::body::Bytes;
use nix::unistd::{unlinkat, UnlinkatFlags};
use pbs_tools::lru_cache::LruCache;
use tokio::io::AsyncWriteExt;
@@ -1881,8 +1882,88 @@ impl DataStore {
.cond_touch_chunk(digest, assert_exists)
}
- pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
- self.inner.chunk_store.insert_chunk(chunk, digest)
+ /// Inserts the chunk with given digest in the chunk store based on the given backend.
+ ///
+ /// The backend is passed along in order to reuse an active connection to the backend, created
+ /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
+ /// This calls into async code, so callers must assure to never hold a Mutex lock.
+ pub fn insert_chunk(
+ &self,
+ chunk: &DataBlob,
+ digest: &[u8; 32],
+ backend: &DatastoreBackend,
+ ) -> Result<(bool, u64), Error> {
+ match backend {
+ DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+ DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, &s3_client),
+ }
+ }
+
+ /// Inserts the chunk with given digest in the chunk store based on the given backend, but
+ /// always bypasses the local datastore cache.
+ ///
+ /// The backend is passed along in order to reuse an active connection to the backend, created
+ /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
+ /// This calls into async code, so callers must assure to never hold a Mutex lock.
+ ///
+ /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore
+ /// itself.
+ pub fn insert_chunk_no_cache(
+ &self,
+ chunk: &DataBlob,
+ digest: &[u8; 32],
+ backend: &DatastoreBackend,
+ ) -> Result<(bool, u64), Error> {
+ match backend {
+ DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+ DatastoreBackend::S3(s3_client) => {
+ let chunk_data: Bytes = chunk.raw_data().to_vec().into();
+ let chunk_size = chunk_data.len() as u64;
+ let object_key = crate::s3::object_key_from_digest(digest)?;
+ let is_duplicate = proxmox_async::runtime::block_on(
+ s3_client.upload_no_replace_with_retry(object_key, chunk_data),
+ )
+ .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+ Ok((is_duplicate, chunk_size))
+ }
+ }
+ }
+
+ fn insert_chunk_cached(
+ &self,
+ chunk: &DataBlob,
+ digest: &[u8; 32],
+ s3_client: &Arc<S3Client>,
+ ) -> Result<(bool, u64), Error> {
+ let chunk_data = chunk.raw_data();
+ let chunk_size = chunk_data.len() as u64;
+
+ // Avoid re-upload to S3 if the chunk is either present in the in-memory LRU cache
+ // or the chunk marker file exists on filesystem. The latter means the chunk has
+ // been uploaded in the past, but was evicted from the LRU cache since but was not
+ // cleaned up by garbage collection, so contained in the S3 object store.
+ if self.cache_contains(&digest) {
+ tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
+ return Ok((true, chunk_size));
+ }
+ if let Ok(true) = self.cond_touch_chunk(digest, false) {
+ tracing::info!(
+ "Skip upload of already encountered chunk {}",
+ hex::encode(digest),
+ );
+ return Ok((true, chunk_size));
+ }
+
+ tracing::info!("Upload new chunk {}", hex::encode(digest));
+ let object_key = crate::s3::object_key_from_digest(digest)?;
+ let chunk_data: Bytes = chunk_data.to_vec().into();
+ let is_duplicate = proxmox_async::runtime::block_on(
+ s3_client.upload_no_replace_with_retry(object_key, chunk_data),
+ )
+ .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+ tracing::info!("Caching of chunk {}", hex::encode(digest));
+ self.cache_insert(&digest, &chunk)?;
+ Ok((is_duplicate, chunk_size))
}
pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 297f23b12..f4a50002d 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -5,7 +5,7 @@ use anyhow::{bail, format_err, Error};
use futures::*;
use hex::FromHex;
use http_body_util::{BodyDataStream, BodyExt};
-use hyper::body::{Bytes, Incoming};
+use hyper::body::Incoming;
use hyper::http::request::Parts;
use serde_json::{json, Value};
@@ -15,7 +15,7 @@ use proxmox_sortable_macro::sortable;
use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA};
use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader};
-use pbs_datastore::{DataBlob, DatastoreBackend};
+use pbs_datastore::DataBlob;
use pbs_tools::json::{required_integer_param, required_string_param};
use super::environment::*;
@@ -232,59 +232,15 @@ async fn upload_to_backend(
let (digest, size, chunk) =
UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?;
- match &env.backend {
- DatastoreBackend::Filesystem => {
- let (is_duplicate, compressed_size) = proxmox_async::runtime::block_in_place(|| {
- env.datastore.insert_chunk(&chunk, &digest)
- })?;
- Ok((digest, size, compressed_size as u32, is_duplicate))
- }
- DatastoreBackend::S3(s3_client) => {
- let chunk_data: Bytes = chunk.raw_data().to_vec().into();
-
- if env.no_cache {
- let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
- let is_duplicate = s3_client
- .upload_no_replace_with_retry(object_key, chunk_data)
- .await
- .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
- return Ok((digest, size, encoded_size, is_duplicate));
- }
-
- // Avoid re-upload to S3 if the chunk is either present in the LRU cache or the chunk
- // file exists on filesystem. The latter means that the chunk has been present in the
- // past an was not cleaned up by garbage collection, so contained in the S3 object store.
- if env.datastore.cache_contains(&digest) {
- tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
- return Ok((digest, size, encoded_size, true));
- }
- if let Ok(true) = env.datastore.cond_touch_chunk(&digest, false) {
- tracing::info!(
- "Skip upload of already encountered chunk {}",
- hex::encode(digest)
- );
- return Ok((digest, size, encoded_size, true));
- }
-
- tracing::info!("Upload of new chunk {}", hex::encode(digest));
- let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
- let is_duplicate = s3_client
- .upload_no_replace_with_retry(object_key, chunk_data)
- .await
- .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
-
- // Only insert the chunk into the cache after it has been successufuly uploaded.
- // Although less performant than doing this in parallel, it is required for consisency
- // since chunks are considered as present on the backend if the file exists in the local
- // cache store.
- let datastore = env.datastore.clone();
- tracing::info!("Caching of chunk {}", hex::encode(digest));
- let _ = tokio::task::spawn_blocking(move || datastore.cache_insert(&digest, &chunk))
- .await?;
-
- Ok((digest, size, encoded_size, is_duplicate))
- }
+ if env.no_cache {
+ let (is_duplicate, chunk_size) =
+ env.datastore
+ .insert_chunk_no_cache(&chunk, &digest, &env.backend)?;
+ return Ok((digest, size, chunk_size as u32, is_duplicate));
}
+
+ let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
+ Ok((digest, size, chunk_size as u32, is_duplicate))
}
pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new(
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 8b6979017..4f2ee3db6 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1151,6 +1151,7 @@ fn restore_partial_chunk_archive<'a>(
let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
let bytes2 = bytes.clone();
+ let backend = datastore.backend()?;
let writer_pool = ParallelHandler::new(
"tape restore chunk writer",
4,
@@ -1162,7 +1163,7 @@ fn restore_partial_chunk_archive<'a>(
chunk.decode(None, Some(&digest))?; // verify digest
}
- datastore.insert_chunk(&chunk, &digest)?;
+ datastore.insert_chunk(&chunk, &digest, &backend)?;
}
Ok(())
},
@@ -1544,6 +1545,7 @@ fn restore_chunk_archive<'a>(
let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
let bytes2 = bytes.clone();
+ let backend = datastore.backend()?;
let writer_pool = ParallelHandler::new(
"tape restore chunk writer",
4,
@@ -1560,7 +1562,7 @@ fn restore_chunk_archive<'a>(
chunk.decode(None, Some(&digest))?; // verify digest
}
- datastore.insert_chunk(&chunk, &digest)?;
+ datastore.insert_chunk(&chunk, &digest, &backend)?;
} else if verbose {
info!("Found existing chunk: {}", hex::encode(digest));
}
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 94b2fbf55..b789d3d8c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -171,24 +171,7 @@ async fn pull_index_chunks<I: IndexFile>(
move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
// println!("verify and write {}", hex::encode(&digest));
chunk.verify_unencrypted(size as usize, &digest)?;
- match &backend {
- DatastoreBackend::Filesystem => {
- target2.insert_chunk(&chunk, &digest)?;
- }
- DatastoreBackend::S3(s3_client) => {
- if target2.cache_contains(&digest) {
- return Ok(());
- }
- target2.cache_insert(&digest, &chunk)?;
- let data = chunk.raw_data().to_vec();
- let upload_data = hyper::body::Bytes::from(data);
- let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
- let _is_duplicate = proxmox_async::runtime::block_on(
- s3_client.upload_no_replace_with_retry(object_key, upload_data),
- )
- .context("failed to upload chunk to s3 backend")?;
- }
- }
+ target2.insert_chunk(&chunk, &digest, &backend)?;
Ok(())
},
);
--
2.47.3
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2025-11-04 13:07 UTC|newest]
Thread overview: 20+ messages / expand[flat|nested] mbox.gz Atom feed top
2025-11-04 13:06 [pbs-devel] [PATCH proxmox-backup v2 00/19] fix chunk upload/insert, rename corrupt chunks and GC race conditions for s3 backend Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 01/19] sync: pull: instantiate backend only once per sync job Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 02/19] api/datastore: move group notes setting to the datastore Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 03/19] api/datastore: move snapshot deletion into dedicated datastore helper Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 04/19] api/datastore: move backup log upload by implementing " Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 05/19] api: backup: use datastore add_blob helper for backup session Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 06/19] api/datastore: add dedicated datastore helper to set snapshot notes Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 07/19] api/datastore: move s3 index upload helper to datastore backend Christian Ebner
2025-11-04 13:06 ` Christian Ebner [this message]
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 09/19] verify: rename corrupted to corrupt in log output and function names Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 10/19] verify/datastore: make rename corrupt chunk a datastore helper method Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 11/19] datastore: refactor rename_corrupt_chunk error handling Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 12/19] chunk store: implement per-chunk file locking helper for s3 backend Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 13/19] datastore: acquire chunk store mutex lock when renaming corrupt chunk Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 14/19] datastore: get per-chunk file lock for chunk rename on s3 backend Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 15/19] fix #6961: datastore: verify: evict corrupt chunks from in-memory LRU cache Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 16/19] datastore: add locking to protect against races on chunk insert for s3 Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 17/19] GC: fix race with chunk upload/insert on s3 backends Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 18/19] GC: lock chunk marker before cleanup in phase 3 " Christian Ebner
2025-11-04 13:06 ` [pbs-devel] [PATCH proxmox-backup v2 19/19] datastore: GC: drop overly verbose info message during s3 chunk sweep Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20251104130659.435139-9-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox