From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 3E05BD813 for ; Mon, 21 Aug 2023 13:37:53 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 206AE168D1 for ; Mon, 21 Aug 2023 13:37:53 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Mon, 21 Aug 2023 13:37:52 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D6209429EA for ; Mon, 21 Aug 2023 13:37:51 +0200 (CEST) From: Max Carrara To: pbs-devel@lists.proxmox.com Date: Mon, 21 Aug 2023 13:37:45 +0200 Message-Id: <20230821113745.813368-1-m.carrara@proxmox.com> X-Mailer: git-send-email 2.39.2 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.032 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox] async: runtime: Modernise module and update docs X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 21 Aug 2023 11:37:53 -0000 This commit updates all helper functions, taking into account recent developments regarding `tokio`. In particular, the `block_in_place()` and `block_on()` functions now don't panic anymore if used within the single-threaded `tokio` runtime and instead behave as expected in both runtime flavours. Furthermore, because `tokio` may add more runtime flavours in the future, all helpers will now panic if used within an unsupported runtime. This is to prevent unforeseen behavioural quirks and interactions with `tokio` internals. The above changes make `BlockingGuard` redundant; it is consequently removed. The documentation is also updated, describing the behaviour of the helper functions and the purpose of the `runtime.rs` module in more detail. Signed-off-by: Max Carrara --- proxmox-async/src/runtime.rs | 222 +++++++++++++++++++++-------------- 1 file changed, 131 insertions(+), 91 deletions(-) NOTE: This patch is a follow-up to https://lists.proxmox.com/pipermail/pbs-devel/2023-August/006477.html but its changes are different enough for it not to be considered an actual v2 anymore. diff --git a/proxmox-async/src/runtime.rs b/proxmox-async/src/runtime.rs index 0fe9fae..efc1cd8 100644 --- a/proxmox-async/src/runtime.rs +++ b/proxmox-async/src/runtime.rs @@ -1,6 +1,37 @@ //! Helpers for quirks of the current tokio runtime. +//! +//! It is preferred to use these helpers throughout our applications. +//! +//! # `tokio`, Runtime Flavors, and Panics +//! +//! Because [`tokio`] may introduce more [`RuntimeFlavor`s][RuntimeFlavor] in the future, +//! we [`panic!`] on flavors we're not (yet) explicitly supporting. +//! +//! This is done for forward-compatibility's sake in order to prevent unforeseen +//! interactions with [`tokio`], such as with [`tokio::task::block_in_place`], +//! which [`panic!`s][panic!] *only* if called within a [`CurrentThread`][ct-rt]-flavored +//! runtime, but not in a [`MultiThread`][mt-rt]-flavored runtime or if there's +//! *no runtime* at all. +//! +//! All [`panic!`s][panic!] can otherwise be either avoided or caught early by instantiating +//! your runtime with [`get_runtime()`] or [`get_runtime_with_builder()`]. Or, if you're +//! creating a separate async application, use [`main()`] for convenience. +//! +//! ## Supported [`RuntimeFlavor`s][RuntimeFlavor] +//! +//! * [`RuntimeFlavor::MultiThread`] +//! * [`RuntimeFlavor::CurrentThread`] +//! +//! # [`tokio`] and OpenSSL +//! +//! There's a nasty [OpenSSL bug][openssl-bug] causing a race between OpenSSL clean-up handlers +//! and the [`tokio`] runtime. This however is handled by [`get_runtime_with_builder()`] +//! and thus also within [`get_runtime()`] and our [`main()`] wrapper. +//! +//! [ct-rt]: RuntimeFlavor::CurrentThread +//! [mt-rt]: RuntimeFlavor::MultiThread +//! [openssl-bug]: https://github.com/openssl/openssl/issues/6214 -use std::cell::RefCell; use std::future::Future; use std::sync::{Arc, Mutex, Weak}; use std::task::{Context, Poll, Waker}; @@ -8,39 +39,7 @@ use std::thread::{self, Thread}; use lazy_static::lazy_static; use pin_utils::pin_mut; -use tokio::runtime::{self, Runtime}; - -thread_local! { - static BLOCKING: RefCell = RefCell::new(false); -} - -fn is_in_tokio() -> bool { - tokio::runtime::Handle::try_current().is_ok() -} - -fn is_blocking() -> bool { - BLOCKING.with(|v| *v.borrow()) -} - -struct BlockingGuard(bool); - -impl BlockingGuard { - fn set() -> Self { - Self(BLOCKING.with(|v| { - let old = *v.borrow(); - *v.borrow_mut() = true; - old - })) - } -} - -impl Drop for BlockingGuard { - fn drop(&mut self) { - BLOCKING.with(|v| { - *v.borrow_mut() = self.0; - }); - } -} +use tokio::runtime::{self, Runtime, RuntimeFlavor}; lazy_static! { // avoid openssl bug: https://github.com/openssl/openssl/issues/6214 @@ -53,14 +52,28 @@ extern "C" { fn OPENSSL_thread_stop(); } -/// Get or create the current main tokio runtime. +#[inline] +fn panic_on_bad_flavor(runtime: &runtime::Runtime) { + match runtime.handle().runtime_flavor() { + RuntimeFlavor::CurrentThread => (), + RuntimeFlavor::MultiThread => (), + bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor), + } +} + +/// Get or build the current main [`tokio`] [`Runtime`]. Useful if [`tokio`'s][tokio] defaults +/// don't suit your needs. /// -/// This makes sure that tokio's worker threads are marked for us so that we know whether we -/// can/need to use `block_in_place` in our `block_on` helper. +/// # Panics +/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`]. +/// See the [module level][mod] documentation for more details. +/// +/// [mod]: self pub fn get_runtime_with_builder runtime::Builder>(get_builder: F) -> Arc { let mut guard = RUNTIME.lock().unwrap(); if let Some(rt) = guard.upgrade() { + panic_on_bad_flavor(&rt); return rt; } @@ -74,6 +87,8 @@ pub fn get_runtime_with_builder runtime::Builder>(get_builder: F) -> }); let runtime = builder.build().expect("failed to spawn tokio runtime"); + panic_on_bad_flavor(&runtime); + let rt = Arc::new(runtime); *guard = Arc::downgrade(&rt); @@ -81,9 +96,12 @@ pub fn get_runtime_with_builder runtime::Builder>(get_builder: F) -> rt } -/// Get or create the current main tokio runtime. +/// Get or create the current main [`tokio`] [`Runtime`]. +/// +/// This is a convenience wrapper around [`get_runtime_with_builder()`] using +/// [`tokio`'s multithreaded runtime][mt-rt-meth]. /// -/// This calls get_runtime_with_builder() using the tokio default threaded scheduler +/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread() pub fn get_runtime() -> Arc { get_runtime_with_builder(|| { let mut builder = runtime::Builder::new_multi_thread(); @@ -93,67 +111,89 @@ pub fn get_runtime() -> Arc { } /// Block on a synchronous piece of code. -pub fn block_in_place(fut: impl FnOnce() -> R) -> R { - // don't double-exit the context (tokio doesn't like that) - // also, if we're not actually in a tokio-worker we must not use block_in_place() either - if is_blocking() || !is_in_tokio() { - fut() +/// +/// This is a wrapper around [`tokio::task::block_in_place()`] that allows to +/// block the current thread even within a [`Runtime`] with [`RuntimeFlavor::CurrentThread`]. +/// +/// Normally, [tokio's `block_in_place()`][bip] [`panic`s][panic] when called in +/// such a case; this function instead just runs the piece of code right away, preventing +/// an unforeseen panic. +/// +/// # Note +/// If you're in a [`CurrentThread`][RuntimeFlavor::CurrentThread] runtime and you +/// *really* need to execute a bunch of blocking code, you might want to consider +/// executing that code with [`tokio::task::spawn_blocking()`] instead. This prevents +/// blocking the single-threaded runtime and still allows you to communicate via channels. +/// +/// See [tokio's documentation on CPU-bound tasks and blocking code][tok-block-doc] +/// for more information. +/// +/// # Panics +/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`]. +/// See the [module level][mod] documentation for more details. +/// +/// [bip]: tokio::task::block_in_place() +/// [mod]: self +/// [sp]: tokio::task::spawn_blocking() +/// [tok-block-doc]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code +pub fn block_in_place(func: impl FnOnce() -> R) -> R { + if let Ok(runtime) = runtime::Handle::try_current() { + match runtime.runtime_flavor() { + RuntimeFlavor::CurrentThread => func(), + RuntimeFlavor::MultiThread => tokio::task::block_in_place(func), + bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor), + } } else { - // we are in an actual tokio worker thread, block it: - tokio::task::block_in_place(move || { - let _guard = BlockingGuard::set(); - fut() - }) + func() } } -/// Block on a future in this thread. -pub fn block_on(fut: F) -> F::Output { - // don't double-exit the context (tokio doesn't like that) - if is_blocking() { - block_on_local_future(fut) - } else if is_in_tokio() { - // inside a tokio worker we need to tell tokio that we're about to really block: - tokio::task::block_in_place(move || { - let _guard = BlockingGuard::set(); - block_on_local_future(fut) - }) +/// Block on a future in the current thread. +/// +/// Not to be confused with [`tokio::runtime::Handle::block_on()`] and +/// [`tokio::runtime::Runtime::block_on()`]. +/// +/// This will prevent other futures from running in the current thread in the meantime. +/// Essentially, this is [`block_in_place()`], but for [`Future`s][Future] instead of functions. +/// +/// If there's no runtime currently active, this function will create a temporary one +/// using [`get_runtime()`] in order to block on and finish running the provided [`Future`]. +/// +/// # Panics +/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`]. +/// See the [module level][mod] documentation for more details. +/// +/// [mod]: self +pub fn block_on(future: F) -> F::Output { + if let Ok(runtime) = runtime::Handle::try_current() { + match runtime.runtime_flavor() { + RuntimeFlavor::CurrentThread => block_on_local_future(future), + RuntimeFlavor::MultiThread => { + tokio::task::block_in_place(move || block_on_local_future(future)) + } + bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor), + } } else { - // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn - // it on demand if necessary), then enter it - let _guard = BlockingGuard::set(); - let _enter_guard = get_runtime().enter(); - get_runtime().block_on(fut) - } -} - -/* -fn block_on_impl(mut fut: F) -> F::Output -where - F: Future + Send, - F::Output: Send + 'static, -{ - let (tx, rx) = tokio::sync::oneshot::channel(); - let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static - tokio::spawn(async move { - let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) }; - tx - .send(fut.await) - .map_err(drop) - .expect("failed to send block_on result to channel") - }); + let runtime = get_runtime(); + let _enter_guard = runtime.enter(); - futures::executor::block_on(async move { - rx.await.expect("failed to receive block_on result from channel") - }) - std::mem::forget(fut); + runtime.block_on(future) + } } -*/ -/// This used to be our tokio main entry point. Now this just calls out to `block_on` for -/// compatibility, which will perform all the necessary tasks on-demand anyway. +/// This is our [`tokio`] entrypoint, which blocks on the provided [`Future`] +/// until it's completed, using [`tokio`'s multithreaded runtime][mt-rt-meth]. +/// +/// It is preferred to use this function over other ways of instantiating a runtime. +/// See the [module level][mod] documentation for more information. +/// +/// [mod]: self +/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread() pub fn main(fut: F) -> F::Output { - block_on(fut) + let runtime = get_runtime(); + let _enter_guard = runtime.enter(); + + runtime.block_on(fut) } struct ThreadWaker(Thread); -- 2.39.2