From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 2AEFA1FF187 for ; Mon, 3 Nov 2025 12:31:17 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 40F8F1A900; Mon, 3 Nov 2025 12:31:48 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Mon, 3 Nov 2025 12:31:09 +0100 Message-ID: <20251103113120.239455-7-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20251103113120.239455-1-c.ebner@proxmox.com> References: <20251103113120.239455-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1762169483827 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.047 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup 06/17] datastore: refactor chunk insert based on backend X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" With the goal to move all the backend related implementation details into the datastore's method and deduplicate code as much as possible. The backend has to be passed in from the caller on chunk insert, as this stores the s3-client which lives for the whole backup session lifetime. 
Same is true for the pull sync job and the tape restore, although for the latter the s3 functionality is not exposed yet, as it is still incomplete. To distinguish between inserts in datastores with cache, for cases where chunks should bypass the cache, add a dedicated method for that. This will facilitate the implementation of the per-chunk file locking for cache and backend consistency on s3 backed stores. Signed-off-by: Christian Ebner --- pbs-datastore/src/datastore.rs | 85 ++++++++++++++++++++++++++++++++- src/api2/backup/upload_chunk.rs | 64 ++++--------------------- src/api2/tape/restore.rs | 6 ++- src/server/pull.rs | 19 +------- 4 files changed, 98 insertions(+), 76 deletions(-) diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs index ae85be76d..9008d8fc6 100644 --- a/pbs-datastore/src/datastore.rs +++ b/pbs-datastore/src/datastore.rs @@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{bail, format_err, Context, Error}; use http_body_util::BodyExt; +use hyper::body::Bytes; use nix::unistd::{unlinkat, UnlinkatFlags}; use pbs_tools::lru_cache::LruCache; use tokio::io::AsyncWriteExt; @@ -1855,8 +1856,88 @@ impl DataStore { .cond_touch_chunk(digest, assert_exists) } - pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> { - self.inner.chunk_store.insert_chunk(chunk, digest) + /// Inserts the chunk with given digest in the chunk store based on the given backend. + /// + /// The backend is passed along in order to reuse an active connection to the backend, created + /// on environment instantiation, e.g. an s3 client which lives for the whole backup session. + /// This calls into async code, so callers must assure to never hold a Mutex lock. 
+ pub fn insert_chunk( + &self, + chunk: &DataBlob, + digest: &[u8; 32], + backend: &DatastoreBackend, + ) -> Result<(bool, u64), Error> { + match backend { + DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest), + DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, &s3_client), + } + } + + /// Inserts the chunk with given digest in the chunk store based on the given backend, but + /// always bypasses the local datastore cache. + /// + /// The backend is passed along in order to reuse an active connection to the backend, created + /// on environment instantiation, e.g. an s3 client which lives for the whole backup session. + /// This calls into async code, so callers must assure to never hold a Mutex lock. + /// + /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore + /// itself. + pub fn insert_chunk_no_cache( + &self, + chunk: &DataBlob, + digest: &[u8; 32], + backend: &DatastoreBackend, + ) -> Result<(bool, u64), Error> { + match backend { + DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest), + DatastoreBackend::S3(s3_client) => { + let chunk_data: Bytes = chunk.raw_data().to_vec().into(); + let chunk_size = chunk_data.len() as u64; + let object_key = crate::s3::object_key_from_digest(digest)?; + let is_duplicate = proxmox_async::runtime::block_on( + s3_client.upload_no_replace_with_retry(object_key, chunk_data), + ) + .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; + Ok((is_duplicate, chunk_size)) + } + } + } + + fn insert_chunk_cached( + &self, + chunk: &DataBlob, + digest: &[u8; 32], + s3_client: &Arc, + ) -> Result<(bool, u64), Error> { + let chunk_data = chunk.raw_data(); + let chunk_size = chunk_data.len() as u64; + + // Avoid re-upload to S3 if the chunk is either present in the in-memory LRU cache + // or the chunk marker file exists on filesystem. 
The latter means the chunk has + // been uploaded in the past, but was evicted from the LRU cache since but was not + // cleaned up by garbage collection, so contained in the S3 object store. + if self.cache_contains(&digest) { + tracing::info!("Skip upload of cached chunk {}", hex::encode(digest)); + return Ok((true, chunk_size)); + } + if let Ok(true) = self.cond_touch_chunk(digest, false) { + tracing::info!( + "Skip upload of already encountered chunk {}", + hex::encode(digest), + ); + return Ok((true, chunk_size)); + } + + tracing::info!("Upload new chunk {}", hex::encode(digest)); + let object_key = crate::s3::object_key_from_digest(digest)?; + let chunk_data: Bytes = chunk_data.to_vec().into(); + let is_duplicate = proxmox_async::runtime::block_on( + s3_client.upload_no_replace_with_retry(object_key, chunk_data), + ) + .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; + tracing::info!("Caching of chunk {}", hex::encode(digest)); + self.cache_insert(&digest, &chunk)?; + Ok((is_duplicate, chunk_size)) } pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result { diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs index 297f23b12..f4a50002d 100644 --- a/src/api2/backup/upload_chunk.rs +++ b/src/api2/backup/upload_chunk.rs @@ -5,7 +5,7 @@ use anyhow::{bail, format_err, Error}; use futures::*; use hex::FromHex; use http_body_util::{BodyDataStream, BodyExt}; -use hyper::body::{Bytes, Incoming}; +use hyper::body::Incoming; use hyper::http::request::Parts; use serde_json::{json, Value}; @@ -15,7 +15,7 @@ use proxmox_sortable_macro::sortable; use pbs_api_types::{BACKUP_ARCHIVE_NAME_SCHEMA, CHUNK_DIGEST_SCHEMA}; use pbs_datastore::file_formats::{DataBlobHeader, EncryptedDataBlobHeader}; -use pbs_datastore::{DataBlob, DatastoreBackend}; +use pbs_datastore::DataBlob; use pbs_tools::json::{required_integer_param, required_string_param}; use super::environment::*; @@ -232,59 +232,15 @@ async fn upload_to_backend( let 
(digest, size, chunk) = UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?; - match &env.backend { - DatastoreBackend::Filesystem => { - let (is_duplicate, compressed_size) = proxmox_async::runtime::block_in_place(|| { - env.datastore.insert_chunk(&chunk, &digest) - })?; - Ok((digest, size, compressed_size as u32, is_duplicate)) - } - DatastoreBackend::S3(s3_client) => { - let chunk_data: Bytes = chunk.raw_data().to_vec().into(); - - if env.no_cache { - let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?; - let is_duplicate = s3_client - .upload_no_replace_with_retry(object_key, chunk_data) - .await - .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; - return Ok((digest, size, encoded_size, is_duplicate)); - } - - // Avoid re-upload to S3 if the chunk is either present in the LRU cache or the chunk - // file exists on filesystem. The latter means that the chunk has been present in the - // past an was not cleaned up by garbage collection, so contained in the S3 object store. - if env.datastore.cache_contains(&digest) { - tracing::info!("Skip upload of cached chunk {}", hex::encode(digest)); - return Ok((digest, size, encoded_size, true)); - } - if let Ok(true) = env.datastore.cond_touch_chunk(&digest, false) { - tracing::info!( - "Skip upload of already encountered chunk {}", - hex::encode(digest) - ); - return Ok((digest, size, encoded_size, true)); - } - - tracing::info!("Upload of new chunk {}", hex::encode(digest)); - let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?; - let is_duplicate = s3_client - .upload_no_replace_with_retry(object_key, chunk_data) - .await - .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; - - // Only insert the chunk into the cache after it has been successufuly uploaded. 
- // Although less performant than doing this in parallel, it is required for consisency - // since chunks are considered as present on the backend if the file exists in the local - // cache store. - let datastore = env.datastore.clone(); - tracing::info!("Caching of chunk {}", hex::encode(digest)); - let _ = tokio::task::spawn_blocking(move || datastore.cache_insert(&digest, &chunk)) - .await?; - - Ok((digest, size, encoded_size, is_duplicate)) - } + if env.no_cache { + let (is_duplicate, chunk_size) = + env.datastore + .insert_chunk_no_cache(&chunk, &digest, &env.backend)?; + return Ok((digest, size, chunk_size as u32, is_duplicate)); } + + let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?; + Ok((digest, size, chunk_size as u32, is_duplicate)) } pub const API_METHOD_UPLOAD_SPEEDTEST: ApiMethod = ApiMethod::new( diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs index 8b6979017..4f2ee3db6 100644 --- a/src/api2/tape/restore.rs +++ b/src/api2/tape/restore.rs @@ -1151,6 +1151,7 @@ fn restore_partial_chunk_archive<'a>( let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0)); let bytes2 = bytes.clone(); + let backend = datastore.backend()?; let writer_pool = ParallelHandler::new( "tape restore chunk writer", 4, @@ -1162,7 +1163,7 @@ fn restore_partial_chunk_archive<'a>( chunk.decode(None, Some(&digest))?; // verify digest } - datastore.insert_chunk(&chunk, &digest)?; + datastore.insert_chunk(&chunk, &digest, &backend)?; } Ok(()) }, @@ -1544,6 +1545,7 @@ fn restore_chunk_archive<'a>( let bytes = Arc::new(std::sync::atomic::AtomicU64::new(0)); let bytes2 = bytes.clone(); + let backend = datastore.backend()?; let writer_pool = ParallelHandler::new( "tape restore chunk writer", 4, @@ -1560,7 +1562,7 @@ fn restore_chunk_archive<'a>( chunk.decode(None, Some(&digest))?; // verify digest } - datastore.insert_chunk(&chunk, &digest)?; + datastore.insert_chunk(&chunk, &digest, &backend)?; } else if verbose { info!("Found 
existing chunk: {}", hex::encode(digest)); } diff --git a/src/server/pull.rs b/src/server/pull.rs index de8b140bc..db7084f3e 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -171,24 +171,7 @@ async fn pull_index_chunks( move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { // println!("verify and write {}", hex::encode(&digest)); chunk.verify_unencrypted(size as usize, &digest)?; - match &backend { - DatastoreBackend::Filesystem => { - target2.insert_chunk(&chunk, &digest)?; - } - DatastoreBackend::S3(s3_client) => { - if target2.cache_contains(&digest) { - return Ok(()); - } - target2.cache_insert(&digest, &chunk)?; - let data = chunk.raw_data().to_vec(); - let upload_data = hyper::body::Bytes::from(data); - let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?; - let _is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_no_replace_with_retry(object_key, upload_data), - ) - .context("failed to upload chunk to s3 backend")?; - } - } + target2.insert_chunk(&chunk, &digest, &backend)?; Ok(()) }, ); -- 2.47.3 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel