From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id CAC1D6778F for ; Tue, 12 Jan 2021 14:59:12 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C1FA726F60 for ; Tue, 12 Jan 2021 14:58:42 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 02D4326F35 for ; Tue, 12 Jan 2021 14:58:42 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id BDD0345795 for ; Tue, 12 Jan 2021 14:58:41 +0100 (CET) From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= To: pbs-devel@lists.proxmox.com Date: Tue, 12 Jan 2021 14:58:21 +0100 Message-Id: <20210112135830.2798301-12-f.gruenbichler@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210112135830.2798301-1-f.gruenbichler@proxmox.com> References: <20210112135830.2798301-1-f.gruenbichler@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.027 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [proxmox-backup-client.rs, proxmox-backup-proxy.rs, datastore.rs] Subject: [pbs-devel] [PATCH proxmox-backup 07/12] tokio 1.0: use ReceiverStream from tokio-stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 12 Jan 2021 13:59:12 -0000 to wrap a Receiver in a Stream. this will likely move back into tokio proper once we have a std Stream.. Signed-off-by: Fabian Grünbichler --- src/api2/admin/datastore.rs | 3 ++- src/bin/proxmox-backup-client.rs | 3 ++- src/bin/proxmox-backup-proxy.rs | 3 ++- src/client/backup_writer.rs | 5 +++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 32352e5c..5b9a1e84 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -10,6 +10,7 @@ use futures::*; use hyper::http::request::Parts; use hyper::{header, Body, Response, StatusCode}; use serde_json::{json, Value}; +use tokio_stream::wrappers::ReceiverStream; use proxmox::api::{ api, ApiResponseFuture, ApiHandler, ApiMethod, Router, @@ -1562,7 +1563,7 @@ fn pxar_file_download( .map_err(|err| eprintln!("error during finishing of zip: {}", err)) }); - Body::wrap_stream(receiver.map_err(move |err| { + Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| { eprintln!("error during streaming of zip '{:?}' - {}", filepath, err); err })) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index b8f09a4a..d91f04cc 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -12,6 +12,7 @@ use futures::future::FutureExt; use futures::stream::{StreamExt, TryStreamExt}; use serde_json::{json, Value}; use tokio::sync::mpsc; +use 
tokio_stream::wrappers::ReceiverStream; use xdg::BaseDirectories; use pathpatterns::{MatchEntry, MatchType, PatternFlag}; @@ -306,7 +307,7 @@ async fn backup_directory>( let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks - let stream = rx + let stream = ReceiverStream::new(rx) .map_err(Error::from); // spawn chunker inside a separate task so that it can run parallel diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 2228253d..16450244 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -6,6 +6,7 @@ use anyhow::{bail, format_err, Error}; use futures::*; use hyper; use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use tokio_stream::wrappers::ReceiverStream; use proxmox::try_block; use proxmox::api::RpcEnvironmentType; @@ -122,7 +123,7 @@ async fn run() -> Result<(), Error> { |listener, ready| { let connections = accept_connections(listener, acceptor, debug); - let connections = hyper::server::accept::from_stream(connections); + let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections)); Ok(ready .and_then(|_| hyper::Server::builder(connections) diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs index 39cd574d..bcbd6f28 100644 --- a/src/client/backup_writer.rs +++ b/src/client/backup_writer.rs @@ -10,6 +10,7 @@ use futures::future::AbortHandle; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::ReceiverStream; use proxmox::tools::digest_to_hex; @@ -321,7 +322,7 @@ impl BackupWriter { // }); // old code for reference? tokio::spawn( - verify_queue_rx + ReceiverStream::new(verify_queue_rx) .map(Ok::<_, Error>) .try_for_each(move |response: h2::client::ResponseFuture| { response @@ -349,7 +350,7 @@ impl BackupWriter { // FIXME: async-block-ify this code! 
tokio::spawn( - verify_queue_rx + ReceiverStream::new(verify_queue_rx) .map(Ok::<_, Error>) .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| { match (response, merged_chunk_info) { -- 2.20.1