all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader
Date: Wed,  2 Jun 2021 16:38:28 +0200	[thread overview]
Message-ID: <20210602143833.4423-5-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com>

Implemented as a separate struct SeekableCachedChunkReader that contains
the original as an Arc, since the read_at future captures the
CachedChunkReader, which would otherwise not work with the lifetimes
required by AsyncRead. This is also the reason we cannot use a shared
read buffer and have to allocate a new one for every read. It also means
that the struct items required for AsyncRead/Seek do not need to be
included in a regular CachedChunkReader.

This is intended as a replacement for AsyncIndexReader, so we have less
code duplication and can utilize the LRU cache there too (even though
actual request concurrency is not supported in these traits).

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++-
 1 file changed, 114 insertions(+), 2 deletions(-)

diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
index fd5a049f..9b56fd14 100644
--- a/src/backup/cached_chunk_reader.rs
+++ b/src/backup/cached_chunk_reader.rs
@@ -1,12 +1,19 @@
 //! An async and concurrency safe data reader backed by a local LRU cache.
 
 use anyhow::Error;
+use futures::future::Future;
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
-use std::future::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
-use crate::backup::{AsyncReadChunk, IndexFile};
+use super::{AsyncReadChunk, IndexFile};
 use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+use proxmox::io_format_err;
+use proxmox::sys::error::io_err_other;
 
 struct AsyncChunkCacher<T> {
     reader: Arc<T>,
@@ -85,3 +92,124 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
         Ok(read)
     }
 }
+
+impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
+    CachedChunkReader<I, R>
+{
+    /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
+    /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
+    /// otherwise.
+    pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
+        SeekableCachedChunkReader {
+            index_bytes: self.index.index_bytes(),
+            reader: Arc::new(self),
+            position: 0,
+            seek_to_pos: 0,
+            read_future: None,
+        }
+    }
+}
+
+/// Sequential AsyncRead/AsyncSeek adapter around a shared CachedChunkReader.
+///
+/// At most one read is in flight at a time; its future owns the buffer it reads into.
+pub struct SeekableCachedChunkReader<
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+> {
+    reader: Arc<CachedChunkReader<I, R>>,
+    index_bytes: u64,
+    position: u64,
+    seek_to_pos: i64,
+    read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
+}
+
+impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
+        let this = Pin::get_mut(self);
+        // Offsets that do not fit into an i64 are clamped (Start) or rejected (End/Current)
+        // instead of wrapping around; poll_complete clamps to the end of the index anyway.
+        let target = match pos {
+            SeekFrom::Start(offset) => Some(offset.min(i64::MAX as u64) as i64),
+            SeekFrom::End(offset) => (this.index_bytes as i64).checked_add(offset),
+            SeekFrom::Current(offset) => (this.position as i64).checked_add(offset),
+        };
+        this.seek_to_pos = target.ok_or_else(|| io_format_err!("seek position overflows"))?;
+        // A read started at the old position must not be applied to the new one.
+        this.read_future = None;
+        Ok(())
+    }
+
+    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index_bytes;
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            // seeking past the end is clamped to the end of the index
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        Poll::Ready(Ok(this.position))
+    }
+}
+
+impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut ReadBuf,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+
+        if buf.remaining() == 0 {
+            return Poll::Ready(Ok(()));
+        }
+
+        let fut = match this.read_future {
+            Some(ref mut fut) => fut,
+            None => {
+                let offset = this.position;
+                // size the read by the free space left in `buf`, not its total capacity
+                let wanted = buf.remaining();
+                let reader = Arc::clone(&this.reader);
+                let fut = Box::pin(async move {
+                    let mut read_buf = vec![0u8; wanted];
+                    let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
+                    Ok((read_buf, read))
+                });
+                this.read_future = Some(fut);
+                this.read_future.as_mut().unwrap()
+            }
+        };
+
+        let ret = match ready!(fut.as_mut().poll(cx)) {
+            Ok((read_buf, read)) => {
+                // The caller may pass a smaller buffer on a later poll than the one this
+                // read was sized for; copy only what fits (put_slice panics otherwise) and
+                // let the next read continue right after the copied bytes.
+                let copied = read.min(buf.remaining());
+                buf.put_slice(&read_buf[..copied]);
+                this.position += copied as u64;
+                Ok(())
+            }
+            Err(err) => Err(io_err_other(err)),
+        };
+
+        // future completed, drop it so the next poll_read starts a fresh read
+        this.read_future = None;
+
+        Poll::Ready(ret)
+    }
+}
-- 
2.30.2





WARNING: multiple messages have this Message-ID
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader
Date: Wed,  2 Jun 2021 16:38:28 +0200	[thread overview]
Message-ID: <20210602143833.4423-5-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com>

Implemented as a separate struct SeekableCachedChunkReader that contains
the original as an Arc, since the read_at future captures the
CachedChunkReader, which would otherwise not work with the lifetimes
required by AsyncRead. This is also the reason we cannot use a shared
read buffer and have to allocate a new one for every read. It also means
that the struct items required for AsyncRead/Seek do not need to be
included in a regular CachedChunkReader.

This is intended as a replacement for AsyncIndexReader, so we have less
code duplication and can utilize the LRU cache there too (even though
actual request concurrency is not supported in these traits).

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++-
 1 file changed, 114 insertions(+), 2 deletions(-)

diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
index fd5a049f..9b56fd14 100644
--- a/src/backup/cached_chunk_reader.rs
+++ b/src/backup/cached_chunk_reader.rs
@@ -1,12 +1,19 @@
 //! An async and concurrency safe data reader backed by a local LRU cache.
 
 use anyhow::Error;
+use futures::future::Future;
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
-use std::future::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
-use crate::backup::{AsyncReadChunk, IndexFile};
+use super::{AsyncReadChunk, IndexFile};
 use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+use proxmox::io_format_err;
+use proxmox::sys::error::io_err_other;
 
 struct AsyncChunkCacher<T> {
     reader: Arc<T>,
@@ -85,3 +92,124 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
         Ok(read)
     }
 }
+
+impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
+    CachedChunkReader<I, R>
+{
+    /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
+    /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
+    /// otherwise.
+    pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
+        SeekableCachedChunkReader {
+            index_bytes: self.index.index_bytes(),
+            reader: Arc::new(self),
+            position: 0,
+            seek_to_pos: 0,
+            read_future: None,
+        }
+    }
+}
+
+/// Sequential AsyncRead/AsyncSeek adapter around a shared CachedChunkReader.
+///
+/// At most one read is in flight at a time; its future owns the buffer it reads into.
+pub struct SeekableCachedChunkReader<
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+> {
+    reader: Arc<CachedChunkReader<I, R>>,
+    index_bytes: u64,
+    position: u64,
+    seek_to_pos: i64,
+    read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
+}
+
+impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
+        let this = Pin::get_mut(self);
+        // Offsets that do not fit into an i64 are clamped (Start) or rejected (End/Current)
+        // instead of wrapping around; poll_complete clamps to the end of the index anyway.
+        let target = match pos {
+            SeekFrom::Start(offset) => Some(offset.min(i64::MAX as u64) as i64),
+            SeekFrom::End(offset) => (this.index_bytes as i64).checked_add(offset),
+            SeekFrom::Current(offset) => (this.position as i64).checked_add(offset),
+        };
+        this.seek_to_pos = target.ok_or_else(|| io_format_err!("seek position overflows"))?;
+        // A read started at the old position must not be applied to the new one.
+        this.read_future = None;
+        Ok(())
+    }
+
+    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index_bytes;
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            // seeking past the end is clamped to the end of the index
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        Poll::Ready(Ok(this.position))
+    }
+}
+
+impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut ReadBuf,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+
+        if buf.remaining() == 0 {
+            return Poll::Ready(Ok(()));
+        }
+
+        let fut = match this.read_future {
+            Some(ref mut fut) => fut,
+            None => {
+                let offset = this.position;
+                // size the read by the free space left in `buf`, not its total capacity
+                let wanted = buf.remaining();
+                let reader = Arc::clone(&this.reader);
+                let fut = Box::pin(async move {
+                    let mut read_buf = vec![0u8; wanted];
+                    let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
+                    Ok((read_buf, read))
+                });
+                this.read_future = Some(fut);
+                this.read_future.as_mut().unwrap()
+            }
+        };
+
+        let ret = match ready!(fut.as_mut().poll(cx)) {
+            Ok((read_buf, read)) => {
+                // The caller may pass a smaller buffer on a later poll than the one this
+                // read was sized for; copy only what fits (put_slice panics otherwise) and
+                // let the next read continue right after the copied bytes.
+                let copied = read.min(buf.remaining());
+                buf.put_slice(&read_buf[..copied]);
+                this.position += copied as u64;
+                Ok(())
+            }
+            Err(err) => Err(io_err_other(err)),
+        };
+
+        // future completed, drop it so the next poll_read starts a fresh read
+        this.read_future = None;
+
+        Poll::Ready(ret)
+    }
+}
-- 
2.30.2





  parent reply	other threads:[~2021-06-02 14:39 UTC|newest]

Thread overview: 28+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-02 14:38 [pve-devel] [PATCH 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-04 12:22   ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:22     ` Wolfgang Bumiller
2021-06-02 14:38 ` Stefan Reiter [this message]
2021-06-02 14:38   ` [pbs-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-04 12:30   ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:30     ` Wolfgang Bumiller
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter
2021-06-04 12:16   ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:16     ` Wolfgang Bumiller
2021-06-07  8:03     ` [pve-devel] " Stefan Reiter
2021-06-07  8:03       ` Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-02 14:38   ` [pbs-devel] " Stefan Reiter

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210602143833.4423-5-s.reiter@proxmox.com \
    --to=s.reiter@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    --cc=pve-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal