From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 83D6970964; Mon, 7 Jun 2021 17:36:26 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C4EF012F0D; Mon, 7 Jun 2021 17:35:55 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id E482312D65; Mon, 7 Jun 2021 17:35:49 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id B01BB42DEA; Mon, 7 Jun 2021 17:35:49 +0200 (CEST) From: Stefan Reiter To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com Date: Mon, 7 Jun 2021 17:35:29 +0200 Message-Id: <20210607153532.2522267-7-s.reiter@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20210607153532.2522267-1-s.reiter@proxmox.com> References: <20210607153532.2522267-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.806 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader X-BeenThere: pve-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox VE development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 07 Jun 2021 15:36:26 -0000 superseded by CachedChunkReader, with less code and more speed Signed-off-by: Stefan Reiter --- src/backup.rs | 3 - src/backup/async_index_reader.rs | 215 ------------------------------- 2 files changed, 218 deletions(-) delete mode 100644 src/backup/async_index_reader.rs diff --git a/src/backup.rs b/src/backup.rs index 5e1147b4..7bf29a5a 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -257,8 +257,5 @@ pub use verify::*; mod catalog_shell; pub use catalog_shell::*; -mod async_index_reader; -pub use async_index_reader::*; - mod cached_chunk_reader; pub use cached_chunk_reader::*; diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs deleted file mode 100644 index 20a37e7e..00000000 --- a/src/backup/async_index_reader.rs +++ /dev/null @@ -1,215 +0,0 @@ -use std::future::Future; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::io::SeekFrom; - -use anyhow::Error; -use futures::future::FutureExt; -use futures::ready; -use tokio::io::{AsyncRead, AsyncSeek, ReadBuf}; - -use proxmox::sys::error::io_err_other; -use proxmox::io_format_err; - -use super::IndexFile; -use super::read_chunk::AsyncReadChunk; -use super::index::ChunkReadInfo; - -type ReadFuture = dyn Future), Error>> + Send + 'static; - -// FIXME: This enum may not be required? -// - Put the `WaitForData` case directly into a `read_future: Option<>` -// - make the read loop as follows: -// * if read_buffer is not empty: -// use it -// * else if read_future is there: -// poll it -// if read: move data to read_buffer -// * else -// create read future -#[allow(clippy::enum_variant_names)] -enum AsyncIndexReaderState { - NoData, - WaitForData(Pin>>), - HaveData, -} - -pub struct AsyncIndexReader { - store: Option, - index: I, - read_buffer: Vec, - current_chunk_offset: u64, - current_chunk_idx: usize, - current_chunk_info: Option, - position: u64, - seek_to_pos: i64, - state: AsyncIndexReaderState, -} - -// ok because the only public interfaces operates on &mut Self -unsafe impl Sync for AsyncIndexReader {} - -impl AsyncIndexReader { - pub fn new(index: I, store: S) -> Self { - Self { - store: Some(store), - index, - read_buffer: Vec::with_capacity(1024 * 1024), - current_chunk_offset: 0, - current_chunk_idx: 0, - current_chunk_info: None, - position: 0, - seek_to_pos: 0, - state: AsyncIndexReaderState::NoData, - } - } -} - -impl AsyncRead for AsyncIndexReader -where - S: AsyncReadChunk + Unpin + Sync + 'static, - I: IndexFile + Unpin, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut ReadBuf, - ) -> Poll> { - let this = Pin::get_mut(self); - loop { - match &mut this.state { - AsyncIndexReaderState::NoData => { - let (idx, offset) = if this.current_chunk_info.is_some() && - this.position == this.current_chunk_info.as_ref().unwrap().range.end - { - // optimization for sequential chunk read - let next_idx = this.current_chunk_idx + 1; - (next_idx, 0) - } else { - match this.index.chunk_from_offset(this.position) { - Some(res) => res, - None => return Poll::Ready(Ok(())) - } - }; - - if idx >= this.index.index_count() { - return Poll::Ready(Ok(())); - } - - let info = this - .index - .chunk_info(idx) - .ok_or_else(|| io_format_err!("could not get digest"))?; - - this.current_chunk_offset = offset; - this.current_chunk_idx = idx; - let old_info = this.current_chunk_info.replace(info.clone()); - - if let Some(old_info) = old_info { - if old_info.digest == info.digest { - // hit, chunk is currently in cache - this.state = AsyncIndexReaderState::HaveData; - continue; - } - } - - // miss, need to download new chunk - let store = match this.store.take() { - Some(store) => store, - None => { - return Poll::Ready(Err(io_format_err!("could not find store"))); - } - }; - - let future = async move { - store.read_chunk(&info.digest) - .await - .map(move |x| (store, x)) - }; - - this.state = AsyncIndexReaderState::WaitForData(future.boxed()); - } - AsyncIndexReaderState::WaitForData(ref mut future) => { - match ready!(future.as_mut().poll(cx)) { - Ok((store, chunk_data)) => { - this.read_buffer = chunk_data; - this.state = AsyncIndexReaderState::HaveData; - this.store = Some(store); - } - Err(err) => { - return Poll::Ready(Err(io_err_other(err))); - } - }; - } - AsyncIndexReaderState::HaveData => { - let offset = this.current_chunk_offset as usize; - let len = this.read_buffer.len(); - let n = if len - offset < buf.remaining() { - len - offset - } else { - buf.remaining() - }; - - buf.put_slice(&this.read_buffer[offset..(offset + n)]); - this.position += n as u64; - - if offset + n == len { - this.state = AsyncIndexReaderState::NoData; - } else { - this.current_chunk_offset += n as u64; - this.state = AsyncIndexReaderState::HaveData; - } - - return Poll::Ready(Ok(())); - } - } - } - } -} - -impl AsyncSeek for AsyncIndexReader -where - S: AsyncReadChunk + Unpin + Sync + 'static, - I: IndexFile + Unpin, -{ - fn start_seek( - self: Pin<&mut Self>, - pos: SeekFrom, - ) -> tokio::io::Result<()> { - let this = Pin::get_mut(self); - this.seek_to_pos = match pos { - SeekFrom::Start(offset) => { - offset as i64 - }, - SeekFrom::End(offset) => { - this.index.index_bytes() as i64 + offset - }, - SeekFrom::Current(offset) => { - this.position as i64 + offset - } - }; - Ok(()) - } - - fn poll_complete( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - let this = Pin::get_mut(self); - - let index_bytes = this.index.index_bytes(); - if this.seek_to_pos < 0 { - return Poll::Ready(Err(io_format_err!("cannot seek to negative values"))); - } else if this.seek_to_pos > index_bytes as i64 { - this.position = index_bytes; - } else { - this.position = this.seek_to_pos as u64; - } - - // even if seeking within one chunk, we need to go to NoData to - // recalculate the current_chunk_offset (data is cached anyway) - this.state = AsyncIndexReaderState::NoData; - - Poll::Ready(Ok(this.position)) - } -} -- 2.30.2