all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox] async: runtime: Modernise module and update docs
@ 2023-08-21 11:37 Max Carrara
  2023-08-23  7:58 ` [pbs-devel] applied: " Wolfgang Bumiller
  0 siblings, 1 reply; 2+ messages in thread
From: Max Carrara @ 2023-08-21 11:37 UTC (permalink / raw)
  To: pbs-devel

This commit updates all helper functions, taking into account recent
developments regarding `tokio`.

In particular, the `block_in_place()` and `block_on()` functions no
longer panic if used within the single-threaded `tokio` runtime and
instead behave as expected in both runtime flavours.

Furthermore, because `tokio` may add more runtime flavours in the
future, all helpers will now panic if used within an unsupported
runtime. This is to prevent unforeseen behavioural quirks and
interactions with `tokio` internals.

The above changes make `BlockingGuard` redundant; it is consequently
removed.

The documentation is also updated, describing the behaviour of the
helper functions and the purpose of the `runtime.rs` module in more
detail.

Signed-off-by: Max Carrara <m.carrara@proxmox.com>
---
 proxmox-async/src/runtime.rs | 222 +++++++++++++++++++++--------------
 1 file changed, 131 insertions(+), 91 deletions(-)

 NOTE: This patch is a follow-up to https://lists.proxmox.com/pipermail/pbs-devel/2023-August/006477.html
 but its changes are different enough for it not to be considered an
 actual v2 anymore.

diff --git a/proxmox-async/src/runtime.rs b/proxmox-async/src/runtime.rs
index 0fe9fae..efc1cd8 100644
--- a/proxmox-async/src/runtime.rs
+++ b/proxmox-async/src/runtime.rs
@@ -1,6 +1,37 @@
 //! Helpers for quirks of the current tokio runtime.
+//!
+//! It is preferred to use these helpers throughout our applications.
+//!
+//! # `tokio`, Runtime Flavors, and Panics
+//!
+//! Because [`tokio`] may introduce more [`RuntimeFlavor`s][RuntimeFlavor] in the future,
+//! we [`panic!`] on flavors we're not (yet) explicitly supporting.
+//!
+//! This is done for forward-compatibility's sake in order to prevent unforeseen
+//! interactions with [`tokio`], such as with [`tokio::task::block_in_place`],
+//! which [`panic!`s][panic!] *only* if called within a [`CurrentThread`][ct-rt]-flavored
+//! runtime, but not in a [`MultiThread`][mt-rt]-flavored runtime or if there's
+//! *no runtime* at all.
+//!
+//! All [`panic!`s][panic!] can otherwise be either avoided or caught early by instantiating
+//! your runtime with [`get_runtime()`] or [`get_runtime_with_builder()`]. Or, if you're
+//! creating a separate async application, use [`main()`] for convenience.
+//!
+//! ## Supported [`RuntimeFlavor`s][RuntimeFlavor]
+//!
+//! * [`RuntimeFlavor::MultiThread`]
+//! * [`RuntimeFlavor::CurrentThread`]
+//!
+//! # [`tokio`] and OpenSSL
+//!
+//! There's a nasty [OpenSSL bug][openssl-bug] causing a race between OpenSSL clean-up handlers
+//! and the [`tokio`] runtime. This however is handled by [`get_runtime_with_builder()`]
+//! and thus also within [`get_runtime()`] and our [`main()`] wrapper.
+//!
+//! [ct-rt]: RuntimeFlavor::CurrentThread
+//! [mt-rt]: RuntimeFlavor::MultiThread
+//! [openssl-bug]: https://github.com/openssl/openssl/issues/6214

-use std::cell::RefCell;
 use std::future::Future;
 use std::sync::{Arc, Mutex, Weak};
 use std::task::{Context, Poll, Waker};
@@ -8,39 +39,7 @@ use std::thread::{self, Thread};

 use lazy_static::lazy_static;
 use pin_utils::pin_mut;
-use tokio::runtime::{self, Runtime};
-
-thread_local! {
-    static BLOCKING: RefCell<bool> = RefCell::new(false);
-}
-
-fn is_in_tokio() -> bool {
-    tokio::runtime::Handle::try_current().is_ok()
-}
-
-fn is_blocking() -> bool {
-    BLOCKING.with(|v| *v.borrow())
-}
-
-struct BlockingGuard(bool);
-
-impl BlockingGuard {
-    fn set() -> Self {
-        Self(BLOCKING.with(|v| {
-            let old = *v.borrow();
-            *v.borrow_mut() = true;
-            old
-        }))
-    }
-}
-
-impl Drop for BlockingGuard {
-    fn drop(&mut self) {
-        BLOCKING.with(|v| {
-            *v.borrow_mut() = self.0;
-        });
-    }
-}
+use tokio::runtime::{self, Runtime, RuntimeFlavor};

 lazy_static! {
     // avoid openssl bug: https://github.com/openssl/openssl/issues/6214
@@ -53,14 +52,28 @@ extern "C" {
     fn OPENSSL_thread_stop();
 }

-/// Get or create the current main tokio runtime.
+#[inline]
+fn panic_on_bad_flavor(runtime: &runtime::Runtime) {
+    match runtime.handle().runtime_flavor() {
+        RuntimeFlavor::CurrentThread => (),
+        RuntimeFlavor::MultiThread => (),
+        bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
+    }
+}
+
+/// Get or build the current main [`tokio`] [`Runtime`]. Useful if [`tokio`'s][tokio] defaults
+/// don't suit your needs.
 ///
-/// This makes sure that tokio's worker threads are marked for us so that we know whether we
-/// can/need to use `block_in_place` in our `block_on` helper.
+/// # Panics
+/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
+/// See the [module level][mod] documentation for more details.
+///
+/// [mod]: self
 pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) -> Arc<Runtime> {
     let mut guard = RUNTIME.lock().unwrap();

     if let Some(rt) = guard.upgrade() {
+        panic_on_bad_flavor(&rt);
         return rt;
     }

@@ -74,6 +87,8 @@ pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) ->
     });

     let runtime = builder.build().expect("failed to spawn tokio runtime");
+    panic_on_bad_flavor(&runtime);
+
     let rt = Arc::new(runtime);

     *guard = Arc::downgrade(&rt);
@@ -81,9 +96,12 @@ pub fn get_runtime_with_builder<F: Fn() -> runtime::Builder>(get_builder: F) ->
     rt
 }

-/// Get or create the current main tokio runtime.
+/// Get or create the current main [`tokio`] [`Runtime`].
+///
+/// This is a convenience wrapper around [`get_runtime_with_builder()`] using
+/// [`tokio`'s multithreaded runtime][mt-rt-meth].
 ///
-/// This calls get_runtime_with_builder() using the tokio default threaded scheduler
+/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread()
 pub fn get_runtime() -> Arc<Runtime> {
     get_runtime_with_builder(|| {
         let mut builder = runtime::Builder::new_multi_thread();
@@ -93,67 +111,89 @@ pub fn get_runtime() -> Arc<Runtime> {
 }

 /// Block on a synchronous piece of code.
-pub fn block_in_place<R>(fut: impl FnOnce() -> R) -> R {
-    // don't double-exit the context (tokio doesn't like that)
-    // also, if we're not actually in a tokio-worker we must not use block_in_place() either
-    if is_blocking() || !is_in_tokio() {
-        fut()
+///
+/// This is a wrapper around [`tokio::task::block_in_place()`] that allows blocking
+/// the current thread even within a [`Runtime`] with [`RuntimeFlavor::CurrentThread`].
+///
+/// Normally, [tokio's `block_in_place()`][bip] [`panic`s][panic] when called in
+/// such a case; this function instead just runs the piece of code right away, preventing
+/// an unforeseen panic.
+///
+/// # Note
+/// If you're in a [`CurrentThread`][RuntimeFlavor::CurrentThread] runtime and you
+/// *really* need to execute a bunch of blocking code, you might want to consider
+/// executing that code with [`tokio::task::spawn_blocking()`] instead. This prevents
+/// blocking the single-threaded runtime and still allows you to communicate via channels.
+///
+/// See [tokio's documentation on CPU-bound tasks and blocking code][tok-block-doc]
+/// for more information.
+///
+/// # Panics
+/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
+/// See the [module level][mod] documentation for more details.
+///
+/// [bip]: tokio::task::block_in_place()
+/// [mod]: self
+/// [sp]: tokio::task::spawn_blocking()
+/// [tok-block-doc]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
+pub fn block_in_place<R>(func: impl FnOnce() -> R) -> R {
+    if let Ok(runtime) = runtime::Handle::try_current() {
+        match runtime.runtime_flavor() {
+            RuntimeFlavor::CurrentThread => func(),
+            RuntimeFlavor::MultiThread => tokio::task::block_in_place(func),
+            bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
+        }
     } else {
-        // we are in an actual tokio worker thread, block it:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            fut()
-        })
+        func()
     }
 }

-/// Block on a future in this thread.
-pub fn block_on<F: Future>(fut: F) -> F::Output {
-    // don't double-exit the context (tokio doesn't like that)
-    if is_blocking() {
-        block_on_local_future(fut)
-    } else if is_in_tokio() {
-        // inside a tokio worker we need to tell tokio that we're about to really block:
-        tokio::task::block_in_place(move || {
-            let _guard = BlockingGuard::set();
-            block_on_local_future(fut)
-        })
+/// Block on a future in the current thread.
+///
+/// Not to be confused with [`tokio::runtime::Handle::block_on()`] and
+/// [`tokio::runtime::Runtime::block_on()`].
+///
+/// This will prevent other futures from running in the current thread in the meantime.
+/// Essentially, this is [`block_in_place()`], but for [`Future`s][Future] instead of functions.
+///
+/// If there's no runtime currently active, this function will create a temporary one
+/// using [`get_runtime()`] in order to block on and finish running the provided [`Future`].
+///
+/// # Panics
+/// This function will panic if the runtime has an unsupported [`RuntimeFlavor`].
+/// See the [module level][mod] documentation for more details.
+///
+/// [mod]: self
+pub fn block_on<F: Future>(future: F) -> F::Output {
+    if let Ok(runtime) = runtime::Handle::try_current() {
+        match runtime.runtime_flavor() {
+            RuntimeFlavor::CurrentThread => block_on_local_future(future),
+            RuntimeFlavor::MultiThread => {
+                tokio::task::block_in_place(move || block_on_local_future(future))
+            }
+            bad_flavor => panic!("unsupported tokio runtime flavor: \"{:#?}\"", bad_flavor),
+        }
     } else {
-        // not a worker thread, not associated with a runtime, make sure we have a runtime (spawn
-        // it on demand if necessary), then enter it
-        let _guard = BlockingGuard::set();
-        let _enter_guard = get_runtime().enter();
-        get_runtime().block_on(fut)
-    }
-}
-
-/*
-fn block_on_impl<F>(mut fut: F) -> F::Output
-where
-    F: Future + Send,
-    F::Output: Send + 'static,
-{
-    let (tx, rx) = tokio::sync::oneshot::channel();
-    let fut_ptr = &mut fut as *mut F as usize; // hack to not require F to be 'static
-    tokio::spawn(async move {
-        let fut: F = unsafe { std::ptr::read(fut_ptr as *mut F) };
-        tx
-            .send(fut.await)
-            .map_err(drop)
-            .expect("failed to send block_on result to channel")
-    });
+        let runtime = get_runtime();
+        let _enter_guard = runtime.enter();

-    futures::executor::block_on(async move {
-        rx.await.expect("failed to receive block_on result from channel")
-    })
-    std::mem::forget(fut);
+        runtime.block_on(future)
+    }
 }
-*/

-/// This used to be our tokio main entry point. Now this just calls out to `block_on` for
-/// compatibility, which will perform all the necessary tasks on-demand anyway.
+/// This is our [`tokio`] entrypoint, which blocks on the provided [`Future`]
+/// until it's completed, using [`tokio`'s multithreaded runtime][mt-rt-meth].
+///
+/// It is preferred to use this function over other ways of instantiating a runtime.
+/// See the [module level][mod] documentation for more information.
+///
+/// [mod]: self
+/// [mt-rt-meth]: tokio::runtime::Builder::new_multi_thread()
 pub fn main<F: Future>(fut: F) -> F::Output {
-    block_on(fut)
+    let runtime = get_runtime();
+    let _enter_guard = runtime.enter();
+
+    runtime.block_on(fut)
 }

 struct ThreadWaker(Thread);
--
2.39.2





^ permalink raw reply	[flat|nested] 2+ messages in thread

* [pbs-devel] applied: [PATCH proxmox] async: runtime: Modernise module and update docs
  2023-08-21 11:37 [pbs-devel] [PATCH proxmox] async: runtime: Modernise module and update docs Max Carrara
@ 2023-08-23  7:58 ` Wolfgang Bumiller
  0 siblings, 0 replies; 2+ messages in thread
From: Wolfgang Bumiller @ 2023-08-23  7:58 UTC (permalink / raw)
  To: Max Carrara; +Cc: pbs-devel

applied, thanks




^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2023-08-23  7:59 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-08-21 11:37 [pbs-devel] [PATCH proxmox] async: runtime: Modernise module and update docs Max Carrara
2023-08-23  7:58 ` [pbs-devel] applied: " Wolfgang Bumiller

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal