all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup] api2/reader: asyncify the reader worker task
@ 2021-01-19 11:04 Dominik Csapak
  0 siblings, 0 replies; only message in thread
From: Dominik Csapak @ 2021-01-19 11:04 UTC (permalink / raw)
  To: pbs-devel

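Convert the reader worker task from a chain of future combinators
(and_then/select/Either) into a single async block that uses
futures::select!. This way, the code is much more readable.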

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/reader.rs | 64 +++++++++++++++++++++-------------------------
 1 file changed, 29 insertions(+), 35 deletions(-)

diff --git a/src/api2/reader.rs b/src/api2/reader.rs
index 72b6e33a..224a78de 100644
--- a/src/api2/reader.rs
+++ b/src/api2/reader.rs
@@ -113,7 +113,9 @@ fn upgrade_to_backup_reader_protocol(
 
         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
 
-        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
+        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
+            let _guard = _guard;
+
             let mut env = ReaderEnvironment::new(
                 env_type,
                 auth_id,
@@ -128,42 +130,34 @@ fn upgrade_to_backup_reader_protocol(
 
             let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
 
-            let abort_future = worker.abort_future();
-
-            let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
-                .map_err(Error::from)
-                .and_then({
-                    let env = env.clone();
-                    move |conn| {
-                        env.debug("protocol upgrade done");
-
-                        let mut http = hyper::server::conn::Http::new();
-                        http.http2_only(true);
-                        // increase window size: todo - find optiomal size
-                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
-                        http.http2_initial_stream_window_size(window_size);
-                        http.http2_initial_connection_window_size(window_size);
-                        http.http2_max_frame_size(4*1024*1024);
-
-                        http.serve_connection(conn, service)
-                            .map_err(Error::from)
-                    }
-                });
-            let abort_future = abort_future
+            let mut abort_future = worker.abort_future()
                 .map(|_| Err(format_err!("task aborted")));
 
-            use futures::future::Either;
-            futures::future::select(req_fut, abort_future)
-                .map(move |res| {
-                    let _guard = _guard;
-                    match res {
-                        Either::Left((Ok(res), _)) => Ok(res),
-                        Either::Left((Err(err), _)) => Err(err),
-                        Either::Right((Ok(res), _)) => Ok(res),
-                        Either::Right((Err(err), _)) => Err(err),
-                    }
-                })
-                .map_ok(move |_| env.log("reader finished successfully"))
+            let env2 = env.clone();
+            let req_fut = async move {
+                let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
+                env2.debug("protocol upgrade done");
+
+                let mut http = hyper::server::conn::Http::new();
+                http.http2_only(true);
+                // increase window size: todo - find optimal size
+                let window_size = 32*1024*1024; // max = (1 << 31) - 2
+                http.http2_initial_stream_window_size(window_size);
+                http.http2_initial_connection_window_size(window_size);
+                http.http2_max_frame_size(4*1024*1024);
+
+                http.serve_connection(conn, service)
+                    .map_err(Error::from).await
+            };
+
+            futures::select!{
+                req = req_fut.fuse() => req?,
+                abort = abort_future => abort?,
+            };
+
+            env.log("reader finished successfully");
+
+            Ok(())
         })?;
 
         let response = Response::builder()
-- 
2.20.1





^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2021-01-19 11:04 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-01-19 11:04 [pbs-devel] [PATCH proxmox-backup] api2/reader: asyncify the reader worker task Dominik Csapak

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal