From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader
Date: Mon,  7 Jun 2021 17:35:27 +0200
Message-ID: <20210607153532.2522267-5-s.reiter@proxmox.com>
In-Reply-To: <20210607153532.2522267-1-s.reiter@proxmox.com>

Implemented as a separate struct SeekableCachedChunkReader that contains
the original as an Arc, since the read_at future captures the
CachedChunkReader, which would otherwise not work with the lifetimes
required by AsyncRead. This is also the reason we cannot use a shared
read buffer and have to allocate a new one for every read. It also means
that the struct items required for AsyncRead/Seek do not need to be
included in a regular CachedChunkReader.

This is intended as a replacement for AsyncIndexReader, so we have less
code duplication and can utilize the LRU cache there too (even though
actual request concurrency is not supported in these traits).

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

v2:
* drop 'seek_to_pos' from struct and implement error handling directly in
  start_seek
* simplify future handling in poll_read with Option::get_or_insert_with

 src/backup/cached_chunk_reader.rs | 104 +++++++++++++++++++++++++++++-
 1 file changed, 102 insertions(+), 2 deletions(-)

diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
index ff476e37..c9ca4773 100644
--- a/src/backup/cached_chunk_reader.rs
+++ b/src/backup/cached_chunk_reader.rs
@@ -1,12 +1,19 @@
 //! An async and concurrency safe data reader backed by a local LRU cache.
 
 use anyhow::Error;
+use futures::future::Future;
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
 
-use std::future::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
-use crate::backup::{AsyncReadChunk, IndexFile};
+use super::{AsyncReadChunk, IndexFile};
 use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+use proxmox::io_format_err;
+use proxmox::sys::error::io_err_other;
 
 struct AsyncChunkCacher<T> {
     reader: Arc<T>,
@@ -87,3 +94,96 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
         Ok(read)
     }
 }
+
+impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
+    CachedChunkReader<I, R>
+{
+    /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
+    /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
+    /// otherwise.
+    pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
+        SeekableCachedChunkReader {
+            index_bytes: self.index.index_bytes(),
+            reader: Arc::new(self),
+            position: 0,
+            read_future: None,
+        }
+    }
+}
+
+pub struct SeekableCachedChunkReader<
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+> {
+    reader: Arc<CachedChunkReader<I, R>>,
+    index_bytes: u64,
+    position: u64,
+    read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
+}
+
+impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
+        let this = Pin::get_mut(self);
+        let seek_to_pos = match pos {
+            SeekFrom::Start(offset) => offset as i64,
+            SeekFrom::End(offset) => this.index_bytes as i64 + offset,
+            SeekFrom::Current(offset) => this.position as i64 + offset,
+        };
+        if seek_to_pos < 0 {
+            return Err(io_format_err!("cannot seek to negative values"));
+        } else if seek_to_pos > this.index_bytes as i64 {
+            this.position = this.index_bytes;
+        } else {
+            this.position = seek_to_pos as u64;
+        }
+        Ok(())
+    }
+
+    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+        Poll::Ready(Ok(self.position))
+    }
+}
+
+impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
+where
+    I: IndexFile + Send + Sync + 'static,
+    R: AsyncReadChunk + Send + Sync + 'static,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut ReadBuf,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+
+        let offset = this.position;
+        let wanted = buf.capacity();
+        let reader = Arc::clone(&this.reader);
+
+        let fut = this.read_future.get_or_insert_with(|| {
+            Box::pin(async move {
+                let mut read_buf = vec![0u8; wanted];
+                let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
+                Ok((read_buf, read))
+            })
+        });
+
+        let ret = match ready!(fut.as_mut().poll(cx)) {
+            Ok((read_buf, read)) => {
+                buf.put_slice(&read_buf[..read]);
+                this.position += read as u64;
+                Ok(())
+            }
+            Err(err) => Err(io_err_other(err)),
+        };
+
+        // future completed, drop
+        this.read_future = None;
+
+        Poll::Ready(ret)
+    }
+}
-- 
2.30.2
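
The position arithmetic in start_seek above can be tried out in isolation.
The following is a sketch under assumptions, not the patch's code:
resolve_seek is a hypothetical free function, and unlike the patch it widens
to i128 so that a very large SeekFrom::Start offset is clamped to the end
rather than wrapping to a negative i64. The clamping of past-the-end seeks and
the error on negative targets follow the behaviour shown in the diff.

```rust
use std::io::{Error, ErrorKind, Result, SeekFrom};

/// Resolve a seek request for a source of `len` bytes whose cursor is
/// currently at `position`. Hypothetical helper for illustration only.
fn resolve_seek(len: u64, position: u64, pos: SeekFrom) -> Result<u64> {
    // i128 can hold any u64 plus any i64 without overflowing.
    let target: i128 = match pos {
        SeekFrom::Start(offset) => offset as i128,
        SeekFrom::End(offset) => len as i128 + offset as i128,
        SeekFrom::Current(offset) => position as i128 + offset as i128,
    };

    if target < 0 {
        return Err(Error::new(
            ErrorKind::InvalidInput,
            "cannot seek to negative values",
        ));
    }

    // Seeking past the end is clamped to the end, as in the patch.
    Ok(target.min(len as i128) as u64)
}
```

For a 100-byte source, SeekFrom::End(-1) resolves to 99, SeekFrom::End(5) is
clamped to 100, and SeekFrom::Current(-1) from position 0 yields an error.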





Thread overview: 22+ messages
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-07 15:35 ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` Stefan Reiter [this message]
2021-06-07 15:35   ` [pbs-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-08  7:58 ` [pve-devel] applied: [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Wolfgang Bumiller
2021-06-08  7:58   ` [pbs-devel] " Wolfgang Bumiller
