public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Robert Obkircher <r.obkircher@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers
Date: Tue, 26 May 2026 19:18:15 +0200	[thread overview]
Message-ID: <20260526172032.606994-1-r.obkircher@proxmox.com> (raw)

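Make insert_chunk (and its no-cache and cached variants) async to avoid
async-sync-async transitions, and wrap the blocking chunk store
insertion and chunk locking calls in spawn_blocking.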

Previously, every S3 upload would go through
proxmox_async::runtime::block_on, which has a waker implementation that
relies on std::thread::park and can thus completely block an entire
tokio worker. In the backtrace from the linked forum thread, this was
the case for 19 threads, while the remaining two were presumably trying
to insert duplicate chunks and were thus waiting on chunk locks with a
3-hour timeout. That left no workers to make progress on uploads or to
respond to new requests.

An alternative approach would be to simply use block_in_place in
upload_to_backend. However, that could still run out of workers if the
spawn_blocking pool were exhausted, and it would continue to create a
new runtime for every S3 upload from a ParallelHandler.

Fixes: https://forum.proxmox.com/threads/183705
Signed-off-by: Robert Obkircher <r.obkircher@proxmox.com>
---
 pbs-datastore/src/datastore.rs  | 78 ++++++++++++++++++---------------
 src/api2/backup/upload_chunk.rs | 12 +++--
 src/api2/tape/restore.rs        | 20 ++++++++-
 src/server/pull.rs              | 10 ++++-
 4 files changed, 78 insertions(+), 42 deletions(-)

diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
index e2d1ae67c..f936538ba 100644
--- a/pbs-datastore/src/datastore.rs
+++ b/pbs-datastore/src/datastore.rs
@@ -14,6 +14,7 @@ use hyper::body::Bytes;
 use nix::unistd::{UnlinkatFlags, unlinkat};
 use pbs_tools::lru_cache::LruCache;
 use tokio::io::AsyncWriteExt;
+use tokio::task::spawn_blocking;
 use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
@@ -2731,15 +2732,20 @@ impl DataStore {
     /// The backend is passed along in order to reuse an active connection to the backend, created
     /// on environment instantiation, e.g. an s3 client which lives for the whole backup session.
     /// This calls into async code, so callers must assure to never hold a Mutex lock.
-    pub fn insert_chunk(
+    pub async fn insert_chunk(
         &self,
-        chunk: &DataBlob,
-        digest: &[u8; 32],
+        chunk: DataBlob,
+        digest: [u8; 32],
         backend: &DatastoreBackend,
     ) -> Result<(bool, u64), Error> {
         match backend {
-            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
-            DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, s3_client),
+            DatastoreBackend::Filesystem => {
+                let chunk_store = self.inner.chunk_store.clone();
+                spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await?
+            }
+            DatastoreBackend::S3(s3_client) => {
+                self.insert_chunk_cached(chunk, digest, s3_client).await
+            }
         }
     }
 
@@ -2752,53 +2758,55 @@ impl DataStore {
     ///
     /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore
     /// itself.
-    pub fn insert_chunk_no_cache(
+    pub async fn insert_chunk_no_cache(
         &self,
-        chunk: &DataBlob,
-        digest: &[u8; 32],
+        chunk: DataBlob,
+        digest: [u8; 32],
         backend: &DatastoreBackend,
     ) -> Result<(bool, u64), Error> {
+        let chunk_store = self.inner.chunk_store.clone();
         match backend {
-            DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest),
+            DatastoreBackend::Filesystem => {
+                spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await?
+            }
             DatastoreBackend::S3(s3_client) => {
-                let _chunk_guard = self
-                    .inner
-                    .chunk_store
-                    .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
-                let chunk_data: Bytes = chunk.raw_data().to_vec().into();
+                let _chunk_guard =
+                    spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT))
+                        .await??;
+                let chunk_data: Bytes = chunk.into_inner().into();
                 let chunk_size = chunk_data.len() as u64;
-                let object_key = crate::s3::object_key_from_digest(digest)?;
-                let is_duplicate = proxmox_async::runtime::block_on(
-                    s3_client.upload_no_replace_with_retry(object_key, chunk_data),
-                )
-                .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+                let object_key = crate::s3::object_key_from_digest(&digest)?;
+                let is_duplicate = s3_client
+                    .upload_no_replace_with_retry(object_key, chunk_data)
+                    .await
+                    .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
                 Ok((is_duplicate, chunk_size))
             }
         }
     }
 
-    fn insert_chunk_cached(
+    async fn insert_chunk_cached(
         &self,
-        chunk: &DataBlob,
-        digest: &[u8; 32],
+        chunk: DataBlob,
+        digest: [u8; 32],
         s3_client: &Arc<S3Client>,
     ) -> Result<(bool, u64), Error> {
         let chunk_data = chunk.raw_data();
         let chunk_size = chunk_data.len() as u64;
-        let _chunk_guard = self
-            .inner
-            .chunk_store
-            .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?;
+
+        let chunk_store = self.inner.chunk_store.clone();
+        let _chunk_guard =
+            spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT)).await??;
 
         // Avoid re-upload to S3 if the chunk is either present in the in-memory LRU cache
         // or the chunk marker file exists on filesystem. The latter means the chunk has
         // been uploaded in the past, but was evicted from the LRU cache since but was not
         // cleaned up by garbage collection, so contained in the S3 object store.
-        if self.cache_contains(digest) {
+        if self.cache_contains(&digest) {
             tracing::info!("Skip upload of cached chunk {}", hex::encode(digest));
             return Ok((true, chunk_size));
         }
-        if let Ok(true) = self.cond_touch_chunk(digest, false) {
+        if let Ok(true) = self.cond_touch_chunk(&digest, false) {
             tracing::info!(
                 "Skip upload of already encountered chunk {}",
                 hex::encode(digest),
@@ -2807,15 +2815,15 @@ impl DataStore {
         }
 
         tracing::info!("Upload new chunk {}", hex::encode(digest));
-        let object_key = crate::s3::object_key_from_digest(digest)?;
+        let object_key = crate::s3::object_key_from_digest(&digest)?;
         let chunk_data: Bytes = chunk_data.to_vec().into();
-        let is_duplicate = proxmox_async::runtime::block_on(
-            s3_client.upload_no_replace_with_retry(object_key, chunk_data),
-        )
-        .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
+        let is_duplicate = s3_client
+            .upload_no_replace_with_retry(object_key, chunk_data)
+            .await
+            .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?;
         tracing::info!("Caching of chunk {}", hex::encode(digest));
-        self.cache_insert(digest, chunk)?;
-        self.inner.chunk_store.clear_chunk_expected_mark(digest)?;
+        self.cache_insert(&digest, &chunk)?;
+        self.inner.chunk_store.clear_chunk_expected_mark(&digest)?;
         Ok((is_duplicate, chunk_size))
     }
 
diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs
index 59e4caee2..3c721b1f6 100644
--- a/src/api2/backup/upload_chunk.rs
+++ b/src/api2/backup/upload_chunk.rs
@@ -233,13 +233,17 @@ async fn upload_to_backend(
         UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?;
 
     if env.no_cache {
-        let (is_duplicate, chunk_size) =
-            env.datastore
-                .insert_chunk_no_cache(&chunk, &digest, &env.backend)?;
+        let (is_duplicate, chunk_size) = env
+            .datastore
+            .insert_chunk_no_cache(chunk, digest, &env.backend)
+            .await?;
         return Ok((digest, size, chunk_size as u32, is_duplicate));
     }
 
-    let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?;
+    let (is_duplicate, chunk_size) = env
+        .datastore
+        .insert_chunk(chunk, digest, &env.backend)
+        .await?;
     Ok((digest, size, chunk_size as u32, is_duplicate))
 }
 
diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs
index 1384fa90e..9593e1b9f 100644
--- a/src/api2/tape/restore.rs
+++ b/src/api2/tape/restore.rs
@@ -1152,6 +1152,14 @@ fn restore_partial_chunk_archive<'a>(
     let bytes2 = bytes.clone();
 
     let backend = datastore.backend()?;
+
+    let (_rt, handle) = tokio::runtime::Handle::try_current()
+        .map(|h| (None, h))
+        .unwrap_or_else(|_| {
+            let rt = proxmox_async::runtime::get_runtime();
+            let h = rt.handle().clone();
+            (Some(rt), h)
+        });
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
@@ -1163,7 +1171,7 @@ fn restore_partial_chunk_archive<'a>(
                     chunk.decode(None, Some(&digest))?; // verify digest
                 }
 
-                datastore.insert_chunk(&chunk, &digest, &backend)?;
+                handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?;
             }
             Ok(())
         },
@@ -1546,6 +1554,14 @@ fn restore_chunk_archive<'a>(
     let bytes2 = bytes.clone();
 
     let backend = datastore.backend()?;
+
+    let (_rt, handle) = tokio::runtime::Handle::try_current()
+        .map(|h| (None, h))
+        .unwrap_or_else(|_| {
+            let rt = proxmox_async::runtime::get_runtime();
+            let h = rt.handle().clone();
+            (Some(rt), h)
+        });
     let writer_pool = ParallelHandler::new(
         "tape restore chunk writer",
         4,
@@ -1562,7 +1578,7 @@ fn restore_chunk_archive<'a>(
                     chunk.decode(None, Some(&digest))?; // verify digest
                 }
 
-                datastore.insert_chunk(&chunk, &digest, &backend)?;
+                handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?;
             } else if verbose {
                 info!("Found existing chunk: {}", hex::encode(digest));
             }
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e86a9c329..065b7617c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -206,13 +206,21 @@ async fn pull_index_chunks<I: IndexFile>(
 
     let target2 = target.clone();
     let backend = backend.clone();
+
+    let (_rt, handle) = tokio::runtime::Handle::try_current()
+        .map(|h| (None, h))
+        .unwrap_or_else(|_| {
+            let rt = proxmox_async::runtime::get_runtime();
+            let h = rt.handle().clone();
+            (Some(rt), h)
+        });
     let verify_pool = ParallelHandler::new(
         "sync chunk writer",
         4,
         move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| {
             // println!("verify and write {}", hex::encode(&digest));
             chunk.verify_unencrypted(size as usize, &digest)?;
-            target2.insert_chunk(&chunk, &digest, &backend)?;
+            handle.block_on(target2.insert_chunk(chunk, digest, &backend))?;
             Ok(())
         },
     );
-- 
2.47.3





             reply	other threads:[~2026-05-26 17:21 UTC|newest]

Thread overview: 3+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-26 17:18 Robert Obkircher [this message]
2026-05-27  9:40 ` [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers Christian Ebner
2026-05-27 12:40   ` Robert Obkircher

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260526172032.606994-1-r.obkircher@proxmox.com \
    --to=r.obkircher@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal