From: Robert Obkircher <r.obkircher@proxmox.com>
To: Christian Ebner <c.ebner@proxmox.com>, pbs-devel@lists.proxmox.com
Subject: Re: [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers
Date: Wed, 27 May 2026 14:40:59 +0200
Message-ID: <78fbaa16-bf38-4ab5-a5ab-2986bef030bb@proxmox.com>
In-Reply-To: <ad024ac0-5986-44cc-80a5-6a59602a035e@proxmox.com>


On 27.05.26 11:40, Christian Ebner wrote:
> Thanks a lot for debugging and tackling this so quickly!
>
> As you already noted and as discussed off-list, there are additional
> places interacting with the chunk store and cache that would need to
> be wrapped in spawn_blocking() or reimplemented as async.
>
> One major concern here is the regression potential for both S3 and
> regular datastores, especially with respect to performance. So it
> would be great to have some before/after benchmarks as well.
>
> Given the scope, we might consider the alternative implementation
> using block_in_place() as a bugfix for the time being, since the code
> paths for regular datastores have been battle-tested for years now.
I've realized that the alternative implementation can also use
spawn_blocking and sent a new patch:

https://lore.proxmox.com/pbs-devel/20260527123809.238964-1-r.obkircher@proxmox.com/T/#u

>
> On 5/26/26 7:21 PM, Robert Obkircher wrote:
>> Avoid async-sync-async transitions and wrap synchronous operations in
>> spawn_blocking.
>>
>> Previously, every s3 upload would go through proxmox_async::block_on
>> which has a waker implementation that relies on std::thread::park and
>> can thus completely block an entire tokio worker. In the backtrace
>> from the linked forum thread this was the case for 19 threads, while
>> the remaining two were presumably trying to insert duplicate chunks
>> and thus waiting on chunk locks with a 3-hour timeout. That left no
>> workers to make progress on uploads or respond to new requests.
>>
>> An alternative approach would be to simply use block_in_place in
>> upload_to_backend. However, that could still run out of workers if the
>> spawn_blocking pool was exhausted and would continue to create a new
>> runtime for every S3 upload from a ParallelHandler.
>>
>> Fixes: https://forum.proxmox.com/threads/183705
>> Signed-off-by: Robert Obkircher <r.obkircher@proxmox.com>
>> ---
>>   pbs-datastore/src/datastore.rs  | 78 ++++++++++++++++++---------------
>>   src/api2/backup/upload_chunk.rs | 12 +++--
>>   src/api2/tape/restore.rs        | 20 ++++++++-
>>   src/server/pull.rs              | 10 ++++-
>>   4 files changed, 78 insertions(+), 42 deletions(-)
>>
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index e2d1ae67c..f936538ba 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -14,6 +14,7 @@ use hyper::body::Bytes;
>>   use nix::unistd::{UnlinkatFlags, unlinkat};
>>   use pbs_tools::lru_cache::LruCache;
>>   use tokio::io::AsyncWriteExt;
>> +use tokio::task::spawn_blocking;
>>   use tracing::{info, warn};
>>     use proxmox_human_byte::HumanByte;
>> @@ -2731,15 +2732,20 @@ impl DataStore {
>>       /// The backend is passed along in order to reuse an active
>> connection to the backend, created
>>       /// on environment instantiation, e.g. an s3 client which
>> lives for the whole backup session.
>>       /// This calls into async code, so callers must assure to
>> never hold a Mutex lock.
>> -    pub fn insert_chunk(
>> +    pub async fn insert_chunk(
>>           &self,
>> -        chunk: &DataBlob,
>> -        digest: &[u8; 32],
>> +        chunk: DataBlob,
>> +        digest: [u8; 32],
>>           backend: &DatastoreBackend,
>>       ) -> Result<(bool, u64), Error> {
>>           match backend {
>> -            DatastoreBackend::Filesystem =>
>> self.inner.chunk_store.insert_chunk(chunk, digest),
>> -            DatastoreBackend::S3(s3_client) =>
>> self.insert_chunk_cached(chunk, digest, s3_client),
>> +            DatastoreBackend::Filesystem => {
>> +                let chunk_store = self.inner.chunk_store.clone();
>> +                spawn_blocking(move ||
>> chunk_store.insert_chunk(&chunk, &digest)).await?
>> +            }
>> +            DatastoreBackend::S3(s3_client) => {
>> +                self.insert_chunk_cached(chunk, digest,
>> s3_client).await
>> +            }
>>           }
>>       }
>>   @@ -2752,53 +2758,55 @@ impl DataStore {
>>       ///
>>       /// FIXME: refactor into insert_chunk() once the backend
>> instance is cacheable on the datastore
>>       /// itself.
>> -    pub fn insert_chunk_no_cache(
>> +    pub async fn insert_chunk_no_cache(
>>           &self,
>> -        chunk: &DataBlob,
>> -        digest: &[u8; 32],
>> +        chunk: DataBlob,
>> +        digest: [u8; 32],
>>           backend: &DatastoreBackend,
>>       ) -> Result<(bool, u64), Error> {
>> +        let chunk_store = self.inner.chunk_store.clone();
>>           match backend {
>> -            DatastoreBackend::Filesystem =>
>> self.inner.chunk_store.insert_chunk(chunk, digest),
>> +            DatastoreBackend::Filesystem => {
>> +                spawn_blocking(move ||
>> chunk_store.insert_chunk(&chunk, &digest)).await?
>> +            }
>>               DatastoreBackend::S3(s3_client) => {
>> -                let _chunk_guard = self
>> -                    .inner
>> -                    .chunk_store
>> -                    .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
>> -                let chunk_data: Bytes =
>> chunk.raw_data().to_vec().into();
>> +                let _chunk_guard =
>> +                    spawn_blocking(move ||
>> chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT))
>> +                        .await??;
>> +                let chunk_data: Bytes = chunk.into_inner().into();
>>                   let chunk_size = chunk_data.len() as u64;
>> -                let object_key =
>> crate::s3::object_key_from_digest(digest)?;
>> -                let is_duplicate = proxmox_async::runtime::block_on(
>> -                    s3_client.upload_no_replace_with_retry(object_key, chunk_data),
>> -                )
>> -                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> +                let object_key =
>> crate::s3::object_key_from_digest(&digest)?;
>> +                let is_duplicate = s3_client
>> +                    .upload_no_replace_with_retry(object_key,
>> chunk_data)
>> +                    .await
>> +                    .map_err(|err| format_err!("failed to upload
>> chunk to s3 backend - {err:#}"))?;
>>                   Ok((is_duplicate, chunk_size))
>>               }
>>           }
>>       }
>>   -    fn insert_chunk_cached(
>> +    async fn insert_chunk_cached(
>>           &self,
>> -        chunk: &DataBlob,
>> -        digest: &[u8; 32],
>> +        chunk: DataBlob,
>> +        digest: [u8; 32],
>>           s3_client: &Arc<S3Client>,
>>       ) -> Result<(bool, u64), Error> {
>>           let chunk_data = chunk.raw_data();
>>           let chunk_size = chunk_data.len() as u64;
>> -        let _chunk_guard = self
>> -            .inner
>> -            .chunk_store
>> -            .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
>> +
>> +        let chunk_store = self.inner.chunk_store.clone();
>> +        let _chunk_guard =
>> +            spawn_blocking(move || chunk_store.lock_chunk(&digest,
>> CHUNK_LOCK_TIMEOUT)).await??;
>>             // Avoid re-upload to S3 if the chunk is either present
>> in the in-memory LRU cache
>>           // or the chunk marker file exists on filesystem. The
>> latter means the chunk has
>>           // been uploaded in the past, but was evicted from the
>> LRU cache since but was not
>>           // cleaned up by garbage collection, so contained in the
>> S3 object store.
>> -        if self.cache_contains(digest) {
>> +        if self.cache_contains(&digest) {
>>               tracing::info!("Skip upload of cached chunk {}",
>> hex::encode(digest));
>>               return Ok((true, chunk_size));
>>           }
>> -        if let Ok(true) = self.cond_touch_chunk(digest, false) {
>> +        if let Ok(true) = self.cond_touch_chunk(&digest, false) {
>
> blocking code path
>
>>               tracing::info!(
>>                   "Skip upload of already encountered chunk {}",
>>                   hex::encode(digest),
>> @@ -2807,15 +2815,15 @@ impl DataStore {
>>           }
>>             tracing::info!("Upload new chunk {}",
>> hex::encode(digest));
>> -        let object_key = crate::s3::object_key_from_digest(digest)?;
>> +        let object_key = crate::s3::object_key_from_digest(&digest)?;
>>           let chunk_data: Bytes = chunk_data.to_vec().into();
>> -        let is_duplicate = proxmox_async::runtime::block_on(
>> -            s3_client.upload_no_replace_with_retry(object_key,
>> chunk_data),
>> -        )
>> -        .map_err(|err| format_err!("failed to upload chunk to s3
>> backend - {err:#}"))?;
>> +        let is_duplicate = s3_client
>> +            .upload_no_replace_with_retry(object_key, chunk_data)
>> +            .await
>> +            .map_err(|err| format_err!("failed to upload chunk to
>> s3 backend - {err:#}"))?;
>>           tracing::info!("Caching of chunk {}", hex::encode(digest));
>> -        self.cache_insert(digest, chunk)?;
>> -        self.inner.chunk_store.clear_chunk_expected_mark(digest)?;
>> +        self.cache_insert(&digest, &chunk)?;
>> +        self.inner.chunk_store.clear_chunk_expected_mark(&digest)?;
>
> two additional blocking code paths
>
>>           Ok((is_duplicate, chunk_size))
>>       }
>> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
>> index 59e4caee2..3c721b1f6 100644
>> --- a/src/api2/backup/upload_chunk.rs
>> +++ b/src/api2/backup/upload_chunk.rs
>> @@ -233,13 +233,17 @@ async fn upload_to_backend(
>>           UploadChunk::new(BodyDataStream::new(req_body), digest,
>> size, encoded_size).await?;
>>         if env.no_cache {
>> -        let (is_duplicate, chunk_size) =
>> -            env.datastore
>> -                .insert_chunk_no_cache(&chunk, &digest,
>> &env.backend)?;
>> +        let (is_duplicate, chunk_size) = env
>> +            .datastore
>> +            .insert_chunk_no_cache(chunk, digest, &env.backend)
>> +            .await?;
>>           return Ok((digest, size, chunk_size as u32, is_duplicate));
>>       }
>>   -    let (is_duplicate, chunk_size) =
>> env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
>> +    let (is_duplicate, chunk_size) = env
>> +        .datastore
>> +        .insert_chunk(chunk, digest, &env.backend)
>> +        .await?;
>>       Ok((digest, size, chunk_size as u32, is_duplicate))
>>   }
>>   diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
>> index 1384fa90e..9593e1b9f 100644
>> --- a/src/api2/tape/restore.rs
>> +++ b/src/api2/tape/restore.rs
>> @@ -1152,6 +1152,14 @@ fn restore_partial_chunk_archive<'a>(
>>       let bytes2 = bytes.clone();
>>         let backend = datastore.backend()?;
>> +
>> +    let (_rt, handle) = tokio::runtime::Handle::try_current()
>> +        .map(|h| (None, h))
>> +        .unwrap_or_else(|_| {
>> +            let rt = proxmox_async::runtime::get_runtime();
>> +            let h = rt.handle().clone();
>> +            (Some(rt), h)
>> +        });
>
> this should better live as dedicated implementation in
> proxmox_async, so it can also be reused ...
>
>>       let writer_pool = ParallelHandler::new(
>>           "tape restore chunk writer",
>>           4,
>> @@ -1163,7 +1171,7 @@ fn restore_partial_chunk_archive<'a>(
>>                       chunk.decode(None, Some(&digest))?; // verify
>> digest
>>                   }
>>   -                datastore.insert_chunk(&chunk, &digest, &backend)?;
>> +                handle.block_on(datastore.insert_chunk(chunk,
>> digest, &backend))?;
>>               }
>>               Ok(())
>>           },
>> @@ -1546,6 +1554,14 @@ fn restore_chunk_archive<'a>(
>>       let bytes2 = bytes.clone();
>>         let backend = datastore.backend()?;
>> +
>> +    let (_rt, handle) = tokio::runtime::Handle::try_current()
>
> nit: maybe let's call this runtime_handle or rt_handle, just to keep
> some mental context for the block_on() call site.
>
>> +        .map(|h| (None, h))
>> +        .unwrap_or_else(|_| {
>> +            let rt = proxmox_async::runtime::get_runtime();
>> +            let h = rt.handle().clone();
>> +            (Some(rt), h)
>> +        });
>
> ... here and ...
>
>>       let writer_pool = ParallelHandler::new(
>>           "tape restore chunk writer",
>>           4,
>> @@ -1562,7 +1578,7 @@ fn restore_chunk_archive<'a>(
>>                       chunk.decode(None, Some(&digest))?; // verify
>> digest
>>                   }
>>   -                datastore.insert_chunk(&chunk, &digest, &backend)?;
>> +                handle.block_on(datastore.insert_chunk(chunk,
>> digest, &backend))?;
>>               } else if verbose {
>>                   info!("Found existing chunk: {}",
>> hex::encode(digest));
>>               }
>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>> index e86a9c329..065b7617c 100644
>> --- a/src/server/pull.rs
>> +++ b/src/server/pull.rs
>> @@ -206,13 +206,21 @@ async fn pull_index_chunks<I: IndexFile>(
>>         let target2 = target.clone();
>>       let backend = backend.clone();
>> +
>> +    let (_rt, handle) = tokio::runtime::Handle::try_current()
>> +        .map(|h| (None, h))
>> +        .unwrap_or_else(|_| {
>> +            let rt = proxmox_async::runtime::get_runtime();
>> +            let h = rt.handle().clone();
>> +            (Some(rt), h)
>> +        });
>
> ... here.
>
>>       let verify_pool = ParallelHandler::new(
>>           "sync chunk writer",
>>           4,
>>           move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>>               // println!("verify and write {}",
>> hex::encode(&digest));
>>               chunk.verify_unencrypted(size as usize, &digest)?;
>> -            target2.insert_chunk(&chunk, &digest, &backend)?;
>> +            handle.block_on(target2.insert_chunk(chunk, digest,
>> &backend))?;
>>               Ok(())
>>           },
>>       );
>
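
The commit message quoted above attributes the stall to a block_on whose
waker relies on std::thread::park. As a rough illustration of that general
mechanism, here is a minimal std-only sketch. It is not the proxmox_async
implementation; the names block_on_parked, ThreadWaker and YieldOnce are
hypothetical and exist only for this example.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread which called `block_on_parked`.
/// (Hypothetical name, for illustration only.)
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drive a future to completion on the calling thread.
/// Assumption: this mirrors the *idea* of a park-based block_on,
/// not the code of any particular crate.
fn block_on_parked<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // The calling thread sleeps here until the waker fires.
            // If that thread is an async runtime worker, it cannot
            // poll any other task in the meantime.
            Poll::Pending => thread::park(),
        }
    }
}

/// Future that returns `Pending` once, after requesting a wake-up,
/// so the parking branch above is exercised.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            // Wake before returning Pending; the following park()
            // then returns immediately.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

Calling block_on_parked(async { YieldOnce(false).await; 7 }) returns 7 after
one pass through the parking branch. With a future that waits on real I/O,
the calling thread would stay parked until the I/O completes, which is why
such a call on a runtime worker takes that worker out of service.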




Thread overview: 3+ messages
2026-05-26 17:18 [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers Robert Obkircher
2026-05-27  9:40 ` Christian Ebner
2026-05-27 12:40   ` Robert Obkircher [this message]
