all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task
@ 2021-01-26 10:17 Dominik Csapak
  2021-01-29  6:05 ` Dietmar Maurer
  0 siblings, 1 reply; 4+ messages in thread
From: Dominik Csapak @ 2021-01-26 10:17 UTC (permalink / raw)
  To: pbs-devel

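Rewrite the reader worker task as an 'async move' block that uses
futures::select! instead of a chain of future combinators (and_then,
select, map, map_ok). This way, the code is much more readable.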

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
changes from v1:
* rebased on master

 src/api2/reader.rs | 68 +++++++++++++++++++++-------------------------
 1 file changed, 31 insertions(+), 37 deletions(-)

diff --git a/src/api2/reader.rs b/src/api2/reader.rs
index 43d832ce..ae936184 100644
--- a/src/api2/reader.rs
+++ b/src/api2/reader.rs
@@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol(
 
         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
 
-        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
+        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
+            let _guard = _guard;
+
             let mut env = ReaderEnvironment::new(
                 env_type,
                 auth_id,
@@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol(
 
             let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
 
-            let abort_future = worker.abort_future();
-
-            let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
-                .map_err(Error::from)
-                .and_then({
-                    let env = env.clone();
-                    move |conn| {
-                        env.debug("protocol upgrade done");
-
-                        let mut http = hyper::server::conn::Http::new();
-                        http.http2_only(true);
-                        // increase window size: todo - find optiomal size
-                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
-                        http.http2_initial_stream_window_size(window_size);
-                        http.http2_initial_connection_window_size(window_size);
-                        http.http2_max_frame_size(4*1024*1024);
-
-                        http.serve_connection(conn, service)
-                            .map_err(Error::from)
-                    }
-                });
-            let abort_future = abort_future
-                .map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) });
-
-            use futures::future::Either;
-            futures::future::select(req_fut, abort_future)
-                .map(move |res| {
-                    let _guard = _guard;
-                    match res {
-                        Either::Left((Ok(_), _)) => Ok(()),
-                        Either::Left((Err(err), _)) => Err(err),
-                        Either::Right((Ok(_), _)) => Ok(()),
-                        Either::Right((Err(err), _)) => Err(err),
-                    }
-                })
-                .map_ok(move |_| env.log("reader finished successfully"))
+            let mut abort_future = worker.abort_future()
+                .map(|_| Err(format_err!("task aborted")));
+
+            let env2 = env.clone();
+            let req_fut = async move {
+                let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
+                env2.debug("protocol upgrade done");
+
+                let mut http = hyper::server::conn::Http::new();
+                http.http2_only(true);
+                // increase window size: todo - find optimal size
+                let window_size = 32*1024*1024; // max = (1 << 31) - 2
+                http.http2_initial_stream_window_size(window_size);
+                http.http2_initial_connection_window_size(window_size);
+                http.http2_max_frame_size(4*1024*1024);
+
+                http.serve_connection(conn, service)
+                    .map_err(Error::from).await
+            };
+
+            futures::select!{
+                req = req_fut.fuse() => req?,
+                abort = abort_future => abort?,
+            };
+
+            env.log("reader finished successfully");
+
+            Ok(())
         })?;
 
         let response = Response::builder()
-- 
2.20.1





^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2021-01-29  8:18 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-01-26 10:17 [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task Dominik Csapak
2021-01-29  6:05 ` Dietmar Maurer
2021-01-29  7:01   ` Dominik Csapak
2021-01-29  8:17     ` Dietmar Maurer

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal