From: Christian Ebner <c.ebner@proxmox.com>
To: "Proxmox Backup Server development discussion"
	<pbs-devel@lists.proxmox.com>,
	"Fabian Grünbichler" <f.gruenbichler@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup 5/7] api: chunk upload: fix race between chunk backend upload and insert
Date: Tue, 7 Oct 2025 12:15:15 +0200
Message-ID: <78cbd2ab-4c29-4c4d-8c7e-7a7060025611@proxmox.com>
In-Reply-To: <1759752528.phfr2pxqbx.astroid@yuna.none>

On 10/6/25 3:17 PM, Fabian Grünbichler wrote:
> On October 6, 2025 12:41 pm, Christian Ebner wrote:
>> Chunks are first uploaded to the object store for S3 backed
>> datastores, only then inserted into the local datastore cache.
>> By this, it is assured that the chunk is only ever considered as
>> valid after successful upload. Garbage collection does however rely
>> on the local marker file to be present, only then considering the chunk
>> as in-use. While this marker is created if not present during phase 1
>> of garbage collection, this only happens for chunks which are already
>> referenced by a complete index file.
>>
>> Therefore, there remains a race window between garbage collection
>> listing the chunks (upload completed) and lookup of the local marker
>> file being present (chunk cache insert after upload). This can lead to
>> chunks which just finished upload, but were not yet inserted into the
>> local cache store, being removed again.
>>
>> To close this race window, mark chunks which are currently being
>> uploaded to the backend by an additional marker file, checked by the
>> garbage collection as well if the regular marker is not found, and
>> only removed after successful chunk insert.
>>
>> Avoid this overhead for regular datastores by passing and checking
>> the backend type for the chunk store.
>>
>> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
>> ---
>>   pbs-datastore/src/chunk_store.rs | 94 +++++++++++++++++++++++++++++---
>>   pbs-datastore/src/datastore.rs   | 32 +++++++++++
>>   src/api2/backup/upload_chunk.rs  | 17 ++++--
>>   src/api2/config/datastore.rs     |  2 +
>>   4 files changed, 133 insertions(+), 12 deletions(-)
>>
>> diff --git a/pbs-datastore/src/chunk_store.rs b/pbs-datastore/src/chunk_store.rs
>> index 74fa79db1..22efe4a32 100644
>> --- a/pbs-datastore/src/chunk_store.rs
>> +++ b/pbs-datastore/src/chunk_store.rs
>> @@ -7,7 +7,7 @@ use std::time::Duration;
>>   use anyhow::{bail, format_err, Context, Error};
>>   use tracing::{info, warn};
>>   
>> -use pbs_api_types::{DatastoreFSyncLevel, GarbageCollectionStatus};
>> +use pbs_api_types::{DatastoreBackendType, DatastoreFSyncLevel, GarbageCollectionStatus};
>>   use proxmox_io::ReadExt;
>>   use proxmox_s3_client::S3Client;
>>   use proxmox_sys::fs::{create_dir, create_path, file_type_from_file_stat, CreateOptions};
>> @@ -22,6 +22,10 @@ use crate::file_formats::{
>>   };
>>   use crate::DataBlob;
>>   
>> +/// Filename extension for local datastore cache marker files indicating
>> +/// the chunk being uploaded to the backend
>> +pub(crate) const BACKEND_UPLOAD_MARKER_EXT: &str = "backend-upload";
> 
> I think this part here should be internal to the chunk store
> 
>> +
>>   /// File system based chunk store
>>   pub struct ChunkStore {
>>       name: String, // used for error reporting
>> @@ -30,6 +34,7 @@ pub struct ChunkStore {
>>       mutex: Mutex<()>,
>>       locker: Option<Arc<Mutex<ProcessLocker>>>,
>>       sync_level: DatastoreFSyncLevel,
>> +    datastore_backend_type: DatastoreBackendType,
>>   }
>>   
>>   // TODO: what about sysctl setting vm.vfs_cache_pressure (0 - 100) ?
>> @@ -77,6 +82,7 @@ impl ChunkStore {
>>               mutex: Mutex::new(()),
>>               locker: None,
>>               sync_level: Default::default(),
>> +            datastore_backend_type: DatastoreBackendType::default(),
>>           }
>>       }
>>   
>> @@ -97,6 +103,7 @@ impl ChunkStore {
>>           uid: nix::unistd::Uid,
>>           gid: nix::unistd::Gid,
>>           sync_level: DatastoreFSyncLevel,
>> +        datastore_backend_type: DatastoreBackendType,
>>       ) -> Result<Self, Error>
>>       where
>>           P: Into<PathBuf>,
>> @@ -151,7 +158,7 @@ impl ChunkStore {
>>               }
>>           }
>>   
>> -        Self::open(name, base, sync_level)
>> +        Self::open(name, base, sync_level, datastore_backend_type)
>>       }
>>   
>>       fn lockfile_path<P: Into<PathBuf>>(base: P) -> PathBuf {
>> @@ -185,6 +192,7 @@ impl ChunkStore {
>>           name: &str,
>>           base: P,
>>           sync_level: DatastoreFSyncLevel,
>> +        datastore_backend_type: DatastoreBackendType,
>>       ) -> Result<Self, Error> {
>>           let base: PathBuf = base.into();
>>   
>> @@ -201,6 +209,7 @@ impl ChunkStore {
>>               locker: Some(locker),
>>               mutex: Mutex::new(()),
>>               sync_level,
>> +            datastore_backend_type,
>>           })
>>       }
>>   
>> @@ -567,6 +576,40 @@ impl ChunkStore {
>>           Ok(())
>>       }
>>   
>> +    pub(crate) fn insert_backend_upload_marker(&self, digest: &[u8; 32]) -> Result<(), Error> {
>> +        if self.datastore_backend_type == DatastoreBackendType::Filesystem {
>> +            bail!("cannot create backend upload marker, not a cache store");
>> +        }
>> +        let (marker_path, digest_str) = self.chunk_backed_upload_marker_path(digest);
>> +
>> +        let _lock = self.mutex.lock();
>> +
>> +        std::fs::File::options()
>> +            .write(true)
>> +            .create_new(true)
>> +            .open(&marker_path)
>> +            .with_context(|| {
>> +                format!("failed to create backend upload marker for chunk {digest_str}")
> 
> A: reference for comment further below in upload_chunk
> 
>> +            })?;
>> +        Ok(())
>> +    }
>> +
>> +    pub(crate) fn cleanup_backend_upload_marker(&self, digest: &[u8; 32]) -> Result<(), Error> {
>> +        if self.datastore_backend_type == DatastoreBackendType::Filesystem {
>> +            bail!("cannot cleanup backend upload marker, not a cache store");
>> +        }
>> +        let (marker_path, _digest_str) = self.chunk_backed_upload_marker_path(digest);
>> +
>> +        let _lock = self.mutex.lock();
>> +
>> +        if let Err(err) = std::fs::remove_file(marker_path) {
>> +            if err.kind() != std::io::ErrorKind::NotFound {
>> +                bail!("failed to cleanup backend upload marker: {err}");
>> +            }
>> +        }
>> +        Ok(())
>> +    }
>> +
> 
> and instead we should add a third helper here that allows querying
> whether there exists a pending upload marker for a certain digest?
> 
>>       pub fn insert_chunk(&self, chunk: &DataBlob, digest: &[u8; 32]) -> Result<(bool, u64), Error> {
>>           // unwrap: only `None` in unit tests
>>           assert!(self.locker.is_some());
>> @@ -644,6 +687,16 @@ impl ChunkStore {
>>               format_err!("inserting chunk on store '{name}' failed for {digest_str} - {err}")
>>           })?;
>>   
>> +        if self.datastore_backend_type != DatastoreBackendType::Filesystem {
>> +            if let Err(err) =
>> +                std::fs::remove_file(chunk_path.with_extension(BACKEND_UPLOAD_MARKER_EXT))
>> +            {
>> +                if err.kind() != std::io::ErrorKind::NotFound {
>> +                    bail!("failed to cleanup backend upload marker: {err}");
>> +                }
>> +            }
>> +        }
>> +
>>           if self.sync_level == DatastoreFSyncLevel::File {
>>               // fsync dir handle to persist the tmp rename
>>               let dir = std::fs::File::open(chunk_dir_path)?;
>> @@ -688,6 +741,15 @@ impl ChunkStore {
>>           Ok(())
>>       }
>>   
>> +    /// Generate file path for backend upload marker of given chunk digest.
>> +    pub(crate) fn chunk_backed_upload_marker_path(&self, digest: &[u8; 32]) -> (PathBuf, String) {
>> +        let (chunk_path, digest_str) = self.chunk_path(digest);
>> +        (
>> +            chunk_path.with_extension(BACKEND_UPLOAD_MARKER_EXT),
>> +            digest_str,
>> +        )
>> +    }
> 
> this should be internal as well
> 
>> +
>>       pub fn relative_path(&self, path: &Path) -> PathBuf {
>>           // unwrap: only `None` in unit tests
>>           assert!(self.locker.is_some());
>> @@ -773,14 +835,26 @@ fn test_chunk_store1() {
>>   
>>       if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
>>   
>> -    let chunk_store = ChunkStore::open("test", &path, DatastoreFSyncLevel::None);
>> +    let chunk_store = ChunkStore::open(
>> +        "test",
>> +        &path,
>> +        DatastoreFSyncLevel::None,
>> +        DatastoreBackendType::Filesystem,
>> +    );
>>       assert!(chunk_store.is_err());
>>   
>>       let user = nix::unistd::User::from_uid(nix::unistd::Uid::current())
>>           .unwrap()
>>           .unwrap();
>> -    let chunk_store =
>> -        ChunkStore::create("test", &path, user.uid, user.gid, DatastoreFSyncLevel::None).unwrap();
>> +    let chunk_store = ChunkStore::create(
>> +        "test",
>> +        &path,
>> +        user.uid,
>> +        user.gid,
>> +        DatastoreFSyncLevel::None,
>> +        DatastoreBackendType::Filesystem,
>> +    )
>> +    .unwrap();
>>   
>>       let (chunk, digest) = crate::data_blob::DataChunkBuilder::new(&[0u8, 1u8])
>>           .build()
>> @@ -792,8 +866,14 @@ fn test_chunk_store1() {
>>       let (exists, _) = chunk_store.insert_chunk(&chunk, &digest).unwrap();
>>       assert!(exists);
>>   
>> -    let chunk_store =
>> -        ChunkStore::create("test", &path, user.uid, user.gid, DatastoreFSyncLevel::None);
>> +    let chunk_store = ChunkStore::create(
>> +        "test",
>> +        &path,
>> +        user.uid,
>> +        user.gid,
>> +        DatastoreFSyncLevel::None,
>> +        DatastoreBackendType::Filesystem,
>> +    );
>>       assert!(chunk_store.is_err());
>>   
>>       if let Err(_e) = std::fs::remove_dir_all(".testdir") { /* ignore */ }
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index 4f55eb9db..58fb863ec 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -348,10 +348,14 @@ impl DataStore {
>>                   DatastoreTuning::API_SCHEMA
>>                       .parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
>>               )?;
>> +            let backend_config: DatastoreBackendConfig =
>> +                config.backend.as_deref().unwrap_or("").parse()?;
>> +            let backend_type = backend_config.ty.unwrap_or_default();
>>               Arc::new(ChunkStore::open(
>>                   name,
>>                   config.absolute_path(),
>>                   tuning.sync_level.unwrap_or_default(),
>> +                backend_type,
>>               )?)
>>           };
>>   
>> @@ -436,10 +440,16 @@ impl DataStore {
>>               DatastoreTuning::API_SCHEMA
>>                   .parse_property_string(config.tuning.as_deref().unwrap_or(""))?,
>>           )?;
>> +        let backend_config: DatastoreBackendConfig = serde_json::from_value(
>> +            DatastoreBackendConfig::API_SCHEMA
>> +                .parse_property_string(config.backend.as_deref().unwrap_or(""))?,
>> +        )?;
>> +        let backend_type = backend_config.ty.unwrap_or_default();
>>           let chunk_store = ChunkStore::open(
>>               &name,
>>               config.absolute_path(),
>>               tuning.sync_level.unwrap_or_default(),
>> +            backend_type,
>>           )?;
>>           let inner = Arc::new(Self::with_store_and_config(
>>               Arc::new(chunk_store),
>> @@ -1663,6 +1673,15 @@ impl DataStore {
>>                       let atime = match std::fs::metadata(&chunk_path) {
>>                           Ok(stat) => stat.accessed()?,
>>                           Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
>> +                            if std::fs::metadata(
>> +                                chunk_path
>> +                                    .with_extension(crate::chunk_store::BACKEND_UPLOAD_MARKER_EXT),
>> +                            )
>> +                            .is_ok()
>> +                            {
>> +                                info!("keep in progress {}", content.key);
> 
> what does `keep in progress` mean?
> 
>> +                                continue;
>> +                            }
> 
> this should call the new helper.
> 
>>                               // File not found, delete by setting atime to unix epoch
>>                               info!("Not found, mark for deletion: {}", content.key);
>>                               SystemTime::UNIX_EPOCH
>> @@ -1867,6 +1886,19 @@ impl DataStore {
>>           self.inner.chunk_store.insert_chunk(chunk, digest)
>>       }
>>   
>> +    /// Inserts the marker file to signal an in progress upload to the backend
>> +    ///
>> +    /// The presence of the marker avoids a race between inserting the chunk into the
>> +    /// datastore and cleanup of the chunk by garbage collection.
>> +    pub fn insert_backend_upload_marker(&self, digest: &[u8; 32]) -> Result<(), Error> {
>> +        self.inner.chunk_store.insert_backend_upload_marker(digest)
>> +    }
>> +
>> +    /// Remove the marker file signaling an in-progress upload to the backend
>> +    pub fn cleanup_backend_upload_marker(&self, digest: &[u8; 32]) -> Result<(), Error> {
>> +        self.inner.chunk_store.cleanup_backend_upload_marker(digest)
>> +    }
>> +
>>       pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
>>           let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
>>           std::fs::metadata(chunk_path).map_err(Error::from)
>> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
>> index 8dd7e4d52..d4b1850eb 100644
>> --- a/src/api2/backup/upload_chunk.rs
>> +++ b/src/api2/backup/upload_chunk.rs
>> @@ -259,6 +259,7 @@ async fn upload_to_backend(
>>                       data.len()
>>                   );
>>               }
>> +            let datastore = env.datastore.clone();
>>   
>>               if env.no_cache {
>>                   let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
>> @@ -272,11 +273,11 @@ async fn upload_to_backend(
>>               // Avoid re-upload to S3 if the chunk is either present in the LRU cache or the chunk
>>               // file exists on filesystem. The latter means that the chunk has been present in the
>>               // past and was not cleaned up by garbage collection, so contained in the S3 object store.
>> -            if env.datastore.cache_contains(&digest) {
>> +            if datastore.cache_contains(&digest) {
>>                   tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
>>                   return Ok((digest, size, encoded_size, true));
>>               }
>> -            if let Ok(true) = env.datastore.cond_touch_chunk(&digest, false) {
>> +            if let Ok(true) = datastore.cond_touch_chunk(&digest, false) {
>>                   tracing::info!(
>>                       "Skip upload of already encountered chunk {}",
>>                       hex::encode(digest)
>> @@ -284,18 +285,24 @@ async fn upload_to_backend(
>>                   return Ok((digest, size, encoded_size, true));
>>               }
>>   
>> +            datastore.insert_backend_upload_marker(&digest)?;
> 
> what stops two concurrent uploads from ending up here, making this
> return an error for the second client that ends up attempting to
> insert the marker file? in A, we only take the mutex for inserting
> the marker file itself..

This issue here is actually rather problematic... One could back off 
and busy-wait? Or maybe use an inotify watch on the file to get 
notified once the concurrent upload has finished or failed and the 
upload marker has been cleaned up. The only issue with that is the 
case of a lingering upload marker which never gets cleaned up, e.g. 
because of a crash...

The better option is probably to not fail on duplicate marker 
creation, allowing concurrent uploads to the s3 backend, and to sync 
up on insert, not failing on marker removal if the chunk file already 
exists because it has been inserted by the concurrent upload. I must 
do some more research on how the s3 backend handles such concurrent 
object uploads.
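
Something along these lines (untested sketch, adapting the patch's 
insert_backend_upload_marker; the backend type check from the patch 
is elided here, everything else only uses what chunk_store.rs already 
imports):

    pub(crate) fn insert_backend_upload_marker(&self, digest: &[u8; 32]) -> Result<(), Error> {
        let (marker_path, digest_str) = self.chunk_backed_upload_marker_path(digest);

        let _lock = self.mutex.lock();

        if let Err(err) = std::fs::File::options()
            .write(true)
            .create_new(true)
            .open(&marker_path)
        {
            // a concurrent upload of the same chunk already created the
            // marker: treat this as success, both uploads proceed and
            // sync up on chunk insert instead of failing here
            if err.kind() != std::io::ErrorKind::AlreadyExists {
                return Err(err).with_context(|| {
                    format!("failed to create backend upload marker for chunk {digest_str}")
                });
            }
        }
        Ok(())
    }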

> 
> there's a second issue - what if we end up calling
> datastore.insert_backend_upload_marker here, it blocks on the mutex,
> because another insertion is going on that already holds it..
> 
> then we end up creating the marker file here, even though the chunk is
> already in the chunk store..

True, but this is not as problematic, as one can re-check whether the 
chunk file already exists after acquiring the lock and before writing 
the marker, and short circuit there, skipping the re-upload and 
re-insert.
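
E.g. (untested, the bool return value signaling "chunk already 
present" would be a hypothetical extension, so the caller can tell 
the short circuit case apart):

    pub(crate) fn insert_backend_upload_marker(&self, digest: &[u8; 32]) -> Result<bool, Error> {
        let (chunk_path, _digest_str) = self.chunk_path(digest);

        let _lock = self.mutex.lock();

        // re-check under the lock: a concurrent upload may have inserted
        // the chunk while we were blocked on the mutex, in which case the
        // caller can short circuit, skipping both re-upload and re-insert
        if chunk_path.exists() {
            return Ok(true);
        }

        // ... create the marker file as above ...
        Ok(false)
    }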

> 
> mutexes are not (guaranteed to be) fair, so we can have the situation
> that two backup clients concurrently attempt to upload the same chunk,
> with one of them succeeding across the whole
> 
> - insert marker
> - upload to S3
> - insert chunk and remove marker
> 
> process, with the other one blocked on obtaining the mutex for inserting
> the marker file if there is enough contention on the chunk store lock
> 
>>               tracing::info!("Upload of new chunk {}", hex::encode(digest));
>>               let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
> 
> (side note - if this fails, then the marker file is left around..)

Moved the s3 object key creation before the marker insertion, just 
like for the no-cache case, thanks.
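
The order in upload_to_backend then becomes roughly (both calls as in 
the patch, just reordered):

    // creating the object key can fail, so do it before inserting the
    // marker, otherwise a failure here would leave a stale marker behind
    let object_key = pbs_datastore::s3::object_key_from_digest(&digest)?;
    datastore.insert_backend_upload_marker(&digest)?;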

>> -            let is_duplicate = s3_client
>> +            let is_duplicate = match s3_client
>>                   .upload_no_replace_with_retry(object_key, data.clone())
>>                   .await
>> -                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> +            {
>> +                Ok(is_duplicate) => is_duplicate,
>> +                Err(err) => {
>> +                    datastore.cleanup_backend_upload_marker(&digest)?;
>> +                    bail!("failed to upload chunk to s3 backend - {err:#}");
>> +                }
>> +            };
>>   
>>               // Only insert the chunk into the cache after it has been successfully uploaded.
>>               // Although less performant than doing this in parallel, it is required for consistency
>>               // since chunks are considered as present on the backend if the file exists in the local
>>               // cache store.
>> -            let datastore = env.datastore.clone();
>>               tracing::info!("Caching of chunk {}", hex::encode(digest));
>>               let _ = tokio::task::spawn_blocking(move || {
>>                   let chunk = DataBlob::from_raw(data.to_vec())?;
> 
> and down below here, if the chunk was inserted by somebody else, the
> cache_insert call on the next line here will end up calling insert_chunk
> on the chunk_store, which won't clear the marker unless it actually
> freshly inserts the chunk, which doesn't happen if it already exists..
> 
>> diff --git a/src/api2/config/datastore.rs b/src/api2/config/datastore.rs
>> index 3b03c0466..541bd0a04 100644
>> --- a/src/api2/config/datastore.rs
>> +++ b/src/api2/config/datastore.rs
>> @@ -173,6 +173,7 @@ pub(crate) fn do_create_datastore(
>>                   &datastore.name,
>>                   &path,
>>                   tuning.sync_level.unwrap_or_default(),
>> +                backend_type,
>>               )
>>           })?
>>       } else {
>> @@ -204,6 +205,7 @@ pub(crate) fn do_create_datastore(
>>               backup_user.uid,
>>               backup_user.gid,
>>               tuning.sync_level.unwrap_or_default(),
>> +            backend_type,
>>           )?
>>       };
>>   
>> -- 
>> 2.47.3

_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
