From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id CD04C6B3B5 for ; Tue, 26 Jan 2021 11:17:34 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C01F514D2D for ; Tue, 26 Jan 2021 11:17:04 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id DA80C14D24 for ; Tue, 26 Jan 2021 11:17:03 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 9CBA14603E for ; Tue, 26 Jan 2021 11:17:03 +0100 (CET) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Tue, 26 Jan 2021 11:17:02 +0100 Message-Id: <20210126101702.28778-1-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.261 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [reader.rs] Subject: [pbs-devel] [PATCH proxmox-backup v2] api2/reader: asyncify the reader worker task X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 26 Jan 2021 10:17:34 -0000 this way, the code is much more readable Signed-off-by: Dominik Csapak --- changes from v1: * rebased on master src/api2/reader.rs | 68 +++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 37 deletions(-) diff --git a/src/api2/reader.rs b/src/api2/reader.rs index 43d832ce..ae936184 100644 --- a/src/api2/reader.rs +++ b/src/api2/reader.rs @@ -115,7 +115,9 @@ fn upgrade_to_backup_reader_protocol( let worker_id = format!("{}:{}/{}/{:08X}", store, backup_type, backup_id, backup_dir.backup_time()); - WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| { + WorkerTask::spawn("reader", Some(worker_id), auth_id.clone(), true, move |worker| async move { + let _guard = _guard; + let mut env = ReaderEnvironment::new( env_type, auth_id, @@ -130,42 +132,34 @@ fn upgrade_to_backup_reader_protocol( let service = H2Service::new(env.clone(), worker.clone(), &READER_API_ROUTER, debug); - let abort_future = worker.abort_future(); - - let req_fut = hyper::upgrade::on(Request::from_parts(parts, req_body)) - .map_err(Error::from) - .and_then({ - let env = env.clone(); - move |conn| { - env.debug("protocol upgrade done"); - - let mut http = hyper::server::conn::Http::new(); - http.http2_only(true); - // increase window size: todo - find optiomal size - let window_size = 32*1024*1024; // max = (1 << 31) - 2 - http.http2_initial_stream_window_size(window_size); - http.http2_initial_connection_window_size(window_size); - http.http2_max_frame_size(4*1024*1024); - - http.serve_connection(conn, service) - .map_err(Error::from) - } - }); - let abort_future = abort_future - .map(|_| -> Result<(), anyhow::Error> { Err(format_err!("task aborted")) }); - - use futures::future::Either; - futures::future::select(req_fut, abort_future) - .map(move |res| { - let _guard = _guard; - match res { - Either::Left((Ok(_), _)) => Ok(()), - Either::Left((Err(err), _)) => Err(err), - Either::Right((Ok(_), _)) => Ok(()), - Either::Right((Err(err), _)) => Err(err), - } - }) - .map_ok(move |_| env.log("reader finished successfully")) + let mut abort_future = worker.abort_future() + .map(|_| Err(format_err!("task aborted"))); + + let env2 = env.clone(); + let req_fut = async move { + let conn = hyper::upgrade::on(Request::from_parts(parts, req_body)).await?; + env2.debug("protocol upgrade done"); + + let mut http = hyper::server::conn::Http::new(); + http.http2_only(true); + // increase window size: todo - find optiomal size + let window_size = 32*1024*1024; // max = (1 << 31) - 2 + http.http2_initial_stream_window_size(window_size); + http.http2_initial_connection_window_size(window_size); + http.http2_max_frame_size(4*1024*1024); + + http.serve_connection(conn, service) + .map_err(Error::from).await + }; + + futures::select!{ + req = req_fut.fuse() => req?, + abort = abort_future => abort?, + }; + + env.log("reader finished successfully"); + + Ok(()) })?; let response = Response::builder() -- 2.20.1