From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 790BB74A37; Wed, 2 Jun 2021 16:39:18 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0774DD14F; Wed, 2 Jun 2021 16:38:48 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 81639C956; Wed, 2 Jun 2021 16:38:44 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 48D07466D8; Wed, 2 Jun 2021 16:38:44 +0200 (CEST) From: Stefan Reiter To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com Date: Wed, 2 Jun 2021 16:38:28 +0200 Message-Id: <20210602143833.4423-5-s.reiter@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com> References: <20210602143833.4423-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.029 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pve-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 02 Jun 2021 14:39:18 -0000 Implemented as a seperate struct SeekableCachedChunkReader that contains the original as an Arc, since the read_at future captures the CachedChunkReader, which would otherwise not work with the lifetimes required by AsyncRead. This is also the reason we cannot use a shared read buffer and have to allocate a new one for every read. It also means that the struct items required for AsyncRead/Seek do not need to be included in a regular CachedChunkReader. This is intended as a replacement for AsyncIndexReader, so we have less code duplication and can utilize the LRU cache there too (even though actual request concurrency is not supported in these traits). Signed-off-by: Stefan Reiter --- src/backup/cached_chunk_reader.rs | 116 +++++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 2 deletions(-) diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs index fd5a049f..9b56fd14 100644 --- a/src/backup/cached_chunk_reader.rs +++ b/src/backup/cached_chunk_reader.rs @@ -1,12 +1,19 @@ //! An async and concurrency safe data reader backed by a local LRU cache. use anyhow::Error; +use futures::future::Future; +use futures::ready; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; -use std::future::Future; +use std::io::SeekFrom; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::backup::{AsyncReadChunk, IndexFile}; +use super::{AsyncReadChunk, IndexFile}; use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; +use proxmox::io_format_err; +use proxmox::sys::error::io_err_other; struct AsyncChunkCacher { reader: Arc, @@ -85,3 +92,108 @@ impl CachedChunkReader< Ok(read) } } + +impl + CachedChunkReader +{ + /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and + /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred + /// otherwise. + pub fn seekable(self) -> SeekableCachedChunkReader { + SeekableCachedChunkReader { + index_bytes: self.index.index_bytes(), + reader: Arc::new(self), + position: 0, + seek_to_pos: 0, + read_future: None, + } + } +} + +pub struct SeekableCachedChunkReader< + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +> { + reader: Arc>, + index_bytes: u64, + position: u64, + seek_to_pos: i64, + read_future: Option, usize), Error>> + Send>>>, +} + +impl AsyncSeek for SeekableCachedChunkReader +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> { + let this = Pin::get_mut(self); + this.seek_to_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => this.index_bytes as i64 + offset, + SeekFrom::Current(offset) => this.position as i64 + offset, + }; + Ok(()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = Pin::get_mut(self); + + let index_bytes = this.index_bytes; + if this.seek_to_pos < 0 { + return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); + } else if this.seek_to_pos > index_bytes as i64 { + this.position = index_bytes; + } else { + this.position = this.seek_to_pos as u64; + } + + Poll::Ready(Ok(this.position)) + } +} + +impl AsyncRead for SeekableCachedChunkReader +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut ReadBuf, + ) -> Poll> { + let this = Pin::get_mut(self); + + let fut = match this.read_future { + Some(ref mut fut) => fut, + None => { + let offset = this.position; + let wanted = buf.capacity(); + let reader = Arc::clone(&this.reader); + let fut = Box::pin(async move { + let mut read_buf = vec![0u8; wanted]; + let read = reader.read_at(&mut read_buf[..wanted], offset).await?; + Ok((read_buf, read)) + }); + this.read_future = Some(fut); + this.read_future.as_mut().unwrap() + } + }; + + let ret = match ready!(fut.as_mut().poll(cx)) { + Ok((read_buf, read)) => { + buf.put_slice(&read_buf[..read]); + this.position += read as u64; + Ok(()) + } + Err(err) => { + Err(io_err_other(err)) + } + }; + + // future completed, drop + let _drop = this.read_future.take(); + + Poll::Ready(ret) + } +} -- 2.30.2