From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 22 Jul 2020 15:56:22 +0200
Message-Id: <20200722135625.23653-3-s.reiter@proxmox.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20200722135625.23653-1-s.reiter@proxmox.com>
References: <20200722135625.23653-1-s.reiter@proxmox.com>
Subject: [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader

Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions. chunk_from_offset is used to locate a
position within a chunk, but it is avoided for sequential reads so their
performance does not regress.

AsyncSeek stores the requested target in the temporary seek_to_pos, so
the current position stays unchanged if an invalid seek is requested;
the error is only reported from poll_complete.
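(Not part of this patch, just an illustration: with AsyncSeek implemented, a
consumer can drive the reader through tokio's AsyncSeekExt/AsyncReadExt
helpers, roughly like the hypothetical read_at() sketch below - the helper
name and bounds are made up for the example.)

    use std::io::SeekFrom;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

    // Hypothetical helper: read `len` bytes starting at an arbitrary byte
    // offset of a seekable indexed reader such as AsyncIndexReader.
    async fn read_at<R: AsyncRead + AsyncSeek + Unpin>(
        reader: &mut R,
        offset: u64,
        len: usize,
    ) -> std::io::Result<Vec<u8>> {
        // seek() drives start_seek()/poll_complete(); seeking past the end is
        // clamped to index_bytes(), seeking before byte 0 errors in poll_complete().
        reader.seek(SeekFrom::Start(offset)).await?;

        let mut buf = vec![0u8; len];
        reader.read_exact(&mut buf).await?;
        Ok(buf)
    }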
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
 src/backup/index.rs              |   1 +
 2 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
 use std::future::Future;
 use std::task::{Poll, Context};
 use std::pin::Pin;
+use std::io::SeekFrom;
 
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
 
 use super::IndexFile;
 use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
 
 enum AsyncIndexReaderState<S> {
     NoData,
     WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
-    HaveData(usize),
+    HaveData,
 }
 
 pub struct AsyncIndexReader<S, I: IndexFile> {
     store: Option<S>,
     index: I,
     read_buffer: Vec<u8>,
+    current_chunk_offset: u64,
     current_chunk_idx: usize,
-    current_chunk_digest: [u8; 32],
+    current_chunk_info: Option<ChunkReadInfo>,
+    position: u64,
+    seek_to_pos: i64,
     state: AsyncIndexReaderState<S>,
 }
 
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
             store: Some(store),
             index,
             read_buffer: Vec::with_capacity(1024 * 1024),
+            current_chunk_offset: 0,
             current_chunk_idx: 0,
-            current_chunk_digest: [0u8; 32],
+            current_chunk_info: None,
+            position: 0,
+            seek_to_pos: 0,
             state: AsyncIndexReaderState::NoData,
         }
     }
@@ -58,23 +66,41 @@ where
         loop {
             match &mut this.state {
                 AsyncIndexReaderState::NoData => {
-                    if this.current_chunk_idx >= this.index.index_count() {
+                    let (idx, offset) = if this.current_chunk_info.is_some() &&
+                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
+                    {
+                        // optimization for sequential chunk read
+                        let next_idx = this.current_chunk_idx + 1;
+                        (next_idx, 0)
+                    } else {
+                        match this.index.chunk_from_offset(this.position) {
+                            Some(res) => res,
+                            None => return Poll::Ready(Ok(0))
+                        }
+                    };
+
+                    if idx >= this.index.index_count() {
                         return Poll::Ready(Ok(0));
                     }
 
-                    let digest = this
+                    let info = this
                         .index
-                        .index_digest(this.current_chunk_idx)
-                        .ok_or(io_format_err!("could not get digest"))?
-                        .clone();
+                        .chunk_info(idx)
+                        .ok_or(io_format_err!("could not get digest"))?;
 
-                    if digest == this.current_chunk_digest {
-                        this.state = AsyncIndexReaderState::HaveData(0);
-                        continue;
+                    this.current_chunk_offset = offset;
+                    this.current_chunk_idx = idx;
+                    let old_info = this.current_chunk_info.replace(info.clone());
+
+                    if let Some(old_info) = old_info {
+                        if old_info.digest == info.digest {
+                            // hit, chunk is currently in cache
+                            this.state = AsyncIndexReaderState::HaveData;
+                            continue;
+                        }
                     }
 
-                    this.current_chunk_digest = digest;
-
+                    // miss, need to download new chunk
                     let store = match this.store.take() {
                         Some(store) => store,
                         None => {
@@ -83,7 +109,7 @@ where
                     };
 
                     let future = async move {
-                        store.read_chunk(&digest)
+                        store.read_chunk(&info.digest)
                             .await
                             .map(move |x| (store, x))
                     };
@@ -95,7 +121,7 @@ where
                         Ok((store, mut chunk_data)) => {
                             this.read_buffer.clear();
                             this.read_buffer.append(&mut chunk_data);
-                            this.state = AsyncIndexReaderState::HaveData(0);
+                            this.state = AsyncIndexReaderState::HaveData;
                             this.store = Some(store);
                         }
                         Err(err) => {
@@ -103,8 +129,8 @@ where
                         }
                     };
                 }
-                AsyncIndexReaderState::HaveData(offset) => {
-                    let offset = *offset;
+                AsyncIndexReaderState::HaveData => {
+                    let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
                     let n = if len - offset < buf.len() {
                         len - offset
@@ -113,11 +139,13 @@ where
                     };
 
                     buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    this.position += n as u64;
+
                     if offset + n == len {
                         this.state = AsyncIndexReaderState::NoData;
-                        this.current_chunk_idx += 1;
                     } else {
-                        this.state = AsyncIndexReaderState::HaveData(offset + n);
+                        this.current_chunk_offset += n as u64;
+                        this.state = AsyncIndexReaderState::HaveData;
                     }
 
                     return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
         }
     }
 }
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => {
+                offset as i64
+            },
+            SeekFrom::End(offset) => {
+                this.index.index_bytes() as i64 + offset
+            },
+            SeekFrom::Current(offset) => {
+                this.position as i64 + offset
+            }
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index.index_bytes();
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        // even if seeking within one chunk, we need to go to NoData to
+        // recalculate the current_chunk_offset (data is cached anyway)
+        this.state = AsyncIndexReaderState::NoData;
+
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::Range;
 
+#[derive(Clone)]
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
     pub digest: [u8; 32],
-- 
2.20.1