From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 98CBA64779 for ; Mon, 20 Jul 2020 17:02:28 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 93224F4E3 for ; Mon, 20 Jul 2020 17:02:28 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id BC123F4D7 for ; Mon, 20 Jul 2020 17:02:26 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 84F64431A1 for ; Mon, 20 Jul 2020 17:02:26 +0200 (CEST) From: Stefan Reiter To: pbs-devel@lists.proxmox.com Date: Mon, 20 Jul 2020 17:02:18 +0200 Message-Id: <20200720150220.22996-2-s.reiter@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200720150220.22996-1-s.reiter@proxmox.com> References: <20200720150220.22996-1-s.reiter@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.024 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development 
discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 20 Jul 2020 15:02:28 -0000 It's currently only used in proxmox-backup-qemu, so no users in this package need to be changed. This also allows simplifying the interface to just what is needed in proxmox-backup-qemu, i.e. by making it async we need to remove the std::io::Read trait, and that allows us to get rid of the seeking workaround. Cache locking is done with a reader-writer lock, though multiple requests for one non-cached chunk are still possible (we cannot hold a lock during an async HTTP request) - though in testing, performance did not regress. The "sequential read" optimization is removed, since it didn't serve any purpose AFAICT. chunk_end is removed as well, since it was not used anywhere else (gave a warning). Signed-off-by: Stefan Reiter --- I'm unsure about the approach here and would be happy to hear feedback from anyone more familiar with Rust. I couldn't find a way to combine the "read" trait impl with async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only user, I don't think this is an issue, but I don't know if this interface was put here deliberately, even if unused elsewhere. 
src/backup/fixed_index.rs | 145 ++++++++++---------------------------- 1 file changed, 37 insertions(+), 108 deletions(-) diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs index 73d0dad0..3f5fde2a 100644 --- a/src/backup/fixed_index.rs +++ b/src/backup/fixed_index.rs @@ -1,5 +1,6 @@ use anyhow::{bail, format_err, Error}; use std::io::{Seek, SeekFrom}; +use std::cmp::min; use super::chunk_stat::*; use super::chunk_store::*; @@ -11,7 +12,7 @@ use std::fs::File; use std::io::Write; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc,RwLock}; use super::read_chunk::*; use super::ChunkInfo; @@ -146,20 +147,6 @@ impl FixedIndexReader { Ok(()) } - #[inline] - fn chunk_end(&self, pos: usize) -> u64 { - if pos >= self.index_length { - panic!("chunk index out of range"); - } - - let end = ((pos + 1) * self.chunk_size) as u64; - if end > self.size { - self.size - } else { - end - } - } - pub fn print_info(&self) { println!("Size: {}", self.size); println!("ChunkSize: {}", self.chunk_size); @@ -466,27 +453,29 @@ impl FixedIndexWriter { } } +struct ChunkBuffer { + data: Vec, + chunk_idx: usize, +} + pub struct BufferedFixedReader { store: S, index: FixedIndexReader, archive_size: u64, - read_buffer: Vec, - buffered_chunk_idx: usize, - buffered_chunk_start: u64, - read_offset: u64, + buffer: RwLock, } -impl BufferedFixedReader { +impl BufferedFixedReader { pub fn new(index: FixedIndexReader, store: S) -> Self { let archive_size = index.size; Self { store, index, archive_size, - read_buffer: Vec::with_capacity(1024 * 1024), - buffered_chunk_idx: 0, - buffered_chunk_start: 0, - read_offset: 0, + buffer: RwLock::new(ChunkBuffer { + data: Vec::with_capacity(1024 * 1024 * 4), + chunk_idx: 0, + }), } } @@ -494,7 +483,7 @@ impl BufferedFixedReader { self.archive_size } - fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> { + async fn buffer_chunk(&self, idx: usize) -> Result, Error> { let index = 
&self.index; let info = match index.chunk_info(idx) { Some(info) => info, @@ -503,104 +492,44 @@ impl BufferedFixedReader { // fixme: avoid copy - let data = self.store.read_chunk(&info.digest)?; + let data = self.store.read_chunk(&info.digest).await?; let size = info.range.end - info.range.start; if size != data.len() as u64 { bail!("read chunk with wrong size ({} != {}", size, data.len()); } - self.read_buffer.clear(); - self.read_buffer.extend_from_slice(&data); + let mut buffer = self.buffer.write().unwrap(); + buffer.data.clear(); + buffer.data.extend_from_slice(&data); + buffer.chunk_idx = idx; - self.buffered_chunk_idx = idx; - - self.buffered_chunk_start = info.range.start as u64; - Ok(()) + Ok(data) } -} -impl crate::tools::BufferedRead for BufferedFixedReader { - fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> { + pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result { if offset == self.archive_size { - return Ok(&self.read_buffer[0..0]); + return Ok(0); } - let buffer_len = self.read_buffer.len(); - let index = &self.index; + let idx = (offset / self.index.chunk_size as u64) as usize; + + let mut copy_to_buf = move |from: &Vec| -> u64 { + let buffer_offset = (offset % self.index.chunk_size as u64) as usize; + let data_len = min(from.len() - buffer_offset, size); + unsafe { + std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len); + } + data_len as _ + }; - // optimization for sequential read - if buffer_len > 0 - && ((self.buffered_chunk_idx + 1) < index.index_length) - && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) { - let next_idx = self.buffered_chunk_idx + 1; - let next_end = index.chunk_end(next_idx); - if offset < next_end { - self.buffer_chunk(next_idx)?; - let buffer_offset = (offset - self.buffered_chunk_start) as usize; - return Ok(&self.read_buffer[buffer_offset..]); + let buffer = self.buffer.read().unwrap(); + if 
buffer.data.len() != 0 && buffer.chunk_idx == idx { + return Ok(copy_to_buf(&buffer.data)); } } - if (buffer_len == 0) - || (offset < self.buffered_chunk_start) - || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64))) - { - let idx = (offset / index.chunk_size as u64) as usize; - self.buffer_chunk(idx)?; - } - - let buffer_offset = (offset - self.buffered_chunk_start) as usize; - Ok(&self.read_buffer[buffer_offset..]) - } -} - -impl std::io::Read for BufferedFixedReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - use crate::tools::BufferedRead; - use std::io::{Error, ErrorKind}; - - let data = match self.buffered_read(self.read_offset) { - Ok(v) => v, - Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())), - }; - - let n = if data.len() > buf.len() { - buf.len() - } else { - data.len() - }; - - unsafe { - std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n); - } - - self.read_offset += n as u64; - - Ok(n) - } -} - -impl Seek for BufferedFixedReader { - fn seek(&mut self, pos: SeekFrom) -> Result { - let new_offset = match pos { - SeekFrom::Start(start_offset) => start_offset as i64, - SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset, - SeekFrom::Current(offset) => (self.read_offset as i64) + offset, - }; - - use std::io::{Error, ErrorKind}; - if (new_offset < 0) || (new_offset > (self.archive_size as i64)) { - return Err(Error::new( - ErrorKind::Other, - format!( - "seek is out of range {} ([0..{}])", - new_offset, self.archive_size - ), - )); - } - self.read_offset = new_offset as u64; - - Ok(self.read_offset) + let new_data = self.buffer_chunk(idx).await?; + Ok(copy_to_buf(&new_data)) } } -- 2.20.1