public inbox for pve-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Wolfgang Bumiller <w.bumiller@proxmox.com>
To: Stefan Reiter <s.reiter@proxmox.com>
Cc: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: Re: [pve-devel] [pbs-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader
Date: Fri, 4 Jun 2021 14:30:28 +0200	[thread overview]
Message-ID: <20210604123028.qa5i6keqcnocviqe@olga.proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-5-s.reiter@proxmox.com>

On Wed, Jun 02, 2021 at 04:38:28PM +0200, Stefan Reiter wrote:
> Implemented as a seperate struct SeekableCachedChunkReader that contains
> the original as an Arc, since the read_at future captures the
> CachedChunkReader, which would otherwise not work with the lifetimes
> required by AsyncRead. This is also the reason we cannot use a shared
> read buffer and have to allocate a new one for every read. It also means
> that the struct items required for AsyncRead/Seek do not need to be
> included in a regular CachedChunkReader.
> 
> This is intended as a replacement for AsyncIndexReader, so we have less
> code duplication and can utilize the LRU cache there too (even though
> actual request concurrency is not supported in these traits).
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>  src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++-
>  1 file changed, 114 insertions(+), 2 deletions(-)
> 
> diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
> index fd5a049f..9b56fd14 100644
> --- a/src/backup/cached_chunk_reader.rs
> +++ b/src/backup/cached_chunk_reader.rs
> @@ -1,12 +1,19 @@
>  //! An async and concurrency safe data reader backed by a local LRU cache.
>  
>  use anyhow::Error;
> +use futures::future::Future;
> +use futures::ready;
> +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
>  
> -use std::future::Future;
> +use std::io::SeekFrom;
> +use std::pin::Pin;
>  use std::sync::Arc;
> +use std::task::{Context, Poll};
>  
> -use crate::backup::{AsyncReadChunk, IndexFile};
> +use super::{AsyncReadChunk, IndexFile};
>  use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
> +use proxmox::io_format_err;
> +use proxmox::sys::error::io_err_other;
>  
>  struct AsyncChunkCacher<T> {
>      reader: Arc<T>,
> @@ -85,3 +92,108 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
>          Ok(read)
>      }
>  }
> +
> +impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
> +    CachedChunkReader<I, R>
> +{
> +    /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
> +    /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
> +    /// otherwise.
> +    pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
> +        SeekableCachedChunkReader {
> +            index_bytes: self.index.index_bytes(),
> +            reader: Arc::new(self),
> +            position: 0,
> +            seek_to_pos: 0,
> +            read_future: None,
> +        }
> +    }
> +}
> +
> +pub struct SeekableCachedChunkReader<
> +    I: IndexFile + Send + Sync + 'static,
> +    R: AsyncReadChunk + Send + Sync + 'static,
> +> {
> +    reader: Arc<CachedChunkReader<I, R>>,
> +    index_bytes: u64,
> +    position: u64,
> +    seek_to_pos: i64,
> +    read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
> +}
> +
> +impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
> +where
> +    I: IndexFile + Send + Sync + 'static,
> +    R: AsyncReadChunk + Send + Sync + 'static,
> +{
> +    fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
> +        let this = Pin::get_mut(self);
> +        this.seek_to_pos = match pos {
> +            SeekFrom::Start(offset) => offset as i64,
> +            SeekFrom::End(offset) => this.index_bytes as i64 + offset,
> +            SeekFrom::Current(offset) => this.position as i64 + offset,
> +        };
> +        Ok(())
> +    }
> +
> +    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
> +        let this = Pin::get_mut(self);
> +
> +        let index_bytes = this.index_bytes;
> +        if this.seek_to_pos < 0 {
> +            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));

IMO we should ditch `seek_to_pos` altogether, move the error handling
into `start_seek` and just always return
`Poll::Ready(Ok(this.position))` straightaway.
Unless there's a reason to split this up? Other resources don't
guarantee much of anything if you start read/writing *between*
`start_seek`/`poll_complete` after all.

> +        } else if this.seek_to_pos > index_bytes as i64 {
> +            this.position = index_bytes;
> +        } else {
> +            this.position = this.seek_to_pos as u64;
> +        }
> +
> +        Poll::Ready(Ok(this.position))
> +    }
> +}
> +
> +impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
> +where
> +    I: IndexFile + Send + Sync + 'static,
> +    R: AsyncReadChunk + Send + Sync + 'static,
> +{
> +    fn poll_read(
> +        self: Pin<&mut Self>,
> +        cx: &mut Context,
> +        buf: &mut ReadBuf,
> +    ) -> Poll<tokio::io::Result<()>> {
> +        let this = Pin::get_mut(self);
> +
> +        let fut = match this.read_future {
> +            Some(ref mut fut) => fut,
> +            None => {
> +                let offset = this.position;
> +                let wanted = buf.capacity();
> +                let reader = Arc::clone(&this.reader);
> +                let fut = Box::pin(async move {
> +                    let mut read_buf = vec![0u8; wanted];
> +                    let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
> +                    Ok((read_buf, read))
> +                });
> +                this.read_future = Some(fut);
> +                this.read_future.as_mut().unwrap()
> +            }
> +        };

Your `None` case seems trivial enough that you could use the Option's
`.get_or_insert_with()` instead of match with `ref mut` and `.as_mut().unwrap()`
(since the `None` case has no error cases)

> +
> +        let ret = match ready!(fut.as_mut().poll(cx)) {
> +            Ok((read_buf, read)) => {
> +                buf.put_slice(&read_buf[..read]);
> +                this.position += read as u64;
> +                Ok(())
> +            }
> +            Err(err) => {
> +                Err(io_err_other(err))
> +            }
> +        };
> +
> +        // future completed, drop
> +        let _drop = this.read_future.take();

Why not just `this.read_future = None;` ?

> +
> +        Poll::Ready(ret)
> +    }
> +}
> -- 
> 2.30.2




  reply	other threads:[~2021-06-04 12:31 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-02 14:38 [pve-devel] [PATCH 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-04 12:22   ` [pve-devel] [pbs-devel] " Wolfgang Bumiller
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-04 12:30   ` Wolfgang Bumiller [this message]
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-04 12:16   ` [pve-devel] [pbs-devel] " Wolfgang Bumiller
2021-06-07  8:03     ` Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210604123028.qa5i6keqcnocviqe@olga.proxmox.com \
    --to=w.bumiller@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    --cc=pve-devel@lists.proxmox.com \
    --cc=s.reiter@proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal