* [pbs-devel] [PATCH] use tokio::task_local to store/access the current worker
@ 2021-09-14 10:50 Dietmar Maurer
0 siblings, 0 replies; only message in thread
From: Dietmar Maurer @ 2021-09-14 10:50 UTC (permalink / raw)
To: pbs-devel
This is used to simplify logging: worker_log!() and worker_warn!()
---
src/server/worker_task.rs | 79 +++++++++++++++++++++++++++++++--------
1 file changed, 64 insertions(+), 15 deletions(-)
diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs
index 2ef8ba9d..9a0173b3 100644
--- a/src/server/worker_task.rs
+++ b/src/server/worker_task.rs
@@ -39,6 +39,10 @@ lazy_static! {
static ref WORKER_TASK_LIST: Mutex<HashMap<usize, Arc<WorkerTask>>> = Mutex::new(HashMap::new());
}
+tokio::task_local! {
+ static CURRENT_WORKER: Arc<WorkerTask>; // set via scope()/sync_scope() when a worker is spawned
+}
+
/// checks if the task UPID refers to a worker from this process
fn is_local_worker(upid: &UPID) -> bool {
upid.pid == server::pid() && upid.pstart == server::pstart()
@@ -664,10 +668,10 @@ impl WorkerTask {
let worker = WorkerTask::new(worker_type, worker_id, auth_id, to_stdout)?;
let upid_str = worker.upid.to_string();
let f = f(worker.clone());
- tokio::spawn(async move {
+ tokio::spawn(CURRENT_WORKER.scope(Arc::clone(&worker), async move {
let result = f.await;
worker.log_result(&result);
- });
+ }));
Ok(upid_str)
}
@@ -686,22 +690,24 @@ impl WorkerTask {
let upid_str = worker.upid.to_string();
let _child = std::thread::Builder::new().name(upid_str.clone()).spawn(move || {
- let worker1 = worker.clone();
- let result = match std::panic::catch_unwind(move || f(worker1)) {
- Ok(r) => r,
- Err(panic) => {
- match panic.downcast::<&str>() {
- Ok(panic_msg) => {
- Err(format_err!("worker panicked: {}", panic_msg))
- }
- Err(_) => {
- Err(format_err!("worker panicked: unknown type."))
+ let worker1 = Arc::clone(&worker);
+ CURRENT_WORKER.sync_scope(Arc::clone(&worker), || {
+ let result = match std::panic::catch_unwind(move || f(worker1)) {
+ Ok(r) => r,
+ Err(panic) => {
+ match panic.downcast::<&str>() {
+ Ok(panic_msg) => {
+ Err(format_err!("worker panicked: {}", panic_msg))
+ }
+ Err(_) => {
+ Err(format_err!("worker panicked: unknown type."))
+ }
}
}
- }
- };
+ };
- worker.log_result(&result);
+ worker.log_result(&result);
+ });
});
Ok(upid_str)
@@ -841,3 +847,46 @@ pub async fn wait_for_local_worker(upid_str: &str) -> Result<(), Error> {
}
Ok(())
}
+
+/// Pass `msg` to the `log()` method of the current worker.
+///
+/// When called from outside a worker (no `CURRENT_WORKER` set), print it with `println!()`.
+pub fn worker_log_str<S: AsRef<str>>(msg: S) {
+    let text = msg.as_ref();
+    if CURRENT_WORKER.try_with(|current| current.log(text)).is_err() {
+        println!("{}", text);
+    }
+}
+
+/// Call worker.warn() (not worker.log()) for the current worker
+///
+/// This will simply use `eprintln!()` with a "WARN: " prefix if called from outside a worker.
+pub fn worker_warn_str<S: AsRef<str>>(msg: S) {
+    let msg: &str = msg.as_ref();
+    CURRENT_WORKER.try_with(|worker| worker.warn(msg)).unwrap_or_else(|_| {
+        eprintln!("WARN: {}", msg);
+    });
+}
+
+/// Format the arguments like `format!()` and pass the result to `worker_log_str()`.
+#[macro_export]
+macro_rules! worker_log {
+    ($($fmt_args:tt)*) => {{
+        $crate::server::worker_log_str(format!($($fmt_args)*));
+    }};
+}
+
+/// Format the arguments like `format!()` and pass the result to `worker_warn_str()`.
+#[macro_export]
+macro_rules! worker_warn {
+    ($($fmt_args:tt)*) => {{
+        $crate::server::worker_warn_str(format!($($fmt_args)*));
+    }};
+}
+
+/// Call worker.fail_on_abort() for the current worker; returns an error outside a worker.
+pub fn worker_fail_on_abort() -> Result<(), Error> {
+    let checked = CURRENT_WORKER.try_with(|current| current.fail_on_abort());
+    // try_with() only fails when no worker has been set for this task/thread
+    checked.unwrap_or_else(|_| Err(format_err!("worker abort failed: not inside a worker!")))
+}
--
2.30.2
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2021-09-14 10:50 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-09-14 10:50 [pbs-devel] [PATCH] use tokio::task_local to store/access the current worker Dietmar Maurer
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal