all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 04/12] tokio 1.0: AsyncRead/Seek with ReadBuf
Date: Tue, 12 Jan 2021 14:58:18 +0100	[thread overview]
Message-ID: <20210112135830.2798301-9-f.gruenbichler@proxmox.com> (raw)
In-Reply-To: <20210112135830.2798301-1-f.gruenbichler@proxmox.com>

Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
---
 src/backup/async_index_reader.rs   | 23 +++++++-------
 src/tools/async_io.rs              | 50 ++----------------------------
 src/tools/wrapped_reader_stream.rs |  8 +++--
 3 files changed, 19 insertions(+), 62 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index f6a72099..2a04282c 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -6,7 +6,7 @@ use std::io::SeekFrom;
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::{AsyncRead, AsyncSeek};
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
@@ -71,8 +71,8 @@ where
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context,
-        buf: &mut [u8],
-    ) -> Poll<tokio::io::Result<usize>> {
+        buf: &mut ReadBuf,
+    ) -> Poll<tokio::io::Result<()>> {
         let this = Pin::get_mut(self);
         loop {
             match &mut this.state {
@@ -86,12 +86,12 @@ where
                     } else {
                         match this.index.chunk_from_offset(this.position) {
                             Some(res) => res,
-                            None => return Poll::Ready(Ok(0))
+                            None => return Poll::Ready(Ok(()))
                         }
                     };
 
                     if idx >= this.index.index_count() {
-                        return Poll::Ready(Ok(0));
+                        return Poll::Ready(Ok(()));
                     }
 
                     let info = this
@@ -142,13 +142,13 @@ where
                 AsyncIndexReaderState::HaveData => {
                     let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
-                    let n = if len - offset < buf.len() {
+                    let n = if len - offset < buf.remaining() {
                         len - offset
                     } else {
-                        buf.len()
+                        buf.remaining()
                     };
 
-                    buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    buf.put_slice(&this.read_buffer[offset..(offset + n)]);
                     this.position += n as u64;
 
                     if offset + n == len {
@@ -158,7 +158,7 @@ where
                         this.state = AsyncIndexReaderState::HaveData;
                     }
 
-                    return Poll::Ready(Ok(n));
+                    return Poll::Ready(Ok(()));
                 }
             }
         }
@@ -172,9 +172,8 @@ where
 {
     fn start_seek(
         self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
         pos: SeekFrom,
-    ) -> Poll<tokio::io::Result<()>> {
+    ) -> tokio::io::Result<()> {
         let this = Pin::get_mut(self);
         this.seek_to_pos = match pos {
             SeekFrom::Start(offset) => {
@@ -187,7 +186,7 @@ where
                 this.position as i64 + offset
             }
         };
-        Poll::Ready(Ok(()))
+        Ok(())
     }
 
     fn poll_complete(
diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs
index 4e4107c0..3a5a6c9a 100644
--- a/src/tools/async_io.rs
+++ b/src/tools/async_io.rs
@@ -1,13 +1,12 @@
 //! Generic AsyncRead/AsyncWrite utilities.
 
 use std::io;
-use std::mem::MaybeUninit;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use futures::stream::{Stream, TryStream};
-use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
 use tokio::net::TcpListener;
 use hyper::client::connect::Connection;
 
@@ -20,8 +19,8 @@ impl<L: AsyncRead + Unpin, R: AsyncRead + Unpin> AsyncRead for EitherStream<L, R
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context,
-        buf: &mut [u8],
-    ) -> Poll<Result<usize, io::Error>> {
+        buf: &mut ReadBuf,
+    ) -> Poll<Result<(), io::Error>> {
         match self.get_mut() {
             EitherStream::Left(ref mut s) => {
                 Pin::new(s).poll_read(cx, buf)
@@ -31,31 +30,6 @@ impl<L: AsyncRead + Unpin, R: AsyncRead + Unpin> AsyncRead for EitherStream<L, R
             }
         }
     }
-
-    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
-        match *self {
-            EitherStream::Left(ref s) => s.prepare_uninitialized_buffer(buf),
-            EitherStream::Right(ref s) => s.prepare_uninitialized_buffer(buf),
-        }
-    }
-
-    fn poll_read_buf<B>(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        buf: &mut B,
-    ) -> Poll<Result<usize, io::Error>>
-    where
-        B: bytes::BufMut,
-    {
-        match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
-                Pin::new(s).poll_read_buf(cx, buf)
-            }
-            EitherStream::Right(ref mut s) => {
-                Pin::new(s).poll_read_buf(cx, buf)
-            }
-        }
-    }
 }
 
 impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L, R> {
@@ -95,24 +69,6 @@ impl<L: AsyncWrite + Unpin, R: AsyncWrite + Unpin> AsyncWrite for EitherStream<L
             }
         }
     }
-
-    fn poll_write_buf<B>(
-        self: Pin<&mut Self>,
-        cx: &mut Context,
-        buf: &mut B,
-    ) -> Poll<Result<usize, io::Error>>
-    where
-        B: bytes::Buf,
-    {
-        match self.get_mut() {
-            EitherStream::Left(ref mut s) => {
-                Pin::new(s).poll_write_buf(cx, buf)
-            }
-            EitherStream::Right(ref mut s) => {
-                Pin::new(s).poll_write_buf(cx, buf)
-            }
-        }
-    }
 }
 
 // we need this for crate::client::http_client:
diff --git a/src/tools/wrapped_reader_stream.rs b/src/tools/wrapped_reader_stream.rs
index 0294cc21..4b01b072 100644
--- a/src/tools/wrapped_reader_stream.rs
+++ b/src/tools/wrapped_reader_stream.rs
@@ -3,7 +3,7 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::sync::mpsc::Receiver;
 
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, ReadBuf};
 use futures::ready;
 use futures::stream::Stream;
 
@@ -69,8 +69,10 @@ impl<R: AsyncRead + Unpin> Stream for AsyncReaderStream<R> {
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
-        match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut this.buffer)) {
-            Ok(n) => {
+        let mut read_buf = ReadBuf::new(&mut this.buffer);
+        match ready!(Pin::new(&mut this.reader).poll_read(cx, &mut read_buf)) {
+            Ok(()) => {
+                let n = read_buf.filled().len();
                 if n == 0 {
                     // EOF
                     Poll::Ready(None)
-- 
2.20.1





  parent reply	other threads:[~2021-01-12 13:59 UTC|newest]

Thread overview: 29+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-12 13:58 [pbs-devel] [PATCH-SERIES 0/20] update to tokio 1.0 and friends Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 1/4] Cargo.toml: update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 2/4] update to rustyline 7 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 3/4] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox 4/4] tokio 1.0: drop TimeoutFutureExt Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 01/12] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 02/12] tokio 1.0: delay -> sleep Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 03/12] proxmox XXX: use tokio::time::timeout directly Fabian Grünbichler
2021-01-12 13:58 ` Fabian Grünbichler [this message]
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 05/12] tokio: adapt to 1.0 runtime changes Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 06/12] tokio: adapt to 1.0 process:Child changes Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 07/12] tokio 1.0: use ReceiverStream from tokio-stream Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 08/12] tokio 1.0: update to new tokio-openssl interface Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 09/12] tokio 1.0: update to new Signal interface Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 10/12] hyper: use new hyper::upgrade Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 11/12] examples: unify h2 examples Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-backup 12/12] cleanup: remove unnecessary 'mut' and '.clone()' Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH proxmox-fuse] update to tokio 1.0 Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [PATCH pxar 1/3] " Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [RFC pxar 2/3] clippy: use matches! instead of match Fabian Grünbichler
2021-01-12 13:58 ` [pbs-devel] [RFC pxar 3/3] remove futures-io feature Fabian Grünbichler
2021-01-12 14:42   ` Wolfgang Bumiller
2021-01-12 14:52 ` [pbs-devel] [PATCH-SERIES 0/20] update to tokio 1.0 and friends Wolfgang Bumiller
2021-01-14 13:39   ` [pbs-devel] [PATCH proxmox 1/3] fix u2f example Fabian Grünbichler
2021-01-14 13:39     ` [pbs-devel] [PATCH proxmox-backup] proxmox XXX: adapt to moved ParameterSchema Fabian Grünbichler
2021-01-14 13:39     ` [pbs-devel] [PATCH proxmox 2/3] move ParameterSchema from router to schema Fabian Grünbichler
2021-01-14 13:39     ` [pbs-devel] [PATCH proxmox 3/3] build: add autopkgtest target Fabian Grünbichler
2021-01-14 13:41   ` [pbs-devel] [PATCH pxar 1/2] fix example Fabian Grünbichler
2021-01-14 13:41     ` [pbs-devel] [PATCH pxar 2/2] build: fix --no-default-features Fabian Grünbichler

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210112135830.2798301-9-f.gruenbichler@proxmox.com \
    --to=f.gruenbichler@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal