public inbox for pbs-devel@lists.proxmox.com
From: Robert Obkircher <r.obkircher@proxmox.com>
To: Christian Ebner <c.ebner@proxmox.com>, pbs-devel@lists.proxmox.com
Subject: Re: [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers
Date: Wed, 27 May 2026 14:40:59 +0200
Message-ID: <78fbaa16-bf38-4ab5-a5ab-2986bef030bb@proxmox.com>
In-Reply-To: <ad024ac0-5986-44cc-80a5-6a59602a035e@proxmox.com>


On 27.05.26 11:40, Christian Ebner wrote:
> Thanks a lot for debugging and tackling this so quickly!
>
> As already noted by yourself and discussed off-list, there are
> additional places interacting with the chunk store and cache which
> would need to be wrapped in spawn_blocking() or made async.
>
> One major concern here is the regression potential for both S3 and
> regular datastores, especially with respect to performance. So it
> would be great to have some before/after benchmarks as well.
>
> Given the scope, we might consider the alternative implementation
> using block_in_place() as a bugfix for the time being, since the code
> paths for regular datastores have been battle-proven for years now.
I've realized that the alternative implementation can also use
spawn_blocking, and have sent a new patch:

https://lore.proxmox.com/pbs-devel/20260527123809.238964-1-r.obkircher@proxmox.com/T/#u

>
> On 5/26/26 7:21 PM, Robert Obkircher wrote:
>> Avoid async-sync-async transitions and wrap synchronous operations in
>> spawn_blocking.
>>
>> Previously, every s3 upload would go through proxmox_async::block_on
>> which has a waker implementation that relies on std::thread::park and
>> can thus completely block an entire tokio worker. In the backtrace
>> from the linked forum thread this was the case for 19 threads, while
>> the remaining two were presumably trying to insert duplicate chunks
>> and thus waiting on chunk locks with a 3-hour timeout. That left no
>> workers to make progress on uploads or respond to new requests.
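For illustration, a park-based block_on of the kind described above can be sketched with the standard library alone. This is a minimal sketch of the general technique, not the actual proxmox_async code; `OffThread` is a hypothetical helper future that finishes its work on a separate thread. Whenever the future is pending, the calling OS thread is parked, so if that thread is a tokio worker it cannot run any other task until it is unparked.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal park-based `block_on` (sketch): polls the future and parks the
/// current OS thread whenever it is pending. While parked, the thread does
/// nothing else, including running other async tasks.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Nothing else to do: park until the waker unparks us.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future whose result is produced on a separate thread.
struct OffThread<R> {
    shared: Arc<Mutex<(Option<R>, Option<Waker>)>>,
}

impl<R: Send + 'static> OffThread<R> {
    fn spawn(work: impl FnOnce() -> R + Send + 'static) -> Self {
        let shared = Arc::new(Mutex::new((None, None::<Waker>)));
        let theirs = Arc::clone(&shared);
        thread::spawn(move || {
            let result = work();
            let mut state = theirs.lock().unwrap();
            state.0 = Some(result);
            // Wake the poller if it already registered interest.
            if let Some(waker) = state.1.take() {
                waker.wake();
            }
        });
        OffThread { shared }
    }
}

impl<R> Future for OffThread<R> {
    type Output = R;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<R> {
        let mut state = self.shared.lock().unwrap();
        match state.0.take() {
            Some(result) => Poll::Ready(result),
            None => {
                // Remember the waker so the worker thread can wake us.
                state.1 = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```

Calling `block_on(OffThread::spawn(|| 6 * 7))` parks the caller until the helper thread has produced 42.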
>>
>> An alternative approach would be to simply use block_in_place in
>> upload_to_backend. However, that could still run out of workers if the
>> spawn_blocking pool was exhausted and would continue to create a new
>> runtime for every S3 upload from a ParallelHandler.
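The failure mode itself, every worker of a bounded pool blocked while the job that would release them sits in the queue, can be reproduced with a toy pool built on std::sync::mpsc. This models the starvation only; it is not tokio's scheduler, and `fixed_pool`, `waiters_released` and the timeouts are made up for the example.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// A job for the toy pool below.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Toy fixed-size pool (not tokio): `workers` threads pull jobs from one
/// shared queue and run each job to completion.
fn fixed_pool(workers: usize) -> Sender<Job> {
    let (tx, rx) = mpsc::channel::<Job>();
    let rx = Arc::new(Mutex::new(rx));
    for _ in 0..workers {
        let rx = Arc::clone(&rx);
        thread::spawn(move || loop {
            // The queue lock is released at the end of this statement,
            // before the job runs.
            let job = match rx.lock().unwrap().recv() {
                Ok(job) => job,
                Err(_) => break, // all senders dropped: shut down
            };
            job();
        });
    }
    tx
}

/// Queues `waiters` jobs that block their worker until signalled, then one
/// job that sends the signals. Returns true if every waiter was released
/// before `timeout`, i.e. the signalling job found a free worker in time.
fn waiters_released(workers: usize, waiters: usize, timeout: Duration) -> bool {
    let pool = fixed_pool(workers);
    let (done_tx, done_rx) = mpsc::channel::<bool>();
    let mut signals = Vec::new();
    for _ in 0..waiters {
        let (signal_tx, signal_rx) = mpsc::channel::<()>();
        signals.push(signal_tx);
        let done_tx = done_tx.clone();
        pool.send(Box::new(move || {
            // Occupies the worker thread while waiting, like a parked block_on.
            let released = signal_rx.recv_timeout(timeout).is_ok();
            let _ = done_tx.send(released);
        }))
        .unwrap();
    }
    pool.send(Box::new(move || {
        for signal_tx in signals {
            let _ = signal_tx.send(());
        }
    }))
    .unwrap();
    drop(done_tx);
    done_rx.iter().take(waiters).all(|released| released)
}
```

With one spare worker, `waiters_released(3, 2, ...)` returns true because the signalling job runs right away; with `waiters_released(2, 2, ...)` both workers are occupied by waiters, which time out, so it returns false.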
>>
>> Fixes: https://forum.proxmox.com/threads/183705
>> Signed-off-by: Robert Obkircher <r.obkircher@proxmox.com>
>> ---
>>   pbs-datastore/src/datastore.rs  | 78 ++++++++++++++++++---------------
>>   src/api2/backup/upload_chunk.rs | 12 +++--
>>   src/api2/tape/restore.rs        | 20 ++++++++-
>>   src/server/pull.rs              | 10 ++++-
>>   4 files changed, 78 insertions(+), 42 deletions(-)
>>
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index e2d1ae67c..f936538ba 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -14,6 +14,7 @@ use hyper::body::Bytes;
>>   use nix::unistd::{UnlinkatFlags, unlinkat};
>>   use pbs_tools::lru_cache::LruCache;
>>   use tokio::io::AsyncWriteExt;
>> +use tokio::task::spawn_blocking;
>>   use tracing::{info, warn};
>>
>>   use proxmox_human_byte::HumanByte;
>> @@ -2731,15 +2732,20 @@ impl DataStore {
>>       /// The backend is passed along in order to reuse an active connection to the backend, created
>>       /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
>>       /// This calls into async code, so callers must assure to never hold a Mutex lock.
>> -    pub fn insert_chunk(
>> +    pub async fn insert_chunk(
>>           &self,
>> -        chunk: &DataBlob,
>> -        digest: &[u8; 32],
>> +        chunk: DataBlob,
>> +        digest: [u8; 32],
>>           backend: &DatastoreBackend,
>>       ) -> Result<(bool, u64), Error> {
>>           match backend {
>> -            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
>> -            DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, s3_client),
>> +            DatastoreBackend::Filesystem => {
>> +                let chunk_store = self.inner.chunk_store.clone();
>> +                spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await?
>> +            }
>> +            DatastoreBackend::S3(s3_client) => {
>> +                self.insert_chunk_cached(chunk, digest, s3_client).await
>> +            }
>>           }
>>       }
>>
>> @@ -2752,53 +2758,55 @@ impl DataStore {
>>       ///
>>       /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore
>>       /// itself.
>> -    pub fn insert_chunk_no_cache(
>> +    pub async fn insert_chunk_no_cache(
>>           &self,
>> -        chunk: &DataBlob,
>> -        digest: &[u8; 32],
>> +        chunk: DataBlob,
>> +        digest: [u8; 32],
>>           backend: &DatastoreBackend,
>>       ) -> Result<(bool, u64), Error> {
>> +        let chunk_store = self.inner.chunk_store.clone();
>>           match backend {
>> -            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
>> +            DatastoreBackend::Filesystem => {
>> +                spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await?
>> +            }
>>               DatastoreBackend::S3(s3_client) => {
>> -                let _chunk_guard = self
>> -                    .inner
>> -                    .chunk_store
>> -                    .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
>> -                let chunk_data: Bytes = chunk.raw_data().to_vec().into();
>> +                let _chunk_guard =
>> +                    spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT))
>> +                        .await??;
>> +                let chunk_data: Bytes = chunk.into_inner().into();
>>                   let chunk_size = chunk_data.len() as u64;
>> -                let object_key = crate::s3::object_key_from_digest(digest)?;
>> -                let is_duplicate = proxmox_async::runtime::block_on(
>> -                    s3_client.upload_no_replace_with_retry(object_key, chunk_data),
>> -                )
>> -                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> +                let object_key = crate::s3::object_key_from_digest(&digest)?;
>> +                let is_duplicate = s3_client
>> +                    .upload_no_replace_with_retry(object_key, chunk_data)
>> +                    .await
>> +                    .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>>                   Ok((is_duplicate, chunk_size))
>>               }
>>           }
>>       }
>>
>> -    fn insert_chunk_cached(
>> +    async fn insert_chunk_cached(
>>           &self,
>> -        chunk: &DataBlob,
>> -        digest: &[u8; 32],
>> +        chunk: DataBlob,
>> +        digest: [u8; 32],
>>           s3_client: &Arc<S3Client>,
>>       ) -> Result<(bool, u64), Error> {
>>           let chunk_data = chunk.raw_data();
>>           let chunk_size = chunk_data.len() as u64;
>> -        let _chunk_guard = self
>> -            .inner
>> -            .chunk_store
>> -            .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
>> +
>> +        let chunk_store = self.inner.chunk_store.clone();
>> +        let _chunk_guard =
>> +            spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT)).await??;
>>
>>           // Avoid re-upload to S3 if the chunk is either present in the in-memory LRU cache
>>           // or the chunk marker file exists on filesystem. The latter means the chunk has
>>           // been uploaded in the past, but was evicted from the LRU cache since but was not
>>           // cleaned up by garbage collection, so contained in the S3 object store.
>> -        if self.cache_contains(digest) {
>> +        if self.cache_contains(&digest) {
>>               tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
>>               return Ok((true, chunk_size));
>>           }
>> -        if let Ok(true) = self.cond_touch_chunk(digest, false) {
>> +        if let Ok(true) = self.cond_touch_chunk(&digest, false) {
>
> blocking code path
>
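Moving such a call off the worker would follow the same pattern the patch already uses for insert_chunk and lock_chunk: the closure passed to tokio::task::spawn_blocking must be `'static` and `Send`, which is why the patch takes the DataBlob and digest by value and clones the chunk_store Arc. The sketch below shows that shape with std::thread::spawn, which has the same bounds. `ChunkStore`, `touch_chunk` and `insert_offloaded` are hypothetical stand-ins; with tokio, the returned handle would be awaited (yielding a `Result` with a `JoinError`) instead of joined.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for the chunk store; the real one touches files.
struct ChunkStore {
    known: Mutex<HashSet<[u8; 32]>>,
}

impl ChunkStore {
    /// Blocking check whether the chunk is already known (cf. cond_touch_chunk).
    fn touch_chunk(&self, digest: &[u8; 32]) -> bool {
        self.known.lock().unwrap().contains(digest)
    }

    /// Blocking insert, returning (is_duplicate, size).
    fn insert_chunk(&self, data: &[u8], digest: &[u8; 32]) -> (bool, u64) {
        let is_duplicate = !self.known.lock().unwrap().insert(*digest);
        (is_duplicate, data.len() as u64)
    }
}

/// Runs both blocking calls on another thread. The closure must be 'static,
/// so the chunk data and digest are moved in by value and the store is
/// shared via a cloned Arc rather than borrowed from the caller.
fn insert_offloaded(
    store: &Arc<ChunkStore>,
    data: Vec<u8>,
    digest: [u8; 32],
) -> JoinHandle<(bool, u64)> {
    let store = Arc::clone(store);
    thread::spawn(move || {
        if store.touch_chunk(&digest) {
            return (true, data.len() as u64);
        }
        store.insert_chunk(&data, &digest)
    })
}
```

Grouping several blocking calls into one closure, as here, pays for the thread hop once instead of once per call.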
>>               tracing::info!(
>>                   "Skip upload of already encountered chunk {}",
>>                   hex::encode(digest),
>> @@ -2807,15 +2815,15 @@ impl DataStore {
>>           }
>>
>>           tracing::info!("Upload new chunk {}", hex::encode(digest));
>> -        let object_key = crate::s3::object_key_from_digest(digest)?;
>> +        let object_key = crate::s3::object_key_from_digest(&digest)?;
>>           let chunk_data: Bytes = chunk_data.to_vec().into();
>> -        let is_duplicate = proxmox_async::runtime::block_on(
>> -            s3_client.upload_no_replace_with_retry(object_key, chunk_data),
>> -        )
>> -        .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> +        let is_duplicate = s3_client
>> +            .upload_no_replace_with_retry(object_key, chunk_data)
>> +            .await
>> +            .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>>           tracing::info!("Caching of chunk {}", hex::encode(digest));
>> -        self.cache_insert(digest, chunk)?;
>> -        self.inner.chunk_store.clear_chunk_expected_mark(digest)?;
>> +        self.cache_insert(&digest, &chunk)?;
>> +        self.inner.chunk_store.clear_chunk_expected_mark(&digest)?;
>
> two additional blocking code paths
>
>>           Ok((is_duplicate, chunk_size))
>>       }
>>
>> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
>> index 59e4caee2..3c721b1f6 100644
>> --- a/src/api2/backup/upload_chunk.rs
>> +++ b/src/api2/backup/upload_chunk.rs
>> @@ -233,13 +233,17 @@ async fn upload_to_backend(
>>           UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?;
>>
>>       if env.no_cache {
>> -        let (is_duplicate, chunk_size) =
>> -            env.datastore
>> -                .insert_chunk_no_cache(&chunk, &digest, &env.backend)?;
>> +        let (is_duplicate, chunk_size) = env
>> +            .datastore
>> +            .insert_chunk_no_cache(chunk, digest, &env.backend)
>> +            .await?;
>>           return Ok((digest, size, chunk_size as u32, is_duplicate));
>>       }
>>
>> -    let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
>> +    let (is_duplicate, chunk_size) = env
>> +        .datastore
>> +        .insert_chunk(chunk, digest, &env.backend)
>> +        .await?;
>>       Ok((digest, size, chunk_size as u32, is_duplicate))
>>   }
>>
>> diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
>> index 1384fa90e..9593e1b9f 100644
>> --- a/src/api2/tape/restore.rs
>> +++ b/src/api2/tape/restore.rs
>> @@ -1152,6 +1152,14 @@ fn restore_partial_chunk_archive<'a>(
>>       let bytes2 = bytes.clone();
>>
>>       let backend = datastore.backend()?;
>> +
>> +    let (_rt, handle) = tokio::runtime::Handle::try_current()
>> +        .map(|h| (None, h))
>> +        .unwrap_or_else(|_| {
>> +            let rt = proxmox_async::runtime::get_runtime();
>> +            let h = rt.handle().clone();
>> +            (Some(rt), h)
>> +        });
>
> this would better live as a dedicated implementation in
> proxmox_async, so it can also be reused ...
>
>>       let writer_pool = ParallelHandler::new(
>>           "tape restore chunk writer",
>>           4,
>> @@ -1163,7 +1171,7 @@ fn restore_partial_chunk_archive<'a>(
>>                       chunk.decode(None, Some(&digest))?; // verify digest
>>                   }
>>
>> -                datastore.insert_chunk(&chunk, &digest, &backend)?;
>> +                handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?;
>>               }
>>               Ok(())
>>           },
>> @@ -1546,6 +1554,14 @@ fn restore_chunk_archive<'a>(
>>       let bytes2 = bytes.clone();
>>
>>       let backend = datastore.backend()?;
>> +
>> +    let (_rt, handle) = tokio::runtime::Handle::try_current()
>
> nit: maybe let's call this runtime_handle or rt_handle, just to keep
> some mental context for the block_on() call site.
>
>> +        .map(|h| (None, h))
>> +        .unwrap_or_else(|_| {
>> +            let rt = proxmox_async::runtime::get_runtime();
>> +            let h = rt.handle().clone();
>> +            (Some(rt), h)
>> +        });
>
> ... here and ...
>
>>       let writer_pool = ParallelHandler::new(
>>           "tape restore chunk writer",
>>           4,
>> @@ -1562,7 +1578,7 @@ fn restore_chunk_archive<'a>(
>>                       chunk.decode(None, Some(&digest))?; // verify digest
>>                   }
>>
>> -                datastore.insert_chunk(&chunk, &digest, &backend)?;
>> +                handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?;
>>               } else if verbose {
>>                   info!("Found existing chunk: {}", hex::encode(digest));
>>               }
>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>> index e86a9c329..065b7617c 100644
>> --- a/src/server/pull.rs
>> +++ b/src/server/pull.rs
>> @@ -206,13 +206,21 @@ async fn pull_index_chunks<I: IndexFile>(
>>
>>       let target2 = target.clone();
>>       let backend = backend.clone();
>> +
>> +    let (_rt, handle) = tokio::runtime::Handle::try_current()
>> +        .map(|h| (None, h))
>> +        .unwrap_or_else(|_| {
>> +            let rt = proxmox_async::runtime::get_runtime();
>> +            let h = rt.handle().clone();
>> +            (Some(rt), h)
>> +        });
>
> ... here.
>
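A possible shape for such a shared helper, sketched generically so it does not depend on tokio; `current_or_fallback` is a hypothetical name, not an existing proxmox_async function. It returns the fallback runtime together with the handle so that the caller keeps the runtime alive, which is also why the call sites bind it to `_rt` rather than `_`: with a `_` pattern, that part of the tuple would be dropped at the end of the statement.

```rust
/// Hypothetical helper: reuse the handle of the current runtime if there is
/// one, otherwise create a fallback runtime and return it alongside its
/// handle. The caller must keep the returned `Option<R>` alive for as long
/// as the handle is used.
pub fn current_or_fallback<R, H>(
    current: Option<H>,
    fallback: impl FnOnce() -> R,
    handle_of: impl FnOnce(&R) -> H,
) -> (Option<R>, H) {
    match current {
        // Already inside a runtime: nothing to keep alive.
        Some(handle) => (None, handle),
        None => {
            let rt = fallback();
            let handle = handle_of(&rt);
            (Some(rt), handle)
        }
    }
}
```

With tokio, a call site could then look roughly like `let (_rt, rt_handle) = current_or_fallback(Handle::try_current().ok(), proxmox_async::runtime::get_runtime, |rt| rt.handle().clone());`.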
>>       let verify_pool = ParallelHandler::new(
>>           "sync chunk writer",
>>           4,
>>           move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>>               // println!("verify and write {}", hex::encode(&digest));
>>               chunk.verify_unencrypted(size as usize, &digest)?;
>> -            target2.insert_chunk(&chunk, &digest, &backend)?;
>> +            handle.block_on(target2.insert_chunk(chunk, digest, &backend))?;
>>               Ok(())
>>           },
>>       );
>





Thread overview: 3+ messages
2026-05-26 17:18 [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers Robert Obkircher
2026-05-27  9:40 ` Christian Ebner
2026-05-27 12:40   ` Robert Obkircher [this message]
