From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 6AC931FF138 for ; Mon, 01 Jun 2026 14:32:32 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 49B241C59B; Mon, 1 Jun 2026 14:32:32 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup 2/4] datastore/api: s3: wrap s3 client into s3 backend type Date: Mon, 1 Jun 2026 14:31:22 +0200 Message-ID: <20260601123124.461765-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260601123124.461765-1-c.ebner@proxmox.com> References: <20260601123124.461765-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1780317075186 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.063 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record T_SPF_PERMERROR 0.01 SPF: test of record failed (permerror) Message-ID-Hash: ESSCZF2ACLRTFQCNWGDO7FSFD36ADGVC X-Message-ID-Hash: ESSCZF2ACLRTFQCNWGDO7FSFD36ADGVC X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: In preparation for storing additional properties required for the s3 backend to operate. This will be used to extend the backend by the backup user in order to reduce user to uid/gid lookups. Refactors variable and method names as well as call sites accordingly. No functional changes intended. Signed-off-by: Christian Ebner --- pbs-datastore/src/backup_info.rs | 64 +++--- pbs-datastore/src/chunk_store.rs | 19 +- pbs-datastore/src/datastore.rs | 182 +++++++++++------- pbs-datastore/src/lib.rs | 2 +- pbs-datastore/src/local_chunk_reader.rs | 19 +- .../src/local_datastore_lru_cache.rs | 13 +- src/api2/config/datastore.rs | 62 +++--- src/api2/reader/mod.rs | 13 +- src/backup/verify.rs | 4 +- src/server/pull.rs | 10 +- 10 files changed, 217 insertions(+), 171 deletions(-) diff --git a/pbs-datastore/src/backup_info.rs b/pbs-datastore/src/backup_info.rs index 8abeeb2ea..e2407b648 100644 --- a/pbs-datastore/src/backup_info.rs +++ b/pbs-datastore/src/backup_info.rs @@ -247,18 +247,20 @@ impl BackupGroup { delete_stats.increment_removed_snapshots(); } - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let path = self.relative_group_path(); let group_prefix = path .to_str() .ok_or_else(|| format_err!("invalid group path prefix"))?; let prefix = format!("{S3_CONTENT_PREFIX}/{group_prefix}"); let delete_objects_errors = proxmox_async::runtime::block_on( - s3_client.delete_objects_by_prefix_with_suffix_filter( - &S3PathPrefix::Some(prefix), - PROTECTED_MARKER_FILENAME, - &[GROUP_OWNER_FILE_NAME, GROUP_NOTES_FILE_NAME], - ), + s3_backend + .client + .delete_objects_by_prefix_with_suffix_filter( + &S3PathPrefix::Some(prefix), + PROTECTED_MARKER_FILENAME, + &[GROUP_OWNER_FILE_NAME, GROUP_NOTES_FILE_NAME], + ), )?; if !delete_objects_errors.is_empty() { crate::s3::log_s3_delete_objects_errors(&delete_objects_errors); @@ -381,20 +383,22 @@ impl BackupGroup { } } - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let dst_key = crate::s3::object_key_from_path( &target.relative_group_path(), GROUP_NOTES_FILE_NAME, ) .context("invalid target notes object key")?; let data = hyper::body::Bytes::copy_from_slice(&src_notes); - proxmox_async::runtime::block_on(s3_client.upload_replace_with_retry(dst_key, data)) - .with_context(|| { - format!( - "failed to upload group notes on S3 backend for '{}' in '{}'", - target.group, target.ns, - ) - })?; + proxmox_async::runtime::block_on( + s3_backend.client.upload_replace_with_retry(dst_key, data), + ) + .with_context(|| { + format!( + "failed to upload group notes on S3 backend for '{}' in '{}'", + target.group, target.ns, + ) + })?; } std::fs::rename(&src_notes_path, &dst_notes_path).with_context(|| { @@ -799,7 +803,7 @@ impl BackupDir { format!("failed to move snapshot {src_snap_path:?} to {dst_snap_path:?}") })?; } - DatastoreBackend::S3(s3_client) => { + DatastoreBackend::S3(s3_backend) => { let src_rel = self.relative_path(); let src_rel_str = src_rel .to_str() @@ -823,7 +827,7 @@ impl BackupDir { loop { let result = proxmox_async::runtime::block_on( - s3_client.list_objects_v2(&prefix, token.as_deref()), + s3_backend.client.list_objects_v2(&prefix, token.as_deref()), ) .context("failed to list snapshot objects on S3 backend")?; @@ -842,7 +846,7 @@ impl BackupDir { let dst_key = S3ObjectKey::try_from(dst_key_str.as_str())?; proxmox_async::runtime::block_on( - s3_client.copy_object(src_key.clone(), dst_key), + s3_backend.client.copy_object(src_key.clone(), dst_key), ) .with_context(|| format!("failed to copy S3 object '{rel_key}'"))?; src_keys.push(src_key); @@ -862,9 +866,9 @@ impl BackupDir { // Delete source S3 objects. Treat failures as warnings since the snapshot // is already at the target. for src_key in src_keys { - if let Err(err) = - proxmox_async::runtime::block_on(s3_client.delete_object(src_key.clone())) - { + if let Err(err) = proxmox_async::runtime::block_on( + s3_backend.client.delete_object(src_key.clone()), + ) { log::warn!( "S3 move: failed to delete source object '{src_key:?}' \ (snapshot already at target, orphaned object requires manual removal): \ @@ -900,14 +904,16 @@ impl BackupDir { bail!("cannot remove protected snapshot"); // use special error type? } - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let path = self.relative_path(); let snapshot_prefix = path .to_str() .ok_or_else(|| format_err!("invalid snapshot path"))?; let prefix = format!("{S3_CONTENT_PREFIX}/{snapshot_prefix}"); let delete_objects_error = proxmox_async::runtime::block_on( - s3_client.delete_objects_by_prefix(&S3PathPrefix::Some(prefix)), + s3_backend + .client + .delete_objects_by_prefix(&S3PathPrefix::Some(prefix)), )?; if !delete_objects_error.is_empty() { crate::s3::log_s3_delete_objects_errors(&delete_objects_error); @@ -951,11 +957,11 @@ impl BackupDir { // old one would race here). if group.list_backups()?.is_empty() && !*OLD_LOCKING { group.remove_group_dir()?; - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let object_key = super::s3::object_key_from_path(&group.relative_group_path(), "owner") .context("invalid owner file object key")?; - proxmox_async::runtime::block_on(s3_client.delete_object(object_key))?; + proxmox_async::runtime::block_on(s3_backend.client.delete_object(object_key))?; } } @@ -1004,13 +1010,17 @@ impl BackupDir { let blob = DataBlob::encode(manifest.as_bytes(), None, true)?; let raw_data = blob.raw_data(); - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let object_key = super::s3::object_key_from_path(&self.relative_path(), MANIFEST_BLOB_NAME.as_ref()) .context("invalid manifest object key")?; let data = hyper::body::Bytes::copy_from_slice(raw_data); - proxmox_async::runtime::block_on(s3_client.upload_replace_with_retry(object_key, data)) - .context("failed to update manifest on s3 backend")?; + proxmox_async::runtime::block_on( + s3_backend + .client + .upload_replace_with_retry(object_key, data), + ) + .context("failed to update manifest on s3 backend")?; } let mut path = self.full_path(); diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs index a936f5034..f8a2696ce 100644 --- a/pbs-datastore/src/chunk_store.rs +++ b/pbs-datastore/src/chunk_store.rs @@ -11,7 +11,6 @@ use tracing::{info, warn}; use pbs_api_types::{DatastoreFSyncLevel, GarbageCollectionStatus}; use pbs_config::BackupLockGuard; use proxmox_io::ReadExt; -use proxmox_s3_client::S3Client; use proxmox_sys::fs::{CreateOptions, create_dir, create_path, file_type_from_file_stat}; use proxmox_sys::process_locker::{ ProcessLockExclusiveGuard, ProcessLockSharedGuard, ProcessLocker, @@ -23,7 +22,7 @@ use crate::data_blob::DataChunkBuilder; use crate::file_formats::{ COMPRESSED_BLOB_MAGIC_1_0, ENCRYPTED_BLOB_MAGIC_1_0, UNCOMPRESSED_BLOB_MAGIC_1_0, }; -use crate::{DataBlob, LocalDatastoreLruCache}; +use crate::{DataBlob, LocalDatastoreLruCache, S3Backend}; const USING_MARKER_FILENAME_EXT: &str = "using"; @@ -603,19 +602,21 @@ impl ChunkStore { pub fn check_fs_atime_updates( &self, retry_on_file_changed: bool, - s3_client: Option>, + s3_backend: Option>, ) -> Result<(), Error> { let (zero_chunk, digest) = DataChunkBuilder::build_zero_chunk(None, 4096 * 1024, true)?; let (path, _digest) = self.chunk_path(&digest); - if let Some(ref s3_client) = s3_client { + if let Some(s3_backend) = &s3_backend { if let Err(err) = std::fs::metadata(&path) { if err.kind() == std::io::ErrorKind::NotFound { let object_key = crate::s3::object_key_from_digest(&digest)?; - proxmox_async::runtime::block_on(s3_client.upload_no_replace_with_retry( - object_key, - zero_chunk.raw_data().to_vec().into(), - )) + proxmox_async::runtime::block_on( + s3_backend.client.upload_no_replace_with_retry( + object_key, + zero_chunk.raw_data().to_vec().into(), + ), + ) .context("failed to upload chunk to s3 backend")?; } } @@ -642,7 +643,7 @@ impl ChunkStore { // two metadata calls, try to check once again on changed file if metadata_before.ino() != metadata_now.ino() { if retry_on_file_changed { - return self.check_fs_atime_updates(false, s3_client); + return self.check_fs_atime_updates(false, s3_backend); } bail!("chunk {path:?} changed twice during access time safety check, cannot proceed."); } diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs index e2d1ae67c..42aa96464 100644 --- a/pbs-datastore/src/datastore.rs +++ b/pbs-datastore/src/datastore.rs @@ -338,7 +338,12 @@ pub enum DatastoreBackend { /// Storage is located on local filesystem. Filesystem, /// Storage is located on S3 compatible object store. - S3(Arc), + S3(Arc), +} +/// Properties of the S3 datastore backend +pub struct S3Backend { + // S3 client instance for current S3 backend + pub client: S3Client, } impl DatastoreBackend { @@ -352,7 +357,7 @@ impl DatastoreBackend { ) -> Result { match self { Self::Filesystem => Ok(false), - Self::S3(s3_client) => { + Self::S3(s3_backend) => { let object_key = crate::s3::object_key_from_path(&backup_dir.relative_path(), name) .context("invalid index file object key")?; @@ -362,7 +367,8 @@ impl DatastoreBackend { .await .context("failed to read index contents")?; let contents = hyper::body::Bytes::from(data); - let _is_duplicate = s3_client + let _is_duplicate = s3_backend + .client .upload_replace_with_retry(object_key, contents) .await?; Ok(true) @@ -518,7 +524,8 @@ impl DataStore { })); } let s3_client = S3Client::new(options)?; - DatastoreBackend::S3(Arc::new(s3_client)) + let s3_backend = S3Backend { client: s3_client }; + DatastoreBackend::S3(Arc::new(s3_backend)) } }; @@ -998,11 +1005,13 @@ impl DataStore { // construct ns before mkdir to enforce max-depth and name validity let ns = BackupNamespace::from_parent_ns(parent, name)?; - if let DatastoreBackend::S3(s3_client) = self.backend()? { + if let DatastoreBackend::S3(s3_backend) = self.backend()? { let object_key = crate::s3::object_key_from_path(&ns.path(), NAMESPACE_MARKER_FILENAME) .context("invalid namespace marker object key")?; let _is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_no_replace_with_retry(object_key, hyper::body::Bytes::from("")), + s3_backend + .client + .upload_no_replace_with_retry(object_key, hyper::body::Bytes::from("")), ) .context("failed to create namespace on s3 backend")?; } @@ -1112,13 +1121,13 @@ impl DataStore { log::warn!("failed to remove namespace {ns} - {err}") } } - if let DatastoreBackend::S3(s3_client) = &backend { + if let DatastoreBackend::S3(s3_backend) = &backend { // Only remove the namespace marker, if it was empty, // than this is the same as the namespace being removed. let object_key = crate::s3::object_key_from_path(&ns.path(), NAMESPACE_MARKER_FILENAME) .context("invalid namespace marker object key")?; - proxmox_async::runtime::block_on(s3_client.delete_object(object_key))?; + proxmox_async::runtime::block_on(s3_backend.client.delete_object(object_key))?; } } @@ -1154,18 +1163,20 @@ impl DataStore { // When destroying the full subtree, a bulk S3 delete under the root prefix catches // any orphaned group-level metadata (owner/notes) before per-level cleanup iterates. if delete_groups { - if let DatastoreBackend::S3(s3_client) = &self.backend()? { + if let DatastoreBackend::S3(s3_backend) = &self.backend()? { let ns_dir = ns.path(); let ns_prefix = ns_dir .to_str() .ok_or_else(|| format_err!("invalid namespace path prefix"))?; let prefix = format!("{S3_CONTENT_PREFIX}/{ns_prefix}"); let delete_objects_error = proxmox_async::runtime::block_on( - s3_client.delete_objects_by_prefix_with_suffix_filter( - &S3PathPrefix::Some(prefix), - PROTECTED_MARKER_FILENAME, - &[GROUP_OWNER_FILE_NAME, GROUP_NOTES_FILE_NAME], - ), + s3_backend + .client + .delete_objects_by_prefix_with_suffix_filter( + &S3PathPrefix::Some(prefix), + PROTECTED_MARKER_FILENAME, + &[GROUP_OWNER_FILE_NAME, GROUP_NOTES_FILE_NAME], + ), )?; if !delete_objects_error.is_empty() { crate::s3::log_s3_delete_objects_errors(&delete_objects_error); @@ -1743,14 +1754,16 @@ impl DataStore { ) -> Result<(), Error> { let path = self.owner_path(ns, backup_group); - if let DatastoreBackend::S3(s3_client) = self.backend()? { + if let DatastoreBackend::S3(s3_backend) = self.backend()? { let mut path = ns.path(); path.push(backup_group.to_string()); let object_key = crate::s3::object_key_from_path(&path, GROUP_OWNER_FILE_NAME) .context("invalid owner file object key")?; let data = hyper::body::Bytes::from(format!("{auth_id}\n")); let _is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_replace_with_retry(object_key, data), + s3_backend + .client + .upload_replace_with_retry(object_key, data), ) .context("failed to set owner on s3 backend")?; } @@ -2110,7 +2123,7 @@ impl DataStore { chunk_lru_cache: &mut Option>, status: &mut GarbageCollectionStatus, worker: &dyn WorkerTaskContext, - s3_client: Option>, + s3_backend: Option>, ) -> Result<(), Error> { status.index_file_count += 1; status.index_data_bytes += index.index_bytes(); @@ -2139,7 +2152,7 @@ impl DataStore { // file requires the chunk anymore (won't get to this loop then) let is_bad = self.inner.chunk_store.cond_touch_bad_chunks(digest)?; - if let Some(ref _s3_client) = s3_client { + if let Some(_s3_backend) = &s3_backend { // Do not retry here, this is very unlikely to happen as chunk markers will // most likely only be missing if the local cache store was recreated. let _guard = self @@ -2167,7 +2180,7 @@ impl DataStore { status: &mut GarbageCollectionStatus, worker: &dyn WorkerTaskContext, cache_capacity: usize, - s3_client: Option>, + s3_backend: Option>, ) -> Result<(), Error> { // Iterate twice over the datastore to fetch index files, even if this comes with an // additional runtime cost: @@ -2261,7 +2274,7 @@ impl DataStore { &mut chunk_lru_cache, status, worker, - s3_client.as_ref().cloned(), + s3_backend.as_ref().cloned(), )?; if !unprocessed_index_list.remove(&path) { @@ -2304,7 +2317,7 @@ impl DataStore { &mut chunk_lru_cache, status, worker, - s3_client.as_ref().cloned(), + s3_backend.as_ref().cloned(), )?; warn!("Marked chunks for unexpected index file at '{path:?}'"); } @@ -2326,7 +2339,7 @@ impl DataStore { &mut chunk_lru_cache, status, worker, - s3_client.as_ref().cloned(), + s3_backend.as_ref().cloned(), ) })?; @@ -2405,19 +2418,19 @@ impl DataStore { }; let tuning = pbs_config::datastore::parse_datastore_tuning_options(&gc_store_config)?; - let s3_client = match self.backend()? { + let s3_backend = match self.backend()? { DatastoreBackend::Filesystem => None, - DatastoreBackend::S3(s3_client) => { - proxmox_async::runtime::block_on(s3_client.head_bucket()) + DatastoreBackend::S3(s3_backend) => { + proxmox_async::runtime::block_on(s3_backend.client.head_bucket()) .context("failed to reach bucket")?; - Some(s3_client) + Some(s3_backend) } }; if tuning.gc_atime_safety_check.unwrap_or(true) { self.inner .chunk_store - .check_fs_atime_updates(true, s3_client.clone()) + .check_fs_atime_updates(true, s3_backend.clone()) .context("atime safety check failed")?; info!("Access time update check successful, proceeding with GC."); } else { @@ -2458,18 +2471,18 @@ impl DataStore { &mut gc_status, worker, gc_cache_capacity, - s3_client.as_ref().cloned(), + s3_backend.as_ref().cloned(), ) .context("marking used chunks failed")?; info!("Start GC phase2 (sweep unused chunks)"); - if let Some(ref s3_client) = s3_client { + if let Some(s3_backend) = &s3_backend { let mut chunk_count = 0; let prefix = S3PathPrefix::Some(".chunks/".to_string()); // Operates in batches of 1000 objects max per request let mut list_bucket_result = - proxmox_async::runtime::block_on(s3_client.list_objects_v2(&prefix, None)) + proxmox_async::runtime::block_on(s3_backend.client.list_objects_v2(&prefix, None)) .context("failed to list chunk in s3 object store")?; let mut delete_list = @@ -2547,12 +2560,12 @@ impl DataStore { drop(_guard); // limit pending deletes to avoid holding too many chunk flocks - delete_list.conditional_delete_and_drop_locks(s3_client)?; + delete_list.conditional_delete_and_drop_locks(s3_backend)?; } // Process next batch of chunks if there is more if list_bucket_result.is_truncated { list_bucket_result = - proxmox_async::runtime::block_on(s3_client.list_objects_v2( + proxmox_async::runtime::block_on(s3_backend.client.list_objects_v2( &prefix, list_bucket_result.next_continuation_token.as_deref(), ))?; @@ -2563,7 +2576,7 @@ impl DataStore { } // delete the last batch of objects, if there are any remaining - delete_list.delete_and_drop_locks(s3_client)?; + delete_list.delete_and_drop_locks(s3_backend)?; info!("processed {chunk_count} total chunks"); @@ -2739,7 +2752,7 @@ impl DataStore { ) -> Result<(bool, u64), Error> { match backend { DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest), - DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, s3_client), + DatastoreBackend::S3(s3_backend) => self.insert_chunk_cached(chunk, digest, s3_backend), } } @@ -2760,7 +2773,7 @@ impl DataStore { ) -> Result<(bool, u64), Error> { match backend { DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest), - DatastoreBackend::S3(s3_client) => { + DatastoreBackend::S3(s3_backend) => { let _chunk_guard = self .inner .chunk_store @@ -2769,7 +2782,9 @@ impl DataStore { let chunk_size = chunk_data.len() as u64; let object_key = crate::s3::object_key_from_digest(digest)?; let is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_no_replace_with_retry(object_key, chunk_data), + s3_backend + .client + .upload_no_replace_with_retry(object_key, chunk_data), ) .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; Ok((is_duplicate, chunk_size)) @@ -2781,7 +2796,7 @@ impl DataStore { &self, chunk: &DataBlob, digest: &[u8; 32], - s3_client: &Arc, + s3_backend: &Arc, ) -> Result<(bool, u64), Error> { let chunk_data = chunk.raw_data(); let chunk_size = chunk_data.len() as u64; @@ -2810,7 +2825,9 @@ impl DataStore { let object_key = crate::s3::object_key_from_digest(digest)?; let chunk_data: Bytes = chunk_data.to_vec().into(); let is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_no_replace_with_retry(object_key, chunk_data), + s3_backend + .client + .upload_no_replace_with_retry(object_key, chunk_data), ) .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; tracing::info!("Caching of chunk {}", hex::encode(digest)); @@ -2857,14 +2874,15 @@ impl DataStore { if protection { std::fs::File::create(&protected_path) .map_err(|err| format_err!("could not create protection file: {}", err))?; - if let DatastoreBackend::S3(s3_client) = self.backend()? { + if let DatastoreBackend::S3(s3_backend) = self.backend()? { let object_key = crate::s3::object_key_from_path( &backup_dir.relative_path(), PROTECTED_MARKER_FILENAME, ) .context("invalid protected marker object key")?; let _is_duplicate = proxmox_async::runtime::block_on( - s3_client + s3_backend + .client .upload_no_replace_with_retry(object_key, hyper::body::Bytes::from("")), ) .context("failed to mark snapshot as protected on s3 backend")?; @@ -2876,14 +2894,14 @@ impl DataStore { bail!("could not remove protection file: {err}"); } } - if let DatastoreBackend::S3(s3_client) = self.backend()? { + if let DatastoreBackend::S3(s3_backend) = self.backend()? { let object_key = crate::s3::object_key_from_path( &backup_dir.relative_path(), PROTECTED_MARKER_FILENAME, ) .context("invalid protected marker object key")?; if let Err(err) = - proxmox_async::runtime::block_on(s3_client.delete_object(object_key)) + proxmox_async::runtime::block_on(s3_backend.client.delete_object(object_key)) { std::fs::File::create(&protected_path) .map_err(|err| format_err!("could not re-create protection file: {err}"))?; @@ -3076,13 +3094,14 @@ impl DataStore { } } - if let (_backend, Some(s3_client)) = - Self::s3_client_and_backend_from_datastore_config(&datastore_config)? + if let (_backend, Some(s3_backend)) = + Self::s3_backend_from_datastore_config(&datastore_config)? { // Delete all objects within the datastore prefix let prefix = S3PathPrefix::Some(String::default()); - let delete_objects_error = - proxmox_async::runtime::block_on(s3_client.delete_objects_by_prefix(&prefix))?; + let delete_objects_error = proxmox_async::runtime::block_on( + s3_backend.client.delete_objects_by_prefix(&prefix), + )?; if !delete_objects_error.is_empty() { crate::s3::log_s3_delete_objects_errors(&delete_objects_error); bail!("deleting objects failed"); @@ -3093,13 +3112,13 @@ impl DataStore { if ok { remove(".chunks", &mut ok); } - } else if let (_backend, Some(s3_client)) = - Self::s3_client_and_backend_from_datastore_config(&datastore_config)? + } else if let (_backend, Some(s3_backend)) = + Self::s3_backend_from_datastore_config(&datastore_config)? { // Only delete in-use marker so datastore can be re-imported let object_key = S3ObjectKey::try_from(S3_DATASTORE_IN_USE_MARKER) .context("failed to generate in-use marker object key")?; - proxmox_async::runtime::block_on(s3_client.delete_object(object_key)) + proxmox_async::runtime::block_on(s3_backend.client.delete_object(object_key)) .context("failed to delete in-use marker")?; } @@ -3161,12 +3180,12 @@ impl DataStore { pub async fn s3_refresh(self: &Arc) -> Result<(), Error> { match self.backend()? { DatastoreBackend::Filesystem => bail!("store '{}' not backed by S3", self.name()), - DatastoreBackend::S3(s3_client) => { + DatastoreBackend::S3(s3_backend) => { let tmp_base = proxmox_sys::fs::make_tmp_dir(self.base_path(), None) .context("failed to create temporary content folder in {store_base}")?; if let Err(err) = async { - self.fetch_tmp_contents(&tmp_base, &s3_client).await?; + self.fetch_tmp_contents(&tmp_base, &s3_backend).await?; self.move_tmp_contents_in_place(&tmp_base).await } .await @@ -3182,7 +3201,11 @@ impl DataStore { // Fetch the contents (metadata, no chunks) of the datastore from the S3 object store to the // provided temporaray directory - async fn fetch_tmp_contents(&self, tmp_base: &Path, s3_client: &S3Client) -> Result<(), Error> { + async fn fetch_tmp_contents( + &self, + tmp_base: &Path, + s3_backend: &S3Backend, + ) -> Result<(), Error> { let backup_user = pbs_config::backup_user().context("failed to get backup user")?; let mode = nix::sys::stat::Mode::from_bits_truncate(0o0644); let file_create_options = CreateOptions::new() @@ -3199,7 +3222,8 @@ impl DataStore { let store_prefix = format!("{}/{S3_CONTENT_PREFIX}/", self.name()); let mut next_continuation_token: Option = None; loop { - let list_objects_result = s3_client + let list_objects_result = s3_backend + .client .list_objects_v2(&list_prefix, next_continuation_token.as_deref()) .await .context("failed to list object")?; @@ -3241,7 +3265,8 @@ impl DataStore { .await .with_context(|| format!("failed to create target file {file_path:?}"))?; - if let Some(response) = s3_client + if let Some(response) = s3_backend + .client .get_object(object_key) .await .with_context(|| format!("failed to fetch object {object_path}"))? @@ -3313,9 +3338,9 @@ impl DataStore { Ok(()) } - pub fn s3_client_and_backend_from_datastore_config( + pub fn s3_backend_from_datastore_config( datastore_config: &DataStoreConfig, - ) -> Result<(DatastoreBackendType, Option), Error> { + ) -> Result<(DatastoreBackendType, Option), Error> { let backend_config = pbs_config::datastore::parse_backend_config(datastore_config)?; let backend_type = backend_config.ty.unwrap_or_default(); @@ -3359,7 +3384,8 @@ impl DataStore { let s3_client = S3Client::new(options) .context("failed to create s3 client") .map_err(|err| format_err!("{err:#}"))?; - Ok((backend_type, Some(s3_client))) + let s3_backend = S3Backend { client: s3_client }; + Ok((backend_type, Some(s3_backend))) } /// Creates or updates the notes associated with a backup group. @@ -3371,14 +3397,16 @@ impl DataStore { ) -> Result<(), Error> { let _lock = backup_group.lock().context("failed to lock backup group")?; - if let DatastoreBackend::S3(s3_client) = self.backend()? { + if let DatastoreBackend::S3(s3_backend) = self.backend()? { let mut path = backup_group.backup_ns().path(); path.push(backup_group.group().to_string()); let object_key = crate::s3::object_key_from_path(&path, "notes") .context("invalid owner file object key")?; let data = hyper::body::Bytes::copy_from_slice(notes.as_bytes()); let _is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_replace_with_retry(object_key, data), + s3_backend + .client + .upload_replace_with_retry(object_key, data), ) .context("failed to set notes on s3 backend")?; } @@ -3406,12 +3434,16 @@ impl DataStore { blob: DataBlob, backend: &DatastoreBackend, ) -> Result<(), Error> { - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let object_key = crate::s3::object_key_from_path(&snapshot.relative_path(), filename) .context("invalid blob object key")?; let data = hyper::body::Bytes::copy_from_slice(blob.raw_data()); - proxmox_async::runtime::block_on(s3_client.upload_replace_with_retry(object_key, data)) - .context("failed to upload blob to s3 backend")?; + proxmox_async::runtime::block_on( + s3_backend + .client + .upload_replace_with_retry(object_key, data), + ) + .context("failed to upload blob to s3 backend")?; }; let mut path = snapshot.full_path(); @@ -3467,7 +3499,7 @@ impl DataStore { ) })?; - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let suffix = format!(".{counter}.bad"); let target_key = crate::s3::object_key_from_digest_with_suffix(digest, &suffix) .map_err(|err| { @@ -3477,12 +3509,16 @@ impl DataStore { format_err!("could not generate object key for corrupt chunk {path:?} - {err}") })?; - proxmox_async::runtime::block_on(s3_client.copy_object(object_key.clone(), target_key)) - .map_err(|err| { - format_err!("failed to copy corrupt chunk on s3 backend: {digest_str} - {err}") - })?; + proxmox_async::runtime::block_on( + s3_backend + .client + .copy_object(object_key.clone(), target_key), + ) + .map_err(|err| { + format_err!("failed to copy corrupt chunk on s3 backend: {digest_str} - {err}") + })?; - proxmox_async::runtime::block_on(s3_client.delete_object(object_key)).map_err( + proxmox_async::runtime::block_on(s3_backend.client.delete_object(object_key)).map_err( |err| { format_err!( "failed to delete corrupt chunk on s3 backend: {digest_str} - {err}" @@ -3524,14 +3560,14 @@ impl S3DeleteList { self.list.push((key, guard)); } - /// Delete the objects in the list via the provided S3 client instance. + /// Delete the objects in the list via the provided S3 backend instance. /// Clears the list contents and frees the per-chunk file locks. - fn delete_and_drop_locks(&mut self, s3_client: &Arc) -> Result<(), Error> { + fn delete_and_drop_locks(&mut self, s3_backend: &Arc) -> Result<(), Error> { if self.list.is_empty() { return Ok(()); } let delete_objects_result = proxmox_async::runtime::block_on( - s3_client.delete_objects( + s3_backend.client.delete_objects( &self .list .iter() @@ -3551,7 +3587,7 @@ impl S3DeleteList { /// threshold or the delete list age threshold. fn conditional_delete_and_drop_locks( &mut self, - s3_client: &Arc, + s3_backend: &Arc, ) -> Result<(), Error> { if self.list.len() >= self.capacity_threshold || (!self.list.is_empty() @@ -3561,7 +3597,7 @@ impl S3DeleteList { .unwrap_or(self.age_threshold) // time jump to past, chunk markers already gone >= self.age_threshold) { - self.delete_and_drop_locks(s3_client)?; + self.delete_and_drop_locks(s3_backend)?; } Ok(()) } diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs index d9d7a4cb6..03408d4e9 100644 --- a/pbs-datastore/src/lib.rs +++ b/pbs-datastore/src/lib.rs @@ -218,7 +218,7 @@ pub use store_progress::StoreProgress; mod datastore; pub use datastore::{ DataStore, DataStoreLookup, DatastoreBackend, S3_CLIENT_REQUEST_COUNTER_BASE_PATH, - S3_DATASTORE_IN_USE_MARKER, check_backup_owner, check_namespace_depth_limit, + S3_DATASTORE_IN_USE_MARKER, S3Backend, check_backup_owner, check_namespace_depth_limit, ensure_datastore_is_mounted, get_datastore_mount_status, }; diff --git a/pbs-datastore/src/local_chunk_reader.rs b/pbs-datastore/src/local_chunk_reader.rs index faae43b71..ea1e88a59 100644 --- a/pbs-datastore/src/local_chunk_reader.rs +++ b/pbs-datastore/src/local_chunk_reader.rs @@ -7,11 +7,10 @@ use http_body_util::BodyExt; use pbs_api_types::CryptMode; use pbs_tools::crypt_config::CryptConfig; -use proxmox_s3_client::S3Client; use crate::DataStore; use crate::data_blob::DataBlob; -use crate::datastore::DatastoreBackend; +use crate::datastore::{DatastoreBackend, S3Backend}; use crate::read_chunk::{AsyncReadChunk, ReadChunk}; #[derive(Clone)] @@ -53,9 +52,9 @@ impl LocalChunkReader { } } -async fn fetch(s3_client: Arc, digest: &[u8; 32]) -> Result { +async fn fetch(s3_backend: Arc, digest: &[u8; 32]) -> Result { let object_key = crate::s3::object_key_from_digest(digest)?; - if let Some(response) = s3_client.get_object(object_key).await? { + if let Some(response) = s3_backend.client.get_object(object_key).await? { let bytes = response.content.collect().await?.to_bytes(); DataBlob::from_raw(bytes.to_vec()) } else { @@ -67,10 +66,10 @@ impl ReadChunk for LocalChunkReader { fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result { let chunk = match &self.backend { DatastoreBackend::Filesystem => self.store.load_chunk(digest)?, - DatastoreBackend::S3(s3_client) => match self.store.cache() { - None => proxmox_async::runtime::block_on(fetch(Arc::clone(s3_client), digest))?, + DatastoreBackend::S3(s3_backend) => match self.store.cache() { + None => proxmox_async::runtime::block_on(fetch(Arc::clone(s3_backend), digest))?, Some(cache) => { - proxmox_async::runtime::block_on(cache.access(digest, s3_client.clone()))? + proxmox_async::runtime::block_on(cache.access(digest, s3_backend.clone()))? .ok_or(format_err!( "unable to access chunk with digest {}", hex::encode(digest) @@ -104,11 +103,11 @@ impl AsyncReadChunk for LocalChunkReader { let raw_data = tokio::fs::read(&path).await?; DataBlob::load_from_reader(&mut &raw_data[..])? } - DatastoreBackend::S3(s3_client) => match self.store.cache() { - None => fetch(Arc::clone(s3_client), digest).await?, + DatastoreBackend::S3(s3_backend) => match self.store.cache() { + None => fetch(Arc::clone(s3_backend), digest).await?, Some(cache) => { cache - .access(digest, s3_client.clone()) + .access(digest, s3_backend.clone()) .await? .ok_or(format_err!( "unable to access chunk with digest {}", diff --git a/pbs-datastore/src/local_datastore_lru_cache.rs b/pbs-datastore/src/local_datastore_lru_cache.rs index 561d36d79..2f23d2d71 100644 --- a/pbs-datastore/src/local_datastore_lru_cache.rs +++ b/pbs-datastore/src/local_datastore_lru_cache.rs @@ -7,9 +7,8 @@ use anyhow::{Error, bail}; use http_body_util::BodyExt; use pbs_tools::async_lru_cache::AsyncLruCache; -use proxmox_s3_client::S3Client; -use crate::{ChunkStore, DataBlob}; +use crate::{ChunkStore, DataBlob, S3Backend}; /// LRU cache using local datastore for caching chunks /// @@ -71,7 +70,7 @@ impl LocalDatastoreLruCache { pub async fn access( &self, digest: &[u8; 32], - client: Arc, + s3_backend: Arc, ) -> Result, Error> { let (path, _digest_str) = self.store.chunk_path(digest); match std::fs::File::open(&path) { @@ -93,7 +92,7 @@ impl LocalDatastoreLruCache { use std::io::Seek; // Check if file is empty marker file, try fetching content if so if file.seek(std::io::SeekFrom::End(0))? == 0 { - let chunk = self.fetch_and_insert(client.clone(), digest).await?; + let chunk = self.fetch_and_insert(s3_backend.clone(), digest).await?; Ok(Some(chunk)) } else { Err(err) @@ -103,7 +102,7 @@ impl LocalDatastoreLruCache { Err(err) => { // Failed to open file, missing if err.kind() == std::io::ErrorKind::NotFound { - let chunk = self.fetch_and_insert(client.clone(), digest).await?; + let chunk = self.fetch_and_insert(s3_backend.clone(), digest).await?; Ok(Some(chunk)) } else { Err(Error::from(err)) @@ -119,11 +118,11 @@ impl LocalDatastoreLruCache { async fn fetch_and_insert( &self, - client: Arc, + s3_backend: Arc, digest: &[u8; 32], ) -> Result { let object_key = crate::s3::object_key_from_digest(digest)?; - match client.get_object(object_key).await? { + match s3_backend.client.get_object(object_key).await? { None => { bail!("could not fetch object with key {}", hex::encode(digest)) } diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs index 7c65470f9..ee4812d64 100644 --- a/src/api2/config/datastore.rs +++ b/src/api2/config/datastore.rs @@ -122,34 +122,34 @@ pub(crate) fn do_create_datastore( } let tuning = pbs_config::datastore::parse_datastore_tuning_options(&datastore)?; - let (backend_type, backend_s3_client) = - match DataStore::s3_client_and_backend_from_datastore_config(&datastore)? { - (backend_type, Some(s3_client)) => { - if !overwrite_in_use { - let object_key = S3ObjectKey::try_from(S3_DATASTORE_IN_USE_MARKER) - .context("failed to generate s3 object key")?; - if let Some(response) = - proxmox_async::runtime::block_on(s3_client.get_object(object_key.clone())) - .context("failed to get in-use marker from bucket")? - { - let content = proxmox_async::runtime::block_on(response.content.collect()) - .unwrap_or_default(); - let content = - String::from_utf8(content.to_bytes().to_vec()).unwrap_or_default(); - let in_use: InUseContent = - serde_json::from_str(&content).unwrap_or_default(); - if let Some(hostname) = in_use.hostname { - bail!("Bucket already contains datastore in use by host {hostname}"); - } else { - bail!("Bucket already contains datastore in use"); - } + let (backend_type, s3_backend) = match DataStore::s3_backend_from_datastore_config(&datastore)? + { + (backend_type, Some(s3_backend)) => { + if !overwrite_in_use { + let object_key = S3ObjectKey::try_from(S3_DATASTORE_IN_USE_MARKER) + .context("failed to generate s3 object key")?; + if let Some(response) = proxmox_async::runtime::block_on( + s3_backend.client.get_object(object_key.clone()), + ) + .context("failed to get in-use marker from bucket")? + { + let content = proxmox_async::runtime::block_on(response.content.collect()) + .unwrap_or_default(); + let content = + String::from_utf8(content.to_bytes().to_vec()).unwrap_or_default(); + let in_use: InUseContent = serde_json::from_str(&content).unwrap_or_default(); + if let Some(hostname) = in_use.hostname { + bail!("Bucket already contains datastore in use by host {hostname}"); + } else { + bail!("Bucket already contains datastore in use"); } } - - (backend_type, Some(Arc::new(s3_client))) } - (backend_type, None) => (backend_type, None), - }; + + (backend_type, Some(Arc::new(s3_backend))) + } + (backend_type, None) => (backend_type, None), + }; let unmount_guard = if datastore.backing_device.is_some() { do_mount_device(datastore.clone())?; @@ -206,14 +206,14 @@ pub(crate) fn do_create_datastore( )? }; - if let Some(ref s3_client) = backend_s3_client { + if let Some(s3_backend) = &s3_backend { let object_key = S3ObjectKey::try_from(S3_DATASTORE_IN_USE_MARKER) .context("failed to generate s3 object key")?; let content = serde_json::to_string(&InUseContent { hostname: Some(proxmox_sys::nodename().to_string()), }) .context("failed to encode hostname")?; - proxmox_async::runtime::block_on(s3_client.put_object( + proxmox_async::runtime::block_on(s3_backend.client.put_object( object_key, hyper::body::Bytes::from(content).into(), Some(S3_HTTP_REQUEST_TIMEOUT), @@ -224,7 +224,7 @@ pub(crate) fn do_create_datastore( if tuning.gc_atime_safety_check.unwrap_or(true) { chunk_store - .check_fs_atime_updates(true, backend_s3_client) + .check_fs_atime_updates(true, s3_backend) .context("access time safety check failed")?; info!("Access time update check successful."); } else { @@ -340,9 +340,9 @@ pub fn create_datastore( let store_name = config.name.to_string(); - let (backend, s3_client) = DataStore::s3_client_and_backend_from_datastore_config(&config)?; - if let Some(s3_client) = s3_client { - proxmox_async::runtime::block_on(s3_client.head_bucket()) + let (backend, s3_backend) = DataStore::s3_backend_from_datastore_config(&config)?; + if let Some(s3_backend) = s3_backend { + proxmox_async::runtime::block_on(s3_backend.client.head_bucket()) .context("failed to access bucket") .map_err(|err| format_err!("{err:#}"))?; } diff --git a/src/api2/reader/mod.rs b/src/api2/reader/mod.rs index 4321d2286..051536ff9 100644 --- a/src/api2/reader/mod.rs +++ b/src/api2/reader/mod.rs @@ -28,9 +28,8 @@ use pbs_api_types::{ }; use pbs_config::CachedUserInfo; use pbs_datastore::index::IndexFile; -use pbs_datastore::{DataStore, DatastoreBackend, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1}; +use pbs_datastore::{DataStore, DatastoreBackend, PROXMOX_BACKUP_READER_PROTOCOL_ID_V1, S3Backend}; use pbs_tools::json::required_string_param; -use proxmox_s3_client::S3Client; use crate::api2::ExecInheritLogContext; use crate::api2::backup::optional_ns_param; @@ -323,12 +322,12 @@ fn download_chunk( let body = match &env.backend { DatastoreBackend::Filesystem => load_from_filesystem(env, &digest)?, - DatastoreBackend::S3(s3_client) => match env.datastore.cache() { - None => fetch_from_object_store(s3_client, &digest).await?, + DatastoreBackend::S3(s3_backend) => match env.datastore.cache() { + None => fetch_from_object_store(s3_backend, &digest).await?, Some(cache) => { let chunk = cache - .access(&digest, s3_client.clone()) + .access(&digest, s3_backend.clone()) .await? .ok_or(format_err!( "unable to access chunk with digest {}", @@ -349,9 +348,9 @@ fn download_chunk( .boxed() } -async fn fetch_from_object_store(s3_client: &S3Client, digest: &[u8; 32]) -> Result { +async fn fetch_from_object_store(s3_backend: &S3Backend, digest: &[u8; 32]) -> Result { let object_key = pbs_datastore::s3::object_key_from_digest(digest)?; - if let Some(response) = s3_client.get_object(object_key).await? { + if let Some(response) = s3_backend.client.get_object(object_key).await? { let data = response.content.collect().await?.to_bytes(); return Ok(Body::from(data)); } diff --git a/src/backup/verify.rs b/src/backup/verify.rs index 878a27f46..710d84b46 100644 --- a/src/backup/verify.rs +++ b/src/backup/verify.rs @@ -268,9 +268,9 @@ impl VerifyWorker { verify_state.decoded_bytes.fetch_add(size, Ordering::SeqCst); } }, - DatastoreBackend::S3(s3_client) => { + DatastoreBackend::S3(s3_backend) => { let object_key = pbs_datastore::s3::object_key_from_digest(&info.digest)?; - match proxmox_async::runtime::block_on(s3_client.get_object(object_key)) { + match proxmox_async::runtime::block_on(s3_backend.client.get_object(object_key)) { Ok(Some(response)) => { match proxmox_async::runtime::block_on(response.content.collect()) { Ok(raw_chunk) => { diff --git a/src/server/pull.rs b/src/server/pull.rs index e86a9c329..07b636522 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -686,7 +686,7 @@ async fn pull_snapshot<'a>( .with_context(|| prefix.clone())?; if client_log_name.exists() { - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let object_key = pbs_datastore::s3::object_key_from_path( &snapshot.relative_path(), CLIENT_LOG_BLOB_NAME.as_ref(), @@ -699,7 +699,8 @@ async fn pull_snapshot<'a>( .context("failed to read log file contents") .with_context(|| prefix.clone())?; let contents = hyper::body::Bytes::from(data); - let _is_duplicate = s3_client + let _is_duplicate = s3_backend + .client .upload_replace_with_retry(object_key, contents) .await .context("failed to upload client log to s3 backend") @@ -903,7 +904,7 @@ async fn pull_snapshot<'a>( if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}"); } - if let DatastoreBackend::S3(s3_client) = backend { + if let DatastoreBackend::S3(s3_backend) = backend { let object_key = pbs_datastore::s3::object_key_from_path( &snapshot.relative_path(), MANIFEST_BLOB_NAME.as_ref(), @@ -911,7 +912,8 @@ async fn pull_snapshot<'a>( .context("invalid manifest object key")?; let data = hyper::body::Bytes::from(manifest_data); - let _is_duplicate = s3_client + let _is_duplicate = s3_backend + .client .upload_replace_with_retry(object_key, data) .await .context("failed to upload manifest to s3 backend") -- 2.47.3