From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id B44C51FF13A for ; Wed, 27 May 2026 14:41:38 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id F23561BB1B; Wed, 27 May 2026 14:41:36 +0200 (CEST) Message-ID: <78fbaa16-bf38-4ab5-a5ab-2986bef030bb@proxmox.com> Date: Wed, 27 May 2026 14:40:59 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Subject: Re: [PATCH v1 proxmox-backup] datastore: make insert_chunk async to avoid running out of workers To: Christian Ebner , pbs-devel@lists.proxmox.com References: <20260526172032.606994-1-r.obkircher@proxmox.com> Content-Language: en-US, de-AT From: Robert Obkircher In-Reply-To: Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1779885634037 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.055 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: 4IEZYEX6XSIJV2AASMBGOZNP65BIQ577 X-Message-ID-Hash: 4IEZYEX6XSIJV2AASMBGOZNP65BIQ577 X-MailFrom: r.obkircher@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: On 27.05.26 11:40, Christian Ebner wrote: > Thanks a lot for debugging and 
tackling this so quickly! > > As already noted by yourself and discussed off-list, there are > additional places interacting with the chunk store and cache which > would require wrapping in spawn_blocking() or an async implementation. > > One major concern here is the regression potential for both S3 and > regular datastores, especially with respect to performance. So it > would be great to have some before/after benchmarks as well. > > Given the scope, we might consider the alternative implementation > using block_in_place() as a bugfix for the time being, the code paths > for regular datastores being battle-proven for years now.  I've realized that the alternative implementation can also use spawn_blocking and sent a new patch: https://lore.proxmox.com/pbs-devel/20260527123809.238964-1-r.obkircher@proxmox.com/T/#u > > On 5/26/26 7:21 PM, Robert Obkircher wrote: >> Avoid async-sync-async transitions and wrap synchronous operations in >> spawn_blocking. >> >> Previously, every s3 upload would go through proxmox_async::block_on >> which has a waker implementation that relies on std::thread::park and >> can thus completely block an entire tokio worker. In the backtrace >> from the linked forum thread this was the case for 19 threads, while >> the remaining two were presumably trying to insert duplicate chunks >> and thus waiting on chunk locks with a 3-hour timeout. That left no >> workers to make progress on uploads or respond to new requests. >> >> An alternative approach would be to simply use block_in_place in >> upload_to_backend. However, that could still run out of workers if the >> spawn_blocking pool was exhausted and would continue to create a new >> runtime for every S3 upload from a ParallelHandler. 
>> >> Fixes: https://forum.proxmox.com/threads/183705 >> Signed-off-by: Robert Obkircher >> --- >>   pbs-datastore/src/datastore.rs  | 78 >> ++++++++++++++++++--------------- >>   src/api2/backup/upload_chunk.rs | 12 +++-- >>   src/api2/tape/restore.rs        | 20 ++++++++- >>   src/server/pull.rs              | 10 ++++- >>   4 files changed, 78 insertions(+), 42 deletions(-) >> >> diff --git a/pbs-datastore/src/datastore.rs >> b/pbs-datastore/src/datastore.rs >> index e2d1ae67c..f936538ba 100644 >> --- a/pbs-datastore/src/datastore.rs >> +++ b/pbs-datastore/src/datastore.rs >> @@ -14,6 +14,7 @@ use hyper::body::Bytes; >>   use nix::unistd::{UnlinkatFlags, unlinkat}; >>   use pbs_tools::lru_cache::LruCache; >>   use tokio::io::AsyncWriteExt; >> +use tokio::task::spawn_blocking; >>   use tracing::{info, warn}; >>     use proxmox_human_byte::HumanByte; >> @@ -2731,15 +2732,20 @@ impl DataStore { >>       /// The backend is passed along in order to reuse an active >> connection to the backend, created >>       /// on environment instantiation, e.g. an s3 client which >> lives for the whole backup session. >>       /// This calls into async code, so callers must assure to >> never hold a Mutex lock. >> -    pub fn insert_chunk( >> +    pub async fn insert_chunk( >>           &self, >> -        chunk: &DataBlob, >> -        digest: &[u8; 32], >> +        chunk: DataBlob, >> +        digest: [u8; 32], >>           backend: &DatastoreBackend, >>       ) -> Result<(bool, u64), Error> { >>           match backend { >> -            DatastoreBackend::Filesystem => >> self.inner.chunk_store.insert_chunk(chunk, digest), >> -            DatastoreBackend::S3(s3_client) => >> self.insert_chunk_cached(chunk, digest, s3_client), >> +            DatastoreBackend::Filesystem => { >> +                let chunk_store = self.inner.chunk_store.clone(); >> +                spawn_blocking(move || >> chunk_store.insert_chunk(&chunk, &digest)).await? 
>> +            } >> +            DatastoreBackend::S3(s3_client) => { >> +                self.insert_chunk_cached(chunk, digest, >> s3_client).await >> +            } >>           } >>       } >>   @@ -2752,53 +2758,55 @@ impl DataStore { >>       /// >>       /// FIXME: refactor into insert_chunk() once the backend >> instance is cacheable on the datastore >>       /// itself. >> -    pub fn insert_chunk_no_cache( >> +    pub async fn insert_chunk_no_cache( >>           &self, >> -        chunk: &DataBlob, >> -        digest: &[u8; 32], >> +        chunk: DataBlob, >> +        digest: [u8; 32], >>           backend: &DatastoreBackend, >>       ) -> Result<(bool, u64), Error> { >> +        let chunk_store = self.inner.chunk_store.clone(); >>           match backend { >> -            DatastoreBackend::Filesystem => >> self.inner.chunk_store.insert_chunk(chunk, digest), >> +            DatastoreBackend::Filesystem => { >> +                spawn_blocking(move || >> chunk_store.insert_chunk(&chunk, &digest)).await? 
>> +            } >>               DatastoreBackend::S3(s3_client) => { >> -                let _chunk_guard = self >> -                    .inner >> -                    .chunk_store >> -                    .lock_chunk(digest, CHUNK_LOCK_TIMEOUT)?; >> -                let chunk_data: Bytes = >> chunk.raw_data().to_vec().into(); >> +                let _chunk_guard = >> +                    spawn_blocking(move || >> chunk_store.lock_chunk(&digest, CHUNK_LOCK_TIMEOUT)) >> +                        .await??; >> +                let chunk_data: Bytes = chunk.into_inner().into(); >>                   let chunk_size = chunk_data.len() as u64; >> -                let object_key = >> crate::s3::object_key_from_digest(digest)?; >> -                let is_duplicate = proxmox_async::runtime::block_on( >> -                    >> s3_client.upload_no_replace_with_retry(object_key, chunk_data), >> -                ) >> -                .map_err(|err| format_err!("failed to upload chunk >> to s3 backend - {err:#}"))?; >> +                let object_key = >> crate::s3::object_key_from_digest(&digest)?; >> +                let is_duplicate = s3_client >> +                    .upload_no_replace_with_retry(object_key, >> chunk_data) >> +                    .await >> +                    .map_err(|err| format_err!("failed to upload >> chunk to s3 backend - {err:#}"))?; >>                   Ok((is_duplicate, chunk_size)) >>               } >>           } >>       } >>   -    fn insert_chunk_cached( >> +    async fn insert_chunk_cached( >>           &self, >> -        chunk: &DataBlob, >> -        digest: &[u8; 32], >> +        chunk: DataBlob, >> +        digest: [u8; 32], >>           s3_client: &Arc, >>       ) -> Result<(bool, u64), Error> { >>           let chunk_data = chunk.raw_data(); >>           let chunk_size = chunk_data.len() as u64; >> -        let _chunk_guard = self >> -            .inner >> -            .chunk_store >> -            .lock_chunk(digest, 
CHUNK_LOCK_TIMEOUT)?; >> + >> +        let chunk_store = self.inner.chunk_store.clone(); >> +        let _chunk_guard = >> +            spawn_blocking(move || chunk_store.lock_chunk(&digest, >> CHUNK_LOCK_TIMEOUT)).await??; >>             // Avoid re-upload to S3 if the chunk is either present >> in the in-memory LRU cache >>           // or the chunk marker file exists on filesystem. The >> latter means the chunk has >>           // been uploaded in the past, but was evicted from the >> LRU cache since but was not >>           // cleaned up by garbage collection, so contained in the >> S3 object store. >> -        if self.cache_contains(digest) { >> +        if self.cache_contains(&digest) { >>               tracing::info!("Skip upload of cached chunk {}", >> hex::encode(digest)); >>               return Ok((true, chunk_size)); >>           } >> -        if let Ok(true) = self.cond_touch_chunk(digest, false) { >> +        if let Ok(true) = self.cond_touch_chunk(&digest, false) { > > blocking code path > >>               tracing::info!( >>                   "Skip upload of already encountered chunk {}", >>                   hex::encode(digest), >> @@ -2807,15 +2815,15 @@ impl DataStore { >>           } >>             tracing::info!("Upload new chunk {}", >> hex::encode(digest)); >> -        let object_key = crate::s3::object_key_from_digest(digest)?; >> +        let object_key = crate::s3::object_key_from_digest(&digest)?; >>           let chunk_data: Bytes = chunk_data.to_vec().into(); >> -        let is_duplicate = proxmox_async::runtime::block_on( >> -            s3_client.upload_no_replace_with_retry(object_key, >> chunk_data), >> -        ) >> -        .map_err(|err| format_err!("failed to upload chunk to s3 >> backend - {err:#}"))?; >> +        let is_duplicate = s3_client >> +            .upload_no_replace_with_retry(object_key, chunk_data) >> +            .await >> +            .map_err(|err| format_err!("failed to upload chunk to >> s3 backend - 
{err:#}"))?; >>           tracing::info!("Caching of chunk {}", hex::encode(digest)); >> -        self.cache_insert(digest, chunk)?; >> -        self.inner.chunk_store.clear_chunk_expected_mark(digest)?; >> +        self.cache_insert(&digest, &chunk)?; >> +        self.inner.chunk_store.clear_chunk_expected_mark(&digest)?; > > two additional blocking code paths > >>           Ok((is_duplicate, chunk_size)) >>       } >>   diff --git a/src/api2/backup/upload_chunk.rs >> b/src/api2/backup/upload_chunk.rs >> index 59e4caee2..3c721b1f6 100644 >> --- a/src/api2/backup/upload_chunk.rs >> +++ b/src/api2/backup/upload_chunk.rs >> @@ -233,13 +233,17 @@ async fn upload_to_backend( >>           UploadChunk::new(BodyDataStream::new(req_body), digest, >> size, encoded_size).await?; >>         if env.no_cache { >> -        let (is_duplicate, chunk_size) = >> -            env.datastore >> -                .insert_chunk_no_cache(&chunk, &digest, >> &env.backend)?; >> +        let (is_duplicate, chunk_size) = env >> +            .datastore >> +            .insert_chunk_no_cache(chunk, digest, &env.backend) >> +            .await?; >>           return Ok((digest, size, chunk_size as u32, is_duplicate)); >>       } >>   -    let (is_duplicate, chunk_size) = >> env.datastore.insert_chunk(&chunk, &digest, &env.backend)?; >> +    let (is_duplicate, chunk_size) = env >> +        .datastore >> +        .insert_chunk(chunk, digest, &env.backend) >> +        .await?; >>       Ok((digest, size, chunk_size as u32, is_duplicate)) >>   } >>   diff --git a/src/api2/tape/restore.rs b/src/api2/tape/restore.rs >> index 1384fa90e..9593e1b9f 100644 >> --- a/src/api2/tape/restore.rs >> +++ b/src/api2/tape/restore.rs >> @@ -1152,6 +1152,14 @@ fn restore_partial_chunk_archive<'a>( >>       let bytes2 = bytes.clone(); >>         let backend = datastore.backend()?; >> + >> +    let (_rt, handle) = tokio::runtime::Handle::try_current() >> +        .map(|h| (None, h)) >> +        .unwrap_or_else(|_| { >> +  
          let rt = proxmox_async::runtime::get_runtime(); >> +            let h = rt.handle().clone(); >> +            (Some(rt), h) >> +        }); > > this should better live as dedicated implementation in > proxmox_async, so it can also be reused ... > >>       let writer_pool = ParallelHandler::new( >>           "tape restore chunk writer", >>           4, >> @@ -1163,7 +1171,7 @@ fn restore_partial_chunk_archive<'a>( >>                       chunk.decode(None, Some(&digest))?; // verify >> digest >>                   } >>   -                datastore.insert_chunk(&chunk, &digest, &backend)?; >> +                handle.block_on(datastore.insert_chunk(chunk, >> digest, &backend))?; >>               } >>               Ok(()) >>           }, >> @@ -1546,6 +1554,14 @@ fn restore_chunk_archive<'a>( >>       let bytes2 = bytes.clone(); >>         let backend = datastore.backend()?; >> + >> +    let (_rt, handle) = tokio::runtime::Handle::try_current() > > nit: maybe let's call this runtime_handle or rt_handle, just to keep > some mental context for the block_on() call site. > >> +        .map(|h| (None, h)) >> +        .unwrap_or_else(|_| { >> +            let rt = proxmox_async::runtime::get_runtime(); >> +            let h = rt.handle().clone(); >> +            (Some(rt), h) >> +        }); > > ... here and ... 
> >>       let writer_pool = ParallelHandler::new( >>           "tape restore chunk writer", >>           4, >> @@ -1562,7 +1578,7 @@ fn restore_chunk_archive<'a>( >>                       chunk.decode(None, Some(&digest))?; // verify >> digest >>                   } >>   -                datastore.insert_chunk(&chunk, &digest, &backend)?; >> +                handle.block_on(datastore.insert_chunk(chunk, >> digest, &backend))?; >>               } else if verbose { >>                   info!("Found existing chunk: {}", >> hex::encode(digest)); >>               } >> diff --git a/src/server/pull.rs b/src/server/pull.rs >> index e86a9c329..065b7617c 100644 >> --- a/src/server/pull.rs >> +++ b/src/server/pull.rs >> @@ -206,13 +206,21 @@ async fn pull_index_chunks( >>         let target2 = target.clone(); >>       let backend = backend.clone(); >> + >> +    let (_rt, handle) = tokio::runtime::Handle::try_current() >> +        .map(|h| (None, h)) >> +        .unwrap_or_else(|_| { >> +            let rt = proxmox_async::runtime::get_runtime(); >> +            let h = rt.handle().clone(); >> +            (Some(rt), h) >> +        }); > > ... here. > >>       let verify_pool = ParallelHandler::new( >>           "sync chunk writer", >>           4, >>           move |(chunk, digest, size): (DataBlob, [u8; 32], u64)| { >>               // println!("verify and write {}", >> hex::encode(&digest)); >>               chunk.verify_unencrypted(size as usize, &digest)?; >> -            target2.insert_chunk(&chunk, &digest, &backend)?; >> +            handle.block_on(target2.insert_chunk(chunk, digest, >> &backend))?; >>               Ok(()) >>           }, >>       ); >