public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task
@ 2021-01-26 10:17 Dominik Csapak
  2021-01-29  6:05 ` Dietmar Maurer
  0 siblings, 1 reply; 4+ messages in thread
From: Dominik Csapak @ 2021-01-26 10:17 UTC (permalink / raw)
  To: pbs-devel

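Turn the worker closure into an async block and replace the future
combinator chain with async/await and futures::select!. This way, the
code is much more readable.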

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
changes from v1:
* rebased on master

 src/api2/reader.rs | 68 +++++++++++++++++++++-------------------------
 1 file changed, 31 insertions(+), 37 deletions(-)

diff --git a/src/api2/reader.rs b/src/api2/reader.rs
index 43d832ce..ae936184 100644
--- a/src/api2/reader.rs
+++ b/src/api2/reader.rs
@@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol(
 
         let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time());
 
-        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| {
+        WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move {
+            let _guard = _guard;
+
             let mut env = ReaderEnvironment::new(
                 env_type,
                 auth_id,
@@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol(
 
             let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug);
 
-            let abort_future = worker.abort_future();
-
-            let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body))
-                .map_err(Error::from)
-                .and_then({
-                    let env = env.clone();
-                    move |conn| {
-                        env.debug("protocol upgrade done");
-
-                        let mut http = hyper::server::conn::Http::new();
-                        http.http2_only(true);
-                        // increase window size: todo - find optiomal size
-                        let window_size = 32*1024*1024; // max = (1 << 31) - 2
-                        http.http2_initial_stream_window_size(window_size);
-                        http.http2_initial_connection_window_size(window_size);
-                        http.http2_max_frame_size(4*1024*1024);
-
-                        http.serve_connection(conn, service)
-                            .map_err(Error::from)
-                    }
-                });
-            let abort_future = abort_future
-                .map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) });
-
-            use futures::future::Either;
-            futures::future::select(req_fut, abort_future)
-                .map(move |res| {
-                    let _guard = _guard;
-                    match res {
-                        Either::Left((Ok(_), _)) => Ok(()),
-                        Either::Left((Err(err), _)) => Err(err),
-                        Either::Right((Ok(_), _)) => Ok(()),
-                        Either::Right((Err(err), _)) => Err(err),
-                    }
-                })
-                .map_ok(move |_| env.log("reader finished successfully"))
+            let mut abort_future = worker.abort_future()
+                .map(|_| Err(format_err!("task aborted")));
+
+            let env2 = env.clone();
+            let req_fut = async move {
+                let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?;
+                env2.debug("protocol upgrade done");
+
+                let mut http = hyper::server::conn::Http::new();
+                http.http2_only(true);
+                // increase window size: todo - find optimal size
+                let window_size = 32*1024*1024; // max = (1 << 31) - 2
+                http.http2_initial_stream_window_size(window_size);
+                http.http2_initial_connection_window_size(window_size);
+                http.http2_max_frame_size(4*1024*1024);
+
+                http.serve_connection(conn, service)
+                    .map_err(Error::from).await
+            };
+
+            futures::select!{
+                req = req_fut.fuse() => req?,
+                abort = abort_future => abort?,
+            };
+
+            env.log("reader finished successfully");
+
+            Ok(())
         })?;
 
         let response = Response::builder()
-- 
2.20.1






* Re: [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task
  2021-01-26 10:17 [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task Dominik Csapak
@ 2021-01-29  6:05 ` Dietmar Maurer
  2021-01-29  7:01   ` Dominik Csapak
  0 siblings, 1 reply; 4+ messages in thread
From: Dietmar Maurer @ 2021-01-29  6:05 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Dominik Csapak

Why did you change the _guard lifetime?







* Re: [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task
  2021-01-29  6:05 ` Dietmar Maurer
@ 2021-01-29  7:01   ` Dominik Csapak
  2021-01-29  8:17     ` Dietmar Maurer
  0 siblings, 1 reply; 4+ messages in thread
From: Dominik Csapak @ 2021-01-29  7:01 UTC (permalink / raw)
  To: Dietmar Maurer, Proxmox Backup Server development discussion

On 1/29/21 7:05 AM, Dietmar Maurer wrote:
> Why did you change the _guard lifetime?

How so?

Before this patch, the guard was moved into the 'map' closure of the
future::select call, so it went out of scope after either future
(req_fut, abort_future) resolved.

Now I moved it into the async move block of the worker, so it goes out
of scope after that future resolves.

The only difference is that the 'env.log(...)' and 'Ok(())' lines now
run while the guard is still held, but that should not make any
difference for the guard?
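
To illustrate the ordering described above, here is a minimal std-only
sketch. It is not the proxmox-backup code: Guard, Events, block_on,
old_style and new_style are made-up names, the guard only records when it
is dropped, and a plain closure call stands in for the 'map' step of the
old combinator chain. Under those assumptions it shows that the guard is
released before the log line in the old shape and after it in the new one.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Events = Arc<Mutex<Vec<&'static str>>>;

/// Hypothetical stand-in for the guard; it records when it is dropped.
struct Guard(Events);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.lock().unwrap().push("guard dropped");
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls in a loop, enough for futures that never wait.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Old shape: the guard lives in a closure that runs once the race is decided.
fn old_style() -> Vec<&'static str> {
    let events: Events = Arc::default();
    let guard = Guard(events.clone());
    let finish = move |res: Result<(), String>| {
        let _guard = guard; // dropped when this closure returns
        res
    };
    let res = finish(Ok(()));
    if res.is_ok() {
        events.lock().unwrap().push("log"); // like the later map_ok step
    }
    let out = events.lock().unwrap().clone();
    out
}

/// New shape: the guard is bound at the top of the async block.
fn new_style() -> Vec<&'static str> {
    let events: Events = Arc::default();
    let guard = Guard(events.clone());
    let log = events.clone();
    block_on(async move {
        let _guard = guard; // dropped when the block finishes
        log.lock().unwrap().push("log"); // like env.log(...) before Ok(())
    });
    let out = events.lock().unwrap().clone();
    out
}
```

Calling old_style() and new_style() returns the recorded events in order.
In both shapes the guard is held for as long as the two futures are still
racing; only its position relative to the final log line changes.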







* Re: [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task
  2021-01-29  7:01   ` Dominik Csapak
@ 2021-01-29  8:17     ` Dietmar Maurer
  0 siblings, 0 replies; 4+ messages in thread
From: Dietmar Maurer @ 2021-01-29  8:17 UTC (permalink / raw)
  To: Dominik Csapak, Proxmox Backup Server development discussion


> On 01/29/2021 8:01 AM Dominik Csapak <d.csapak@proxmox.com> wrote:
> 
>  
> On 1/29/21 7:05 AM, Dietmar Maurer wrote:
> > Why did you change the _guard lifetime?
> 
> How so?
> 
> Before this patch, the guard was moved into the 'map' closure of the
> future::select call, so it went out of scope after either future
> (req_fut, abort_future) resolved.
> 
> Now I moved it into the async move block of the worker, so it goes out
> of scope after that future resolves.

OK, I can see it now - will apply the patch.



