From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 492A264DA8 for ; Tue, 21 Jul 2020 11:38:15 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3804516BB9 for ; Tue, 21 Jul 2020 11:37:45 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id A457816BAB for ; Tue, 21 Jul 2020 11:37:43 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 6946E432CE for ; Tue, 21 Jul 2020 11:37:43 +0200 (CEST) To: pbs-devel@lists.proxmox.com References: <20200720150220.22996-1-s.reiter@proxmox.com> <20200720150220.22996-2-s.reiter@proxmox.com> From: Dominik Csapak Message-ID: Date: Tue, 21 Jul 2020 11:37:41 +0200 User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:79.0) Gecko/20100101 Thunderbird/79.0 MIME-Version: 1.0 In-Reply-To: <20200720150220.22996-2-s.reiter@proxmox.com> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.701 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment KAM_SHORT 0.001 Use of a URL Shortener for very short URL NICE_REPLY_A -0.001 Looks like a legit reply (A) RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not 
publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [docs.rs] Subject: Re: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 21 Jul 2020 09:38:15 -0000 On 7/20/20 5:02 PM, Stefan Reiter wrote: > It's currently only used in proxmox-backup-qemu, so no users in this > package need to be changed. > > This also allows simplifying the interface to just what is needed in > proxmox-backup-qemu, i.e. by making it async we need to remove the > std::io::Read trait, and that allows to get rid of the seeking > workaround. > > Cache locking is done with a reader-writer lock, though multiple > requests for one non-cached chunk are still possible (cannot hold a lock > during async HTTP request) - though in testing, performance did not > regress. > > The "sequential read" optimization is removed, since it didn't serve any > purpose AFAICT. chunk_end was not used anywhere else (gave a warning). > > Signed-off-by: Stefan Reiter > --- > > I'm unsure about the approach here, happy to hear feedback from anyone more > familiar with Rust. I couldn't find a way to combine the "read" trait impl with > async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only > user I don't think this is an issue, but I don't know if this interface was put > here deliberately, even if unused elsewhere. high level comment: maybe it would be nicer to implement AsyncSeek[0] for our existing AsyncIndexReader (which also buffers a single chunk, but works for fixed + dynamic index). 
then we could simply keep the behaviour we have in proxmox-backup-qemu (just add '.await') for implementing this, we would have to extend the Index trait to provide a function fn get_chunk_from_offset(offset) -> (chunk_idx, offset_in_chunk) since a 'seek' would then be 'instant' (at least there is nothing we could await), we would just have a 'pseudo-async' interface to implement it would then 'magically' work also for a dynamic index alternatively, we could also implement the 'async_buffered_read' simply for our AsyncIndexReader also and still drop this here entirely more comments inline 0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html > > src/backup/fixed_index.rs | 145 ++++++++++---------------------------- > 1 file changed, 37 insertions(+), 108 deletions(-) > > diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs > index 73d0dad0..3f5fde2a 100644 > --- a/src/backup/fixed_index.rs > +++ b/src/backup/fixed_index.rs > @@ -1,5 +1,6 @@ > use anyhow::{bail, format_err, Error}; > use std::io::{Seek, SeekFrom}; > +use std::cmp::min; no need to use this, see comment further below > > use super::chunk_stat::*; > use super::chunk_store::*; > @@ -11,7 +12,7 @@ use std::fs::File; > use std::io::Write; > use std::os::unix::io::AsRawFd; > use std::path::{Path, PathBuf}; > -use std::sync::Arc; > +use std::sync::{Arc,RwLock}; > > use super::read_chunk::*; > use super::ChunkInfo; > @@ -146,20 +147,6 @@ impl FixedIndexReader { > Ok(()) > } > > - #[inline] > - fn chunk_end(&self, pos: usize) -> u64 { > - if pos >= self.index_length { > - panic!("chunk index out of range"); > - } > - > - let end = ((pos + 1) * self.chunk_size) as u64; > - if end > self.size { > - self.size > - } else { > - end > - } > - } > - > pub fn print_info(&self) { > println!("Size: {}", self.size); > println!("ChunkSize: {}", self.chunk_size); > @@ -466,27 +453,29 @@ impl FixedIndexWriter { > } > } > > +struct ChunkBuffer { > + data: Vec, > + chunk_idx: usize, > +} > + > pub struct 
BufferedFixedReader { > store: S, > index: FixedIndexReader, > archive_size: u64, > - read_buffer: Vec, > - buffered_chunk_idx: usize, > - buffered_chunk_start: u64, > - read_offset: u64, > + buffer: RwLock, > } > > -impl BufferedFixedReader { > +impl BufferedFixedReader { > pub fn new(index: FixedIndexReader, store: S) -> Self { > let archive_size = index.size; > Self { > store, > index, > archive_size, > - read_buffer: Vec::with_capacity(1024 * 1024), > - buffered_chunk_idx: 0, > - buffered_chunk_start: 0, > - read_offset: 0, > + buffer: RwLock::new(ChunkBuffer { > + data: Vec::with_capacity(1024 * 1024 * 4), > + chunk_idx: 0, > + }), > } > } > > @@ -494,7 +483,7 @@ impl BufferedFixedReader { > self.archive_size > } > > - fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { > + async fn buffer_chunk(&self, idx: usize) -> Result, Error> { > let index = &self.index; > let info = match index.chunk_info(idx) { > Some(info) => info, > @@ -503,104 +492,44 @@ impl BufferedFixedReader { > > // fixme: avoid copy > > - let data = self.store.read_chunk(&info.digest)?; > + let data = self.store.read_chunk(&info.digest).await?; > let size = info.range.end - info.range.start; > if size != data.len() as u64 { > bail!("read chunk with wrong size ({} != {}", size, data.len()); > } > > - self.read_buffer.clear(); > - self.read_buffer.extend_from_slice(&data); > + let mut buffer = self.buffer.write().unwrap(); > + buffer.data.clear(); > + buffer.data.extend_from_slice(&data); > + buffer.chunk_idx = idx; > > - self.buffered_chunk_idx = idx; > - > - self.buffered_chunk_start = info.range.start as u64; > - Ok(()) > + Ok(data) > } > -} > > -impl crate::tools::BufferedRead for BufferedFixedReader { > - fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { > + pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result { > if offset == self.archive_size { > - return Ok(&self.read_buffer[0..0]); > + return Ok(0); > } > > - let 
buffer_len = self.read_buffer.len(); > - let index = &self.index; > + let idx = (offset / self.index.chunk_size as u64) as usize; > + > + let mut copy_to_buf = move |from: &Vec| -> u64 { > + let buffer_offset = (offset % self.index.chunk_size as u64) as usize; > + let data_len = min(from.len() - buffer_offset, size); instead of using std::cmp::min, you can simply use min on a number: let data_len = size.min(from.len() - buffer_offset); > + unsafe { > + std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len); > + } here you can avoid unsafe code by writing: buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset+data_len]); (does internally exactly the same, but checks the lengths and panics in error case instead of corrupting memory/segfaulting) > + data_len as _ > + }; > > - // optimization for sequential read > - if buffer_len > 0 > - && ((self.buffered_chunk_idx + 1) < index.index_length) > - && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) > { > - let next_idx = self.buffered_chunk_idx + 1; > - let next_end = index.chunk_end(next_idx); > - if offset < next_end { > - self.buffer_chunk(next_idx)?; > - let buffer_offset = (offset - self.buffered_chunk_start) as usize; > - return Ok(&self.read_buffer[buffer_offset..]); > + let buffer = self.buffer.read().unwrap(); > + if buffer.data.len() != 0 && buffer.chunk_idx == idx { > + return Ok(copy_to_buf(&buffer.data)); > } > } > > - if (buffer_len == 0) > - || (offset < self.buffered_chunk_start) > - || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) > - { > - let idx = (offset / index.chunk_size as u64) as usize; > - self.buffer_chunk(idx)?; > - } > - > - let buffer_offset = (offset - self.buffered_chunk_start) as usize; > - Ok(&self.read_buffer[buffer_offset..]) > - } > -} > - > -impl std::io::Read for BufferedFixedReader { > - fn read(&mut self, buf: &mut [u8]) -> Result { > - use crate::tools::BufferedRead; > - use 
std::io::{Error, ErrorKind}; > - > - let data = match self.buffered_read(self.read_offset) { > - Ok(v) => v, > - Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), > - }; > - > - let n = if data.len() > buf.len() { > - buf.len() > - } else { > - data.len() > - }; > - > - unsafe { > - std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); > - } > - > - self.read_offset += n as u64; > - > - Ok(n) > - } > -} > - > -impl Seek for BufferedFixedReader { > - fn seek(&mut self, pos: SeekFrom) -> Result { > - let new_offset = match pos { > - SeekFrom::Start(start_offset) => start_offset as i64, > - SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset, > - SeekFrom::Current(offset) => (self.read_offset as i64) + offset, > - }; > - > - use std::io::{Error, ErrorKind}; > - if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { > - return Err(Error::new( > - ErrorKind::Other, > - format!( > - "seek is out of range {} ([0..{}])", > - new_offset, self.archive_size > - ), > - )); > - } > - self.read_offset = new_offset as u64; > - > - Ok(self.read_offset) > + let new_data = self.buffer_chunk(idx).await?; > + Ok(copy_to_buf(&new_data)) > } > } >