From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id D9DD1750A7; Fri, 4 Jun 2021 14:31:03 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B2CCF1CAFA; Fri, 4 Jun 2021 14:30:33 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 407021CAED; Fri, 4 Jun 2021 14:30:30 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 1214846706; Fri, 4 Jun 2021 14:30:30 +0200 (CEST) Date: Fri, 4 Jun 2021 14:30:28 +0200 From: Wolfgang Bumiller To: Stefan Reiter Cc: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com Message-ID: <20210604123028.qa5i6keqcnocviqe@olga.proxmox.com> References: <20210602143833.4423-1-s.reiter@proxmox.com> <20210602143833.4423-5-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20210602143833.4423-5-s.reiter@proxmox.com> User-Agent: NeoMutt/20180716 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.016 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader X-BeenThere: pbs-devel@lists.proxmox.com 
X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 04 Jun 2021 12:31:03 -0000 On Wed, Jun 02, 2021 at 04:38:28PM +0200, Stefan Reiter wrote: > Implemented as a separate struct SeekableCachedChunkReader that contains > the original as an Arc, since the read_at future captures the > CachedChunkReader, which would otherwise not work with the lifetimes > required by AsyncRead. This is also the reason we cannot use a shared > read buffer and have to allocate a new one for every read. It also means > that the struct items required for AsyncRead/Seek do not need to be > included in a regular CachedChunkReader. > > This is intended as a replacement for AsyncIndexReader, so we have less > code duplication and can utilize the LRU cache there too (even though > actual request concurrency is not supported in these traits). > > Signed-off-by: Stefan Reiter > --- > src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++- > 1 file changed, 114 insertions(+), 2 deletions(-) > > diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs > index fd5a049f..9b56fd14 100644 > --- a/src/backup/cached_chunk_reader.rs > +++ b/src/backup/cached_chunk_reader.rs > @@ -1,12 +1,19 @@ > //! An async and concurrency safe data reader backed by a local LRU cache. 
> > use anyhow::Error; > +use futures::future::Future; > +use futures::ready; > +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; > > -use std::future::Future; > +use std::io::SeekFrom; > +use std::pin::Pin; > use std::sync::Arc; > +use std::task::{Context, Poll}; > > -use crate::backup::{AsyncReadChunk, IndexFile}; > +use super::{AsyncReadChunk, IndexFile}; > use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; > +use proxmox::io_format_err; > +use proxmox::sys::error::io_err_other; > > struct AsyncChunkCacher { > reader: Arc, > @@ -85,3 +92,108 @@ impl CachedChunkReader< > Ok(read) > } > } > + > +impl > + CachedChunkReader > +{ > + /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and > + /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred > + /// otherwise. > + pub fn seekable(self) -> SeekableCachedChunkReader { > + SeekableCachedChunkReader { > + index_bytes: self.index.index_bytes(), > + reader: Arc::new(self), > + position: 0, > + seek_to_pos: 0, > + read_future: None, > + } > + } > +} > + > +pub struct SeekableCachedChunkReader< > + I: IndexFile + Send + Sync + 'static, > + R: AsyncReadChunk + Send + Sync + 'static, > +> { > + reader: Arc>, > + index_bytes: u64, > + position: u64, > + seek_to_pos: i64, > + read_future: Option, usize), Error>> + Send>>>, > +} > + > +impl AsyncSeek for SeekableCachedChunkReader > +where > + I: IndexFile + Send + Sync + 'static, > + R: AsyncReadChunk + Send + Sync + 'static, > +{ > + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> { > + let this = Pin::get_mut(self); > + this.seek_to_pos = match pos { > + SeekFrom::Start(offset) => offset as i64, > + SeekFrom::End(offset) => this.index_bytes as i64 + offset, > + SeekFrom::Current(offset) => this.position as i64 + offset, > + }; > + Ok(()) > + } > + > + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { > + let this = Pin::get_mut(self); > 
+ > + let index_bytes = this.index_bytes; > + if this.seek_to_pos < 0 { > + return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); IMO we should ditch `seek_to_pos` altogether, move the error handling into `start_seek` and just always return `Poll::Ready(Ok(this.position))` straightaway. Unless there's a reason to split this up? Other resources don't guarantee much of anything if you start read/writing *between* `start_seek`/`poll_complete` after all. > + } else if this.seek_to_pos > index_bytes as i64 { > + this.position = index_bytes; > + } else { > + this.position = this.seek_to_pos as u64; > + } > + > + Poll::Ready(Ok(this.position)) > + } > +} > + > +impl AsyncRead for SeekableCachedChunkReader > +where > + I: IndexFile + Send + Sync + 'static, > + R: AsyncReadChunk + Send + Sync + 'static, > +{ > + fn poll_read( > + self: Pin<&mut Self>, > + cx: &mut Context, > + buf: &mut ReadBuf, > + ) -> Poll> { > + let this = Pin::get_mut(self); > + > + let fut = match this.read_future { > + Some(ref mut fut) => fut, > + None => { > + let offset = this.position; > + let wanted = buf.capacity(); > + let reader = Arc::clone(&this.reader); > + let fut = Box::pin(async move { > + let mut read_buf = vec![0u8; wanted]; > + let read = reader.read_at(&mut read_buf[..wanted], offset).await?; > + Ok((read_buf, read)) > + }); > + this.read_future = Some(fut); > + this.read_future.as_mut().unwrap() > + } > + }; Your `None` case seems trivial enough that you could use the Option's `.get_or_insert_with()` instead of match with `ref mut` and `.as_mut().unwrap()` (since the `None` case has no error cases) > + > + let ret = match ready!(fut.as_mut().poll(cx)) { > + Ok((read_buf, read)) => { > + buf.put_slice(&read_buf[..read]); > + this.position += read as u64; > + Ok(()) > + } > + Err(err) => { > + Err(io_err_other(err)) > + } > + }; > + > + // future completed, drop > + let _drop = this.read_future.take(); Why not just `this.read_future = None;` ? 
> + > + Poll::Ready(ret) > + } > +} > -- > 2.30.2