From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 2260E707DC; Mon, 7 Jun 2021 17:35:52 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 114E212D92; Mon, 7 Jun 2021 17:35:52 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 24C8112D43; Mon, 7 Jun 2021 17:35:46 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id F04EA4652A; Mon, 7 Jun 2021 17:35:45 +0200 (CEST) From: Stefan Reiter To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com Date: Mon, 7 Jun 2021 17:35:27 +0200 Message-Id: <20210607153532.2522267-5-s.reiter@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20210607153532.2522267-1-s.reiter@proxmox.com> References: <20210607153532.2522267-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.899 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development 
discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 07 Jun 2021 15:35:52 -0000 Implemented as a separate struct SeekableCachedChunkReader that contains the original as an Arc, since the read_at future captures the CachedChunkReader, which would otherwise not work with the lifetimes required by AsyncRead. This is also the reason we cannot use a shared read buffer and have to allocate a new one for every read. It also means that the struct items required for AsyncRead/Seek do not need to be included in a regular CachedChunkReader. This is intended as a replacement for AsyncIndexReader, so we have less code duplication and can utilize the LRU cache there too (even though actual request concurrency is not supported in these traits). Signed-off-by: Stefan Reiter --- v2: * drop 'seek_to_pos' from struct and implement error handling directly in start_seek * simplify future handling in poll_read with Option::get_or_insert_with src/backup/cached_chunk_reader.rs | 104 +++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 2 deletions(-) diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs index ff476e37..c9ca4773 100644 --- a/src/backup/cached_chunk_reader.rs +++ b/src/backup/cached_chunk_reader.rs @@ -1,12 +1,19 @@ //! An async and concurrency safe data reader backed by a local LRU cache. 
use anyhow::Error; +use futures::future::Future; +use futures::ready; +use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; -use std::future::Future; +use std::io::SeekFrom; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::backup::{AsyncReadChunk, IndexFile}; +use super::{AsyncReadChunk, IndexFile}; use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache}; +use proxmox::io_format_err; +use proxmox::sys::error::io_err_other; struct AsyncChunkCacher { reader: Arc, @@ -87,3 +94,96 @@ impl CachedChunkReader< Ok(read) } } + +impl + CachedChunkReader +{ + /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and + /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred + /// otherwise. + pub fn seekable(self) -> SeekableCachedChunkReader { + SeekableCachedChunkReader { + index_bytes: self.index.index_bytes(), + reader: Arc::new(self), + position: 0, + read_future: None, + } + } +} + +pub struct SeekableCachedChunkReader< + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +> { + reader: Arc>, + index_bytes: u64, + position: u64, + read_future: Option, usize), Error>> + Send>>>, +} + +impl AsyncSeek for SeekableCachedChunkReader +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> { + let this = Pin::get_mut(self); + let seek_to_pos = match pos { + SeekFrom::Start(offset) => offset as i64, + SeekFrom::End(offset) => this.index_bytes as i64 + offset, + SeekFrom::Current(offset) => this.position as i64 + offset, + }; + if seek_to_pos < 0 { + return Err(io_format_err!("cannot seek to negative values")); + } else if seek_to_pos > this.index_bytes as i64 { + this.position = this.index_bytes; + } else { + this.position = seek_to_pos as u64; + } + Ok(()) + } + + fn poll_complete(self: Pin<&mut Self>, _cx: &mut 
Context<'_>) -> Poll> { + Poll::Ready(Ok(self.position)) + } +} + +impl AsyncRead for SeekableCachedChunkReader +where + I: IndexFile + Send + Sync + 'static, + R: AsyncReadChunk + Send + Sync + 'static, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut ReadBuf, + ) -> Poll> { + let this = Pin::get_mut(self); + + let offset = this.position; + let wanted = buf.capacity(); + let reader = Arc::clone(&this.reader); + + let fut = this.read_future.get_or_insert_with(|| { + Box::pin(async move { + let mut read_buf = vec![0u8; wanted]; + let read = reader.read_at(&mut read_buf[..wanted], offset).await?; + Ok((read_buf, read)) + }) + }); + + let ret = match ready!(fut.as_mut().poll(cx)) { + Ok((read_buf, read)) => { + buf.put_slice(&read_buf[..read]); + this.position += read as u64; + Ok(()) + } + Err(err) => Err(io_err_other(err)), + }; + + // future completed, drop + this.read_future = None; + + Poll::Ready(ret) + } +} -- 2.30.2