From: Robert Obkircher <r.obkircher@proxmox.com>
To: Christian Ebner <c.ebner@proxmox.com>, pbs-devel@lists.proxmox.com
Subject: Re: [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers
Date: Wed, 27 May 2026 14:40:59 +0200
Message-ID: <78fbaa16-bf38-4ab5-a5ab-2986bef030bb@proxmox.com>
In-Reply-To: <ad024ac0-5986-44cc-80a5-6a59602a035e@proxmox.com>
On 27.05.26 11:40, Christian Ebner wrote:
> Thanks a lot for debugging and tackling this so quickly!
>
> As already noted by yourself and discussed off-list, there are
> additional places interacting with the chunk store and cache which
> would need to be wrapped in spawn_blocking() or implemented as async.
>
> One major concern here is the regression potential for both S3 and
> regular datastores, especially with respect to performance. So it
> would be great to have some before/after benchmarks as well.
>
> Given the scope, we might consider the alternative implementation
> using block_in_place() as a bugfix for the time being, since the code
> paths for regular datastores have been battle-proven for years now.

I've realized that the alternative implementation can also use
spawn_blocking, and have sent a new patch:
https://lore.proxmox.com/pbs-devel/20260527123809.238964-1-r.obkircher@proxmox.com/T/#u
>
> On 5/26/26 7:21 PM, Robert Obkircher wrote:
>> Avoid async-sync-async transitions and wrap synchronous operations in
>> spawn_blocking.
>>
>> Previously, every s3 upload would go through proxmox_async::block_on
>> which has a waker implementation that relies on std::thread::park and
>> can thus completely block an entire tokio worker. In the backtrace
>> from the linked forum thread this was the case for 19 threads, while
>> the remaining two were presumably trying to insert duplicate chunks
>> and thus waiting on chunk locks with a 3-hour timeout. That left no
>> workers to make progress on uploads or respond to new requests.
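For illustration, a minimal std-only sketch of a park-based block_on.
This is not the actual proxmox_async::runtime::block_on code, and
ThreadWaker and BackgroundDone are made-up names. It only shows the
mechanism: while the future is pending, the calling thread sits in
thread::park() and can run nothing else.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls `fut` on the current thread and parks that thread while it is
/// pending. On a tokio worker, the worker is stalled until the wakeup.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Spurious unparks are harmless: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future that completes once a background thread sets a
/// flag, standing in for an upload that finishes "elsewhere".
pub struct BackgroundDone {
    done: Arc<AtomicBool>,
    started: bool,
}

impl BackgroundDone {
    pub fn new() -> Self {
        Self { done: Arc::new(AtomicBool::new(false)), started: false }
    }
}

impl Future for BackgroundDone {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.done.load(Ordering::Acquire) {
            return Poll::Ready("uploaded");
        }
        if !self.started {
            self.started = true;
            let done = Arc::clone(&self.done);
            let waker = cx.waker().clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10));
                done.store(true, Ordering::Release);
                waker.wake(); // unparks the thread blocked in block_on
            });
        }
        Poll::Pending
    }
}
```

If block_on(BackgroundDone::new()) runs on a runtime worker, that worker
spends the whole upload in park() instead of serving other tasks.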
>>
>> An alternative approach would be to simply use block_in_place in
>> upload_to_backend. However, that could still run out of workers if the
>> spawn_blocking pool was exhausted and would continue to create a new
>> runtime for every S3 upload from a ParallelHandler.
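A toy model of the exhaustion argument, assuming a fixed number of
slots. Pool and Slot are hypothetical and ignore how tokio actually
schedules blocking tasks. Once every slot is held by a task that is
parked or waiting on a chunk lock, nothing new can start until one is
released.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Fixed-size pool: each running task occupies one slot until it is done.
pub struct Pool {
    free: AtomicUsize,
}

/// Slot held by a running task; dropping it returns the slot to the pool.
pub struct Slot {
    pool: Arc<Pool>,
}

impl Pool {
    pub fn new(size: usize) -> Arc<Self> {
        Arc::new(Self { free: AtomicUsize::new(size) })
    }

    /// Claims a slot if one is free. `None` means every worker is busy and
    /// new work (e.g. an incoming request) has to wait.
    pub fn try_claim(self: &Arc<Self>) -> Option<Slot> {
        self.free
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
            .ok()
            .map(|_| Slot { pool: Arc::clone(self) })
    }
}

impl Drop for Slot {
    fn drop(&mut self) {
        self.pool.free.fetch_add(1, Ordering::AcqRel);
    }
}
```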
>>
>> Fixes: https://forum.proxmox.com/threads/183705
>> Signed-off-by: Robert Obkircher <r.obkircher@proxmox.com>
>> ---
>> pbs-datastore/src/datastore.rs  | 78 ++++++++++++++++++---------------
>> src/api2/backup/upload_chunk.rs | 12 +++--
>> src/api2/tape/restore.rs        | 20 ++++++++-
>> src/server/pull.rs              | 10 ++++-
>> 4 files changed, 78 insertions(+), 42 deletions(-)
>>
>> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
>> index e2d1ae67c..f936538ba 100644
>> --- a/pbs-datastore/src/datastore.rs
>> +++ b/pbs-datastore/src/datastore.rs
>> @@ -14,6 +14,7 @@ use hyper::body::Bytes;
>> use nix::unistd::{UnlinkatFlags, unlinkat};
>> use pbs_tools::lru_cache::LruCache;
>> use tokio::io::AsyncWriteExt;
>> +use tokio::task::spawn_blocking;
>> use tracing::{info, warn};
>> use proxmox_human_byte::HumanByte;
>> @@ -2731,15 +2732,20 @@ impl DataStore {
>> /// The backend is passed along in order to reuse an active connection to the backend, created
>> /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
>> /// This calls into async code, so callers must assure to never hold a Mutex lock.
>> - pub fn insert_chunk(
>> + pub async fn insert_chunk(
>> &self,
>> - chunk: &DataBlob,
>> - digest: &[u8; 32],
>> + chunk: DataBlob,
>> + digest: [u8; 32],
>> backend: &DatastoreBackend,
>> ) -> Result<(bool, u64), Error> {
>> match backend {
>> - DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
>> - DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, s3_client),
>> + DatastoreBackend::Filesystem => {
>> + let chunk_store = self.inner.chunk_store.clone();
>> + spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await?
>> + }
>> + DatastoreBackend::S3(s3_client) => {
>> + self.insert_chunk_cached(chunk, digest, s3_client).await
>> + }
>> }
>> }
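The switch from &DataBlob / &[u8; 32] to owned values follows from
spawn_blocking requiring an `FnOnce() -> R + Send + 'static` closure.
A std-only sketch of the same constraint, using std::thread::spawn and
join() in place of spawn_blocking(...).await. Blob, ChunkStore and
insert_offloaded are hypothetical stand-ins, not the datastore types.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-ins for DataBlob and the chunk store.
pub struct Blob(pub Vec<u8>);
pub struct ChunkStore;

impl ChunkStore {
    /// Blocking insert returning (is_duplicate, size), mirroring the shape
    /// of the real result type.
    pub fn insert_chunk(&self, chunk: &Blob, _digest: &[u8; 32]) -> (bool, u64) {
        (false, chunk.0.len() as u64)
    }
}

/// The closure handed to the other thread must be 'static, so it cannot
/// borrow a `&Blob` from the caller; a version taking `chunk: &Blob`
/// would not compile here. Owned arguments and a cloned Arc are moved in.
pub fn insert_offloaded(store: &Arc<ChunkStore>, chunk: Blob, digest: [u8; 32]) -> (bool, u64) {
    let store = Arc::clone(store);
    thread::spawn(move || store.insert_chunk(&chunk, &digest))
        .join()
        .expect("insert thread panicked")
}
```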
>> @@ -2752,53 +2758,55 @@ impl DataStore {
>> ///
>> /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore
>> /// itself.
>> - pub fn insert_chunk_no_cache(
>> + pub async fn insert_chunk_no_cache(
>> &self,
>> - chunk: &DataBlob,
>> - digest: &[u8; 32],
>> + chunk: DataBlob,
>> + digest: [u8; 32],
>> backend: &DatastoreBackend,
>> ) -> Result<(bool, u64), Error> {
>> + let chunk_store = self.inner.chunk_store.clone();
>> match backend {
>> - DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
>> + DatastoreBackend::Filesystem => {
>> + spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await?
>> + }
>> DatastoreBackend::S3(s3_client) => {
>> - let _chunk_guard = self
>> - .inner
>> - .chunk_store
>> - .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
>> - let chunk_data: Bytes = chunk.raw_data().to_vec().into();
>> + let _chunk_guard =
>> + spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT))
>> + .await??;
>> + let chunk_data: Bytes = chunk.into_inner().into();
>> let chunk_size = chunk_data.len() as u64;
>> - let object_key = crate::s3::object_key_from_digest(digest)?;
>> - let is_duplicate = proxmox_async::runtime::block_on(
>> - s3_client.upload_no_replace_with_retry(object_key, chunk_data),
>> - )
>> - .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> + let object_key = crate::s3::object_key_from_digest(&digest)?;
>> + let is_duplicate = s3_client
>> + .upload_no_replace_with_retry(object_key, chunk_data)
>> + .await
>> + .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> Ok((is_duplicate, chunk_size))
>> }
>> }
>> }
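lock_chunk() goes through spawn_blocking because it can wait on the
lock for up to CHUNK_LOCK_TIMEOUT. A std-only sketch of a per-digest
lock with a timeout, built on Mutex plus Condvar::wait_timeout_while.
ChunkLocks and ChunkGuard are hypothetical; the real chunk store uses
its own locking mechanism.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical table of currently locked digests.
#[derive(Default)]
pub struct ChunkLocks {
    held: Mutex<HashSet<[u8; 32]>>,
    released: Condvar,
}

/// Releases the digest lock on drop.
pub struct ChunkGuard {
    locks: Arc<ChunkLocks>,
    digest: [u8; 32],
}

impl ChunkLocks {
    /// Blocks the calling thread for up to `timeout` while another holder
    /// owns `digest`; on a runtime worker this would stall the worker.
    pub fn lock_chunk(self: &Arc<Self>, digest: [u8; 32], timeout: Duration) -> Result<ChunkGuard, String> {
        let held = self.held.lock().unwrap();
        let (mut held, res) = self
            .released
            .wait_timeout_while(held, timeout, |held| held.contains(&digest))
            .unwrap();
        if res.timed_out() {
            return Err("timed out waiting for chunk lock".to_string());
        }
        held.insert(digest);
        Ok(ChunkGuard { locks: Arc::clone(self), digest })
    }
}

impl Drop for ChunkGuard {
    fn drop(&mut self) {
        self.locks.held.lock().unwrap().remove(&self.digest);
        // Wake all waiters; each re-checks whether its own digest is free.
        self.locks.released.notify_all();
    }
}
```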
>> - fn insert_chunk_cached(
>> + async fn insert_chunk_cached(
>> &self,
>> - chunk: &DataBlob,
>> - digest: &[u8; 32],
>> + chunk: DataBlob,
>> + digest: [u8; 32],
>> s3_client: &Arc<S3Client>,
>> ) -> Result<(bool, u64), Error> {
>> let chunk_data = chunk.raw_data();
>> let chunk_size = chunk_data.len() as u64;
>> - let _chunk_guard = self
>> - .inner
>> - .chunk_store
>> - .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
>> +
>> + let chunk_store = self.inner.chunk_store.clone();
>> + let _chunk_guard =
>> + spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT)).await??;
>> // Avoid re-upload to S3 if the chunk is either present in the in-memory LRU cache
>> // or the chunk marker file exists on filesystem. The latter means the chunk has
>> // been uploaded in the past, but was evicted from the LRU cache since but was not
>> // cleaned up by garbage collection, so contained in the S3 object store.
>> - if self.cache_contains(digest) {
>> + if self.cache_contains(&digest) {
>> tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
>> return Ok((true, chunk_size));
>> }
>> - if let Ok(true) = self.cond_touch_chunk(digest, false) {
>> + if let Ok(true) = self.cond_touch_chunk(&digest, false) {
>
> blocking code path
>
>> tracing::info!(
>> "Skip upload of already encountered chunk {}",
>> hex::encode(digest),
>> @@ -2807,15 +2815,15 @@ impl DataStore {
>> }
>> tracing::info!("Upload new chunk {}", hex::encode(digest));
>> - let object_key = crate::s3::object_key_from_digest(digest)?;
>> + let object_key = crate::s3::object_key_from_digest(&digest)?;
>> let chunk_data: Bytes = chunk_data.to_vec().into();
>> - let is_duplicate = proxmox_async::runtime::block_on(
>> - s3_client.upload_no_replace_with_retry(object_key, chunk_data),
>> - )
>> - .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> + let is_duplicate = s3_client
>> + .upload_no_replace_with_retry(object_key, chunk_data)
>> + .await
>> + .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
>> tracing::info!("Caching of chunk {}", hex::encode(digest));
>> - self.cache_insert(digest, chunk)?;
>> - self.inner.chunk_store.clear_chunk_expected_mark(digest)?;
>> + self.cache_insert(&digest, &chunk)?;
>> + self.inner.chunk_store.clear_chunk_expected_mark(&digest)?;
>
> two additional blocking code paths
>
>> Ok((is_duplicate, chunk_size))
>> }
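On the blocking paths: per the comment in the hunk, an upload is
skipped if the chunk is in the in-memory cache or its marker file
exists; otherwise the chunk is uploaded and recorded. An in-memory
model of just that decision (UploadDedup is hypothetical, a set stands
in for the marker files, recording a marker on upload is an assumption,
and eviction is FIFO rather than true LRU) makes it easy to see which
steps touch disk in the real code: the marker check and the bookkeeping
after the upload.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical model of the skip decision in insert_chunk_cached().
pub struct UploadDedup {
    cache: VecDeque<[u8; 32]>,  // in-memory cache (FIFO eviction here)
    capacity: usize,
    markers: HashSet<[u8; 32]>, // stands in for on-disk marker files
    pub uploads: usize,         // counts simulated S3 uploads
}

impl UploadDedup {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "cache capacity must be non-zero");
        Self {
            cache: VecDeque::with_capacity(capacity),
            capacity,
            markers: HashSet::new(),
            uploads: 0,
        }
    }

    /// Returns true if the chunk was uploaded, false if the upload was skipped.
    pub fn insert(&mut self, digest: [u8; 32]) -> bool {
        if self.cache.contains(&digest) {
            return false; // cache hit: no disk access needed
        }
        if self.markers.contains(&digest) {
            return false; // marker hit: a filesystem check in the real code
        }
        self.uploads += 1; // the S3 upload itself
        self.markers.insert(digest); // assumed post-upload bookkeeping on disk
        if self.cache.len() == self.capacity {
            self.cache.pop_front();
        }
        self.cache.push_back(digest);
        true
    }
}
```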
>> diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
>> index 59e4caee2..3c721b1f6 100644
>> --- a/src/api2/backup/upload_chunk.rs
>> +++ b/src/api2/backup/upload_chunk.rs
>> @@ -233,13 +233,17 @@ async fn upload_to_backend(
>> UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?;
>> if env.no_cache {
>> - let (is_duplicate, chunk_size) =
>> - env.datastore
>> - .insert_chunk_no_cache(&chunk, &digest, &env.backend)?;
>> + let (is_duplicate, chunk_size) = env
>> + .datastore
>> + .insert_chunk_no_cache(chunk, digest, &env.backend)
>> + .await?;
>> return Ok((digest, size, chunk_size as u32, is_duplicate));
>> }
>> - let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
>> + let (is_duplicate, chunk_size) = env
>> + .datastore
>> + .insert_chunk(chunk, digest, &env.backend)
>> + .await?;
>> Ok((digest, size, chunk_size as u32, is_duplicate))
>> }
>> diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
>> index 1384fa90e..9593e1b9f 100644
>> --- a/src/api2/tape/restore.rs
>> +++ b/src/api2/tape/restore.rs
>> @@ -1152,6 +1152,14 @@ fn restore_partial_chunk_archive<'a>(
>> let bytes2 = bytes.clone();
>> let backend = datastore.backend()?;
>> +
>> + let (_rt, handle) = tokio::runtime::Handle::try_current()
>> + .map(|h| (None, h))
>> + .unwrap_or_else(|_| {
>> + let rt = proxmox_async::runtime::get_runtime();
>> + let h = rt.handle().clone();
>> + (Some(rt), h)
>> + });
>
> this should rather live as a dedicated helper in proxmox_async, so it
> can be reused ...
>
>> let writer_pool = ParallelHandler::new(
>> "tape restore chunk writer",
>> 4,
>> @@ -1163,7 +1171,7 @@ fn restore_partial_chunk_archive<'a>(
>> chunk.decode(None, Some(&digest))?; // verify digest
>> }
>> - datastore.insert_chunk(&chunk, &digest, &backend)?;
>> + handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?;
>> }
>> Ok(())
>> },
>> @@ -1546,6 +1554,14 @@ fn restore_chunk_archive<'a>(
>> let bytes2 = bytes.clone();
>> let backend = datastore.backend()?;
>> +
>> + let (_rt, handle) = tokio::runtime::Handle::try_current()
>
> nit: maybe let's call this runtime_handle or rt_handle, just to keep
> some mental context for the block_on() call site.
>
>> + .map(|h| (None, h))
>> + .unwrap_or_else(|_| {
>> + let rt = proxmox_async::runtime::get_runtime();
>> + let h = rt.handle().clone();
>> + (Some(rt), h)
>> + });
>
> ... here and ...
>
>> let writer_pool = ParallelHandler::new(
>> "tape restore chunk writer",
>> 4,
>> @@ -1562,7 +1578,7 @@ fn restore_chunk_archive<'a>(
>> chunk.decode(None, Some(&digest))?; // verify digest
>> }
>> - datastore.insert_chunk(&chunk, &digest, &backend)?;
>> + handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?;
>> } else if verbose {
>> info!("Found existing chunk: {}", hex::encode(digest));
>> }
>> diff --git a/src/server/pull.rs b/src/server/pull.rs
>> index e86a9c329..065b7617c 100644
>> --- a/src/server/pull.rs
>> +++ b/src/server/pull.rs
>> @@ -206,13 +206,21 @@ async fn pull_index_chunks<I: IndexFile>(
>> let target2 = target.clone();
>> let backend = backend.clone();
>> +
>> + let (_rt, handle) = tokio::runtime::Handle::try_current()
>> + .map(|h| (None, h))
>> + .unwrap_or_else(|_| {
>> + let rt = proxmox_async::runtime::get_runtime();
>> + let h = rt.handle().clone();
>> + (Some(rt), h)
>> + });
>
> ... here.
>
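Agreed. One possible generic shape for such a helper, sketched without
tokio so the logic stays visible (current_or_owned is a hypothetical
name, not an existing proxmox_async function): reuse the current handle
if there is one, otherwise create a runtime and return it together with
its handle, so the caller keeps it alive for as long as the handle is
in use.

```rust
/// Returns `(None, handle)` if `current` already provides a handle,
/// otherwise `(Some(runtime), handle)` for a freshly created runtime.
/// The caller must keep the returned `Option<R>` alive while using `H`.
pub fn current_or_owned<R, H, E, C, G>(current: Result<H, E>, create: C, handle_of: G) -> (Option<R>, H)
where
    C: FnOnce() -> R,
    G: FnOnce(&R) -> H,
{
    match current {
        Ok(handle) => (None, handle),
        Err(_) => {
            let rt = create();
            let handle = handle_of(&rt);
            (Some(rt), handle)
        }
    }
}
```

Assuming get_runtime() returns something that derefs to a tokio Runtime,
as the patch's rt.handle().clone() suggests, each call site would shrink
to `let (_rt, rt_handle) = current_or_owned(tokio::runtime::Handle::try_current(),
proxmox_async::runtime::get_runtime, |rt| rt.handle().clone());`, which
also picks up the rt_handle naming from the nit above.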
>> let verify_pool = ParallelHandler::new(
>> "sync chunk writer",
>> 4,
>> move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
>> // println!("verify and write {}", hex::encode(&digest));
>> chunk.verify_unencrypted(size as usize, &digest)?;
>> - target2.insert_chunk(&chunk, &digest, &backend)?;
>> + handle.block_on(target2.insert_chunk(chunk, digest, &backend))?;
>> Ok(())
>> },
>> );
>

Thread overview:
2026-05-26 17:18 [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers Robert Obkircher
2026-05-27 9:40 ` Christian Ebner
2026-05-27 12:40 ` Robert Obkircher [this message]