From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 6E9D51FF146 for ; Tue, 26 May 2026 19:21:41 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 2324B76B2; Tue, 26 May 2026 19:21:40 +0200 (CEST) From: Robert Obkircher To: pbs-devel@lists.proxmox.com Subject: [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers Date: Tue, 26 May 2026 19:18:15 +0200 Message-ID: <20260526172032.606994-1-r.obkircher@proxmox.com> X-Mailer: git-send-email 2.47.3 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1779816039801 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.055 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[datastore.rs,restore.rs,pull.rs,proxmox.com] Message-ID-Hash: BXYQR2QL7F7OWAJ5B5F43PSAE4WM6BYF X-Message-ID-Hash: BXYQR2QL7F7OWAJ5B5F43PSAE4WM6BYF X-MailFrom: r.obkircher@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Avoid async-sync-async transitions and wrap synchronous operations in spawn_blocking. Previously, every S3 upload went through proxmox_async::block_on, whose waker implementation relies on std::thread::park and can thus block an entire tokio worker thread. In the backtrace from the linked forum thread, this was the case for 19 threads, while the remaining two were presumably trying to insert duplicate chunks and were thus waiting on chunk locks with a 3-hour timeout. That left no workers to make progress on uploads or to respond to new requests. An alternative approach would be to simply use block_in_place in upload_to_backend. However, that could still run out of workers if the spawn_blocking pool were exhausted, and it would still create a new runtime for every S3 upload from a ParallelHandler. 
Fixes: https://forum.proxmox.com/threads/183705 Signed-off-by: Robert Obkircher --- pbs-datastore/src/datastore.rs | 78 ++++++++++++++++++--------------- src/api2/backup/upload_chunk.rs | 12 +++-- src/api2/tape/restore.rs | 20 ++++++++- src/server/pull.rs | 10 ++++- 4 files changed, 78 insertions(+), 42 deletions(-) diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs index e2d1ae67c..f936538ba 100644 --- a/pbs-datastore/src/datastore.rs +++ b/pbs-datastore/src/datastore.rs @@ -14,6 +14,7 @@ use hyper::body::Bytes; use nix::unistd::{UnlinkatFlags, unlinkat}; use pbs_tools::lru_cache::LruCache; use tokio::io::AsyncWriteExt; +use tokio::task::spawn_blocking; use tracing::{info, warn}; use proxmox_human_byte::HumanByte; @@ -2731,15 +2732,20 @@ impl DataStore { /// The backend is passed along in order to reuse an active connection to the backend, created /// on environment instantiation, e.g. an s3 client which lives for the whole backup session. /// This calls into async code, so callers must assure to never hold a Mutex lock. - pub fn insert_chunk( + pub async fn insert_chunk( &self, - chunk: &DataBlob, - digest: &[u8; 32], + chunk: DataBlob, + digest: [u8; 32], backend: &DatastoreBackend, ) -> Result<(bool, u64), Error> { match backend { - DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest), - DatastoreBackend::S3(s3_client) => self.insert_chunk_cached(chunk, digest, s3_client), + DatastoreBackend::Filesystem => { + let chunk_store = self.inner.chunk_store.clone(); + spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await? + } + DatastoreBackend::S3(s3_client) => { + self.insert_chunk_cached(chunk, digest, s3_client).await + } } } @@ -2752,53 +2758,55 @@ impl DataStore { /// /// FIXME: refactor into insert_chunk() once the backend instance is cacheable on the datastore /// itself. 
- pub fn insert_chunk_no_cache( + pub async fn insert_chunk_no_cache( &self, - chunk: &DataBlob, - digest: &[u8; 32], + chunk: DataBlob, + digest: [u8; 32], backend: &DatastoreBackend, ) -> Result<(bool, u64), Error> { + let chunk_store = self.inner.chunk_store.clone(); match backend { - DatastoreBackend::Filesystem => self.inner.chunk_store.insert_chunk(chunk, digest), + DatastoreBackend::Filesystem => { + spawn_blocking(move || chunk_store.insert_chunk(&chunk, &digest)).await? + } DatastoreBackend::S3(s3_client) => { - let _chunk_guard = self - .inner - .chunk_store - .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?; - let chunk_data: Bytes = chunk.raw_data().to_vec().into(); + let _chunk_guard = + spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT)) + .await??; + let chunk_data: Bytes = chunk.into_inner().into(); let chunk_size = chunk_data.len() as u64; - let object_key = crate::s3::object_key_from_digest(digest)?; - let is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_no_replace_with_retry(object_key, chunk_data), - ) - .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; + let object_key = crate::s3::object_key_from_digest(&digest)?; + let is_duplicate = s3_client + .upload_no_replace_with_retry(object_key, chunk_data) + .await + .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; Ok((is_duplicate, chunk_size)) } } } - fn insert_chunk_cached( + async fn insert_chunk_cached( &self, - chunk: &DataBlob, - digest: &[u8; 32], + chunk: DataBlob, + digest: [u8; 32], s3_client: &Arc, ) -> Result<(bool, u64), Error> { let chunk_data = chunk.raw_data(); let chunk_size = chunk_data.len() as u64; - let _chunk_guard = self - .inner - .chunk_store - .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?; + + let chunk_store = self.inner.chunk_store.clone(); + let _chunk_guard = + spawn_blocking(move || chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT)).await??; // Avoid re-upload to S3 if the 
chunk is either present in the in-memory LRU cache // or the chunk marker file exists on filesystem. The latter means the chunk has // been uploaded in the past, but was evicted from the LRU cache since but was not // cleaned up by garbage collection, so contained in the S3 object store. - if self.cache_contains(digest) { + if self.cache_contains(&digest) { tracing::info!("Skip upload of cached chunk {}", hex::encode(digest)); return Ok((true, chunk_size)); } - if let Ok(true) = self.cond_touch_chunk(digest, false) { + if let Ok(true) = self.cond_touch_chunk(&digest, false) { tracing::info!( "Skip upload of already encountered chunk {}", hex::encode(digest), @@ -2807,15 +2815,15 @@ impl DataStore { } tracing::info!("Upload new chunk {}", hex::encode(digest)); - let object_key = crate::s3::object_key_from_digest(digest)?; + let object_key = crate::s3::object_key_from_digest(&digest)?; let chunk_data: Bytes = chunk_data.to_vec().into(); - let is_duplicate = proxmox_async::runtime::block_on( - s3_client.upload_no_replace_with_retry(object_key, chunk_data), - ) - .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; + let is_duplicate = s3_client + .upload_no_replace_with_retry(object_key, chunk_data) + .await + .map_err(|err| format_err!("failed to upload chunk to s3 backend - {err:#}"))?; tracing::info!("Caching of chunk {}", hex::encode(digest)); - self.cache_insert(digest, chunk)?; - self.inner.chunk_store.clear_chunk_expected_mark(digest)?; + self.cache_insert(&digest, &chunk)?; + self.inner.chunk_store.clear_chunk_expected_mark(&digest)?; Ok((is_duplicate, chunk_size)) } diff --git a/src/api2/backup/upload_chunk.rs b/src/api2/backup/upload_chunk.rs index 59e4caee2..3c721b1f6 100644 --- a/src/api2/backup/upload_chunk.rs +++ b/src/api2/backup/upload_chunk.rs @@ -233,13 +233,17 @@ async fn upload_to_backend( UploadChunk::new(BodyDataStream::new(req_body), digest, size, encoded_size).await?; if env.no_cache { - let (is_duplicate, chunk_size) 
= - env.datastore - .insert_chunk_no_cache(&chunk, &digest, &env.backend)?; + let (is_duplicate, chunk_size) = env + .datastore + .insert_chunk_no_cache(chunk, digest, &env.backend) + .await?; return Ok((digest, size, chunk_size as u32, is_duplicate)); } - let (is_duplicate, chunk_size) = env.datastore.insert_chunk(&chunk, &digest, &env.backend)?; + let (is_duplicate, chunk_size) = env + .datastore + .insert_chunk(chunk, digest, &env.backend) + .await?; Ok((digest, size, chunk_size as u32, is_duplicate)) } diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs index 1384fa90e..9593e1b9f 100644 --- a/src/api2/tape/restore.rs +++ b/src/api2/tape/restore.rs @@ -1152,6 +1152,14 @@ fn restore_partial_chunk_archive<'a>( let bytes2 = bytes.clone(); let backend = datastore.backend()?; + + let (_rt, handle) = tokio::runtime::Handle::try_current() + .map(|h| (None, h)) + .unwrap_or_else(|_| { + let rt = proxmox_async::runtime::get_runtime(); + let h = rt.handle().clone(); + (Some(rt), h) + }); let writer_pool = ParallelHandler::new( "tape restore chunk writer", 4, @@ -1163,7 +1171,7 @@ fn restore_partial_chunk_archive<'a>( chunk.decode(None, Some(&digest))?; // verify digest } - datastore.insert_chunk(&chunk, &digest, &backend)?; + handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?; } Ok(()) }, @@ -1546,6 +1554,14 @@ fn restore_chunk_archive<'a>( let bytes2 = bytes.clone(); let backend = datastore.backend()?; + + let (_rt, handle) = tokio::runtime::Handle::try_current() + .map(|h| (None, h)) + .unwrap_or_else(|_| { + let rt = proxmox_async::runtime::get_runtime(); + let h = rt.handle().clone(); + (Some(rt), h) + }); let writer_pool = ParallelHandler::new( "tape restore chunk writer", 4, @@ -1562,7 +1578,7 @@ fn restore_chunk_archive<'a>( chunk.decode(None, Some(&digest))?; // verify digest } - datastore.insert_chunk(&chunk, &digest, &backend)?; + handle.block_on(datastore.insert_chunk(chunk, digest, &backend))?; } else if verbose { info!("Found 
existing chunk: {}", hex::encode(digest)); } diff --git a/src/server/pull.rs b/src/server/pull.rs index e86a9c329..065b7617c 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -206,13 +206,21 @@ async fn pull_index_chunks( let target2 = target.clone(); let backend = backend.clone(); + + let (_rt, handle) = tokio::runtime::Handle::try_current() + .map(|h| (None, h)) + .unwrap_or_else(|_| { + let rt = proxmox_async::runtime::get_runtime(); + let h = rt.handle().clone(); + (Some(rt), h) + }); let verify_pool = ParallelHandler::new( "sync chunk writer", 4, move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { // println!("verify and write {}", hex::encode(&digest)); chunk.verify_unencrypted(size as usize, &digest)?; - target2.insert_chunk(&chunk, &digest, &backend)?; + handle.block_on(target2.insert_chunk(chunk, digest, &backend))?; Ok(()) }, ); -- 2.47.3