From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 07/12] tokio 1.0: use ReceiverStream from tokio-stream
Date: Tue, 12 Jan 2021 14:58:21 +0100 [thread overview]
Message-ID: <20210112135830.2798301-12-f.gruenbichler@proxmox.com> (raw)
In-Reply-To: <20210112135830.2798301-1-f.gruenbichler@proxmox.com>
to wrap a Receiver in a Stream. this will likely move back into tokio
proper once we have a std Stream..
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
src/api2/admin/datastore.rs | 3 ++-
src/bin/proxmox-backup-client.rs | 3 ++-
src/bin/proxmox-backup-proxy.rs | 3 ++-
src/client/backup_writer.rs | 5 +++--
4 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 32352e5c..5b9a1e84 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -10,6 +10,7 @@ use futures::*;
use hyper::http::request::Parts;
use hyper::{header, Body, Response, StatusCode};
use serde_json::{json, Value};
+use tokio_stream::wrappers::ReceiverStream;
use proxmox::api::{
api, ApiResponseFuture, ApiHandler, ApiMethod, Router,
@@ -1562,7 +1563,7 @@ fn pxar_file_download(
.map_err(|err| eprintln!("error during finishing of zip: {}", err))
});
- Body::wrap_stream(receiver.map_err(move |err| {
+ Body::wrap_stream(ReceiverStream::new(receiver).map_err(move |err| {
eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
err
}))
diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs
index b8f09a4a..d91f04cc 100644
--- a/src/bin/proxmox-backup-client.rs
+++ b/src/bin/proxmox-backup-client.rs
@@ -12,6 +12,7 @@ use futures::future::FutureExt;
use futures::stream::{StreamExt, TryStreamExt};
use serde_json::{json, Value};
use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
use xdg::BaseDirectories;
use pathpatterns::{MatchEntry, MatchType, PatternFlag};
@@ -306,7 +307,7 @@ async fn backup_directory<P: AsRef<Path>>(
let (mut tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
- let stream = rx
+ let stream = ReceiverStream::new(rx)
.map_err(Error::from);
// spawn chunker inside a separate task so that it can run parallel
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 2228253d..16450244 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -6,6 +6,7 @@ use anyhow::{bail, format_err, Error};
use futures::*;
use hyper;
use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype};
+use tokio_stream::wrappers::ReceiverStream;
use proxmox::try_block;
use proxmox::api::RpcEnvironmentType;
@@ -122,7 +123,7 @@ async fn run() -> Result<(), Error> {
|listener, ready| {
let connections = accept_connections(listener, acceptor, debug);
- let connections = hyper::server::accept::from_stream(connections);
+ let connections = hyper::server::accept::from_stream(ReceiverStream::new(connections));
Ok(ready
.and_then(|_| hyper::Server::builder(connections)
diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index 39cd574d..bcbd6f28 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
@@ -10,6 +10,7 @@ use futures::future::AbortHandle;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
+use tokio_stream::wrappers::ReceiverStream;
use proxmox::tools::digest_to_hex;
@@ -321,7 +322,7 @@ impl BackupWriter {
// });
// old code for reference?
tokio::spawn(
- verify_queue_rx
+ ReceiverStream::new(verify_queue_rx)
.map(Ok::<_, Error>)
.try_for_each(move |response: h2::client::ResponseFuture| {
response
@@ -349,7 +350,7 @@ impl BackupWriter {
// FIXME: async-block-ify this code!
tokio::spawn(
- verify_queue_rx
+ ReceiverStream::new(verify_queue_rx)
.map(Ok::<_, Error>)
.and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
match (response, merged_chunk_info) {
--
2.20.1
next prev parent reply other threads:[~2021-01-12 13:59 UTC|newest]
Thread overview: 29+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-01-12 13:58 [pbs-devel] [PATCH-SERIES 0/20] update to tokio 1.0 and friends Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 1/4] Cargo.toml: update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 2/4] update to rustyline 7 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 3/4] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 4/4] tokio 1.0: drop TimeoutFutureExt Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 01/12] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 02/12] tokio 1.0: delay -> sleep Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 03/12] proxmox XXX: use tokio::time::timeout directly Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 04/12] tokio 1.0: AsyncRead/Seek with ReadBuf Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 05/12] tokio: adapt to 1.0 runtime changes Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 06/12] tokio: adapt to 1.0 process:Child changes Fabian Grünbichler
2021-01-12 13:58 ` Fabian Grünbichler [this message]
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 08/12] tokio 1.0: update to new tokio-openssl interface Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 09/12] tokio 1.0: update to new Signal interface Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 10/12] hyper: use new hyper::upgrade Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 11/12] examples: unify h2 examples Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 12/12] cleanup: remove unnecessary 'mut' and '.clone()' Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-fuse] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH pxar 1/3] " Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [RFC pxar 2/3] clippy: use matches! instead of match Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [RFC pxar 3/3] remove futures-io feature Fabian Grünbichler
2021-01-12 14:42 ` Wolfgang Bumiller
2021-01-12 14:52 ` [pbs-devel] [PATCH-SERIES 0/20] update to tokio 1.0 and friends Wolfgang Bumiller
2021-01-14 13:39 ` [pbs-devel] [PATCH proxmox 1/3] fix u2f example Fabian Grünbichler
2021-01-14 13:39 ` [pbs-devel] [PATCH proxmox-backup] proxmox XXX: adapt to moved ParameterSchema Fabian Grünbichler
2021-01-14 13:39 ` [pbs-devel] [PATCH proxmox 2/3] move ParameterSchema from router to schema Fabian Grünbichler
2021-01-14 13:39 ` [pbs-devel] [PATCH proxmox 3/3] build: add autopkgtest target Fabian Grünbichler
2021-01-14 13:41 ` [pbs-devel] [PATCH pxar 1/2] fix example Fabian Grünbichler
2021-01-14 13:41 ` [pbs-devel] [PATCH pxar 2/2] build: fix --no-default-features Fabian Grünbichler
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210112135830.2798301-12-f.gruenbichler@proxmox.com \
--to=f.gruenbichler@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox