From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: Re: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
Date: Tue, 21 Jul 2020 11:37:41 +0200
Message-ID: <d684e6ed-5c64-2a78-70ba-d3fb9dfb4bbf@proxmox.com> (raw)
In-Reply-To: <20200720150220.22996-2-s.reiter@proxmox.com>

On 7/20/20 5:02 PM, Stefan Reiter wrote:
> It's currently only used in proxmox-backup-qemu, so no users in this
> package need to be changed.
> 
> This also allows simplifying the interface to just what is needed in
> proxmox-backup-qemu, i.e. by making it async we need to remove the
> std::io::Read trait, and that allows to get rid of the seeking
> workaround.
> 
> Cache locking is done with a reader-writer lock, though multiple
> requests for one non-cached chunk are still possible (cannot hold a lock
> during async HTTP request) - though in testing, performance did not
> regress.
> 
> The "sequential read" optimization is removed, since it didn't serve any
> purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
> 
> I'm unsure about the approach here, happy to hear feedback from anyone more
> familiar with Rust. I couldn't find a way to combine the "read" trait impl with
> async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
> user I don't think this is an issue, but I don't know if this interface was put
> here deliberately, even if unused elsewhere.

high-level comment:

maybe it would be nicer to implement AsyncSeek[0] for our existing
AsyncIndexReader (which also buffers a single chunk, but works
for fixed + dynamic index). then we could simply keep the
behaviour we have in proxmox-backup-qemu (just add '.await')

for implementing this, we would have to extend the Index trait
to provide a function:
fn get_chunk_from_offset(offset) -> (chunk_idx, offset_in_chunk)
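for a fixed index this mapping is pure arithmetic on the chunk size;
a minimal sketch of what such a method could compute (standalone
function here, names are assumptions, not the actual Index trait):

```rust
// Hypothetical sketch of the proposed trait extension: for a fixed
// index, mapping an archive offset to a chunk is division/modulo.
// `chunk_size` stands in for FixedIndexReader::chunk_size.
fn get_chunk_from_offset(chunk_size: u64, offset: u64) -> (usize, u64) {
    let chunk_idx = (offset / chunk_size) as usize;
    let offset_in_chunk = offset % chunk_size;
    (chunk_idx, offset_in_chunk)
}

fn main() {
    // offset 5000 with 4 KiB chunks lands 904 bytes into chunk 1
    assert_eq!(get_chunk_from_offset(4096, 5000), (1, 904));
}
```

(a dynamic index would instead have to search its list of chunk end
offsets, which is exactly why putting this behind the Index trait
would make one reader work for both)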

since a 'seek' would then be 'instant' (at least there is nothing we
could await), we would just have a 'pseudo-async' interface to implement

it would then 'magically' work also for a dynamic index
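the offset computation itself would be the same one the synchronous
Seek impl below already does; a rough sketch of just that logic (the
AsyncSeek plumbing via start_seek/poll_complete is omitted, and the
function name is made up), which an AsyncSeek impl could wrap in an
immediate Poll::Ready:

```rust
use std::io::SeekFrom;

// Sketch: compute the new read offset for a seek, mirroring the
// synchronous Seek impl removed in this patch. Since nothing needs
// to be awaited, an AsyncSeek impl can resolve this immediately.
fn compute_seek_offset(
    read_offset: u64,
    archive_size: u64,
    pos: SeekFrom,
) -> Result<u64, String> {
    let new_offset = match pos {
        SeekFrom::Start(start_offset) => start_offset as i64,
        SeekFrom::End(end_offset) => archive_size as i64 + end_offset,
        SeekFrom::Current(offset) => read_offset as i64 + offset,
    };
    if new_offset < 0 || new_offset > archive_size as i64 {
        return Err(format!(
            "seek is out of range {} ([0..{}])",
            new_offset, archive_size
        ));
    }
    Ok(new_offset as u64)
}

fn main() {
    assert_eq!(compute_seek_offset(0, 100, SeekFrom::Start(10)), Ok(10));
    assert_eq!(compute_seek_offset(10, 100, SeekFrom::Current(-5)), Ok(5));
    assert!(compute_seek_offset(0, 100, SeekFrom::End(1)).is_err());
}
```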

alternatively, we could also implement 'async_buffered_read' directly
on our AsyncIndexReader as well, and still drop this here entirely

more comments inline

0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html

> 
>   src/backup/fixed_index.rs | 145 ++++++++++----------------------------
>   1 file changed, 37 insertions(+), 108 deletions(-)
> 
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..3f5fde2a 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -1,5 +1,6 @@
>   use anyhow::{bail, format_err, Error};
>   use std::io::{Seek, SeekFrom};
> +use std::cmp::min;

no need to use this see comment further below

>   
>   use super::chunk_stat::*;
>   use super::chunk_store::*;
> @@ -11,7 +12,7 @@ use std::fs::File;
>   use std::io::Write;
>   use std::os::unix::io::AsRawFd;
>   use std::path::{Path, PathBuf};
> -use std::sync::Arc;
> +use std::sync::{Arc,RwLock};
>   
>   use super::read_chunk::*;
>   use super::ChunkInfo;
> @@ -146,20 +147,6 @@ impl FixedIndexReader {
>           Ok(())
>       }
>   
> -    #[inline]
> -    fn chunk_end(&self, pos: usize) -> u64 {
> -        if pos >= self.index_length {
> -            panic!("chunk index out of range");
> -        }
> -
> -        let end = ((pos + 1) * self.chunk_size) as u64;
> -        if end > self.size {
> -            self.size
> -        } else {
> -            end
> -        }
> -    }
> -
>       pub fn print_info(&self) {
>           println!("Size: {}", self.size);
>           println!("ChunkSize: {}", self.chunk_size);
> @@ -466,27 +453,29 @@ impl FixedIndexWriter {
>       }
>   }
>   
> +struct ChunkBuffer {
> +    data: Vec<u8>,
> +    chunk_idx: usize,
> +}
> +
>   pub struct BufferedFixedReader<S> {
>       store: S,
>       index: FixedIndexReader,
>       archive_size: u64,
> -    read_buffer: Vec<u8>,
> -    buffered_chunk_idx: usize,
> -    buffered_chunk_start: u64,
> -    read_offset: u64,
> +    buffer: RwLock<ChunkBuffer>,
>   }
>   
> -impl<S: ReadChunk> BufferedFixedReader<S> {
> +impl<S: AsyncReadChunk> BufferedFixedReader<S> {
>       pub fn new(index: FixedIndexReader, store: S) -> Self {
>           let archive_size = index.size;
>           Self {
>               store,
>               index,
>               archive_size,
> -            read_buffer: Vec::with_capacity(1024 * 1024),
> -            buffered_chunk_idx: 0,
> -            buffered_chunk_start: 0,
> -            read_offset: 0,
> +            buffer: RwLock::new(ChunkBuffer {
> +                data: Vec::with_capacity(1024 * 1024 * 4),
> +                chunk_idx: 0,
> +            }),
>           }
>       }
>   
> @@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>           self.archive_size
>       }
>   
> -    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
> +    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
>           let index = &self.index;
>           let info = match index.chunk_info(idx) {
>               Some(info) => info,
> @@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>   
>           // fixme: avoid copy
>   
> -        let data = self.store.read_chunk(&info.digest)?;
> +        let data = self.store.read_chunk(&info.digest).await?;
>           let size = info.range.end - info.range.start;
>           if size != data.len() as u64 {
>               bail!("read chunk with wrong size ({} != {}", size, data.len());
>           }
>   
> -        self.read_buffer.clear();
> -        self.read_buffer.extend_from_slice(&data);
> +        let mut buffer = self.buffer.write().unwrap();
> +        buffer.data.clear();
> +        buffer.data.extend_from_slice(&data);
> +        buffer.chunk_idx = idx;
>   
> -        self.buffered_chunk_idx = idx;
> -
> -        self.buffered_chunk_start = info.range.start as u64;
> -        Ok(())
> +        Ok(data)
>       }
> -}
>   
> -impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
> -    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
> +    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
>           if offset == self.archive_size {
> -            return Ok(&self.read_buffer[0..0]);
> +            return Ok(0);
>           }
>   
> -        let buffer_len = self.read_buffer.len();
> -        let index = &self.index;
> +        let idx = (offset / self.index.chunk_size as u64) as usize;
> +
> +        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
> +            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
> +            let data_len = min(from.len() - buffer_offset, size);

instead of importing std::cmp::min, you can simply call min on the
number directly:
let data_len = size.min(from.len() - buffer_offset);

> +            unsafe {
> +                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
> +            }

here you can avoid the unsafe code by writing:

buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset + data_len]);

(internally this does exactly the same, but it checks the lengths and
panics in the error case instead of corrupting memory/segfaulting)
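as a self-contained illustration (the free function and its parameters
are made up here to mirror the closure above):

```rust
// Sketch of the copy_to_buf logic using the safe copy_from_slice
// instead of ptr::copy_nonoverlapping: identical memcpy internally,
// but out-of-bounds ranges panic instead of corrupting memory.
fn copy_to_buf(from: &[u8], buffer_offset: usize, buf: &mut [u8], size: usize) -> u64 {
    let data_len = size.min(from.len() - buffer_offset);
    buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset + data_len]);
    data_len as u64
}

fn main() {
    let chunk = vec![1u8, 2, 3, 4, 5, 6, 7, 8];
    let mut buf = [0u8; 4];
    // read 4 bytes starting 2 bytes into the cached chunk
    let n = copy_to_buf(&chunk, 2, &mut buf, 4);
    assert_eq!(n, 4);
    assert_eq!(buf, [3, 4, 5, 6]);
}
```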

> +            data_len as _
> +        };
>   
> -        // optimization for sequential read
> -        if buffer_len > 0
> -            && ((self.buffered_chunk_idx + 1) < index.index_length)
> -            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
>           {
> -            let next_idx = self.buffered_chunk_idx + 1;
> -            let next_end = index.chunk_end(next_idx);
> -            if offset < next_end {
> -                self.buffer_chunk(next_idx)?;
> -                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -                return Ok(&self.read_buffer[buffer_offset..]);
> +            let buffer = self.buffer.read().unwrap();
> +            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
> +                return Ok(copy_to_buf(&buffer.data));
>               }
>           }
>   
> -        if (buffer_len == 0)
> -            || (offset < self.buffered_chunk_start)
> -            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> -        {
> -            let idx = (offset / index.chunk_size as u64) as usize;
> -            self.buffer_chunk(idx)?;
> -        }
> -
> -        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -        Ok(&self.read_buffer[buffer_offset..])
> -    }
> -}
> -
> -impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
> -    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
> -        use crate::tools::BufferedRead;
> -        use std::io::{Error, ErrorKind};
> -
> -        let data = match self.buffered_read(self.read_offset) {
> -            Ok(v) => v,
> -            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
> -        };
> -
> -        let n = if data.len() > buf.len() {
> -            buf.len()
> -        } else {
> -            data.len()
> -        };
> -
> -        unsafe {
> -            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
> -        }
> -
> -        self.read_offset += n as u64;
> -
> -        Ok(n)
> -    }
> -}
> -
> -impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
> -    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
> -        let new_offset = match pos {
> -            SeekFrom::Start(start_offset) => start_offset as i64,
> -            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
> -            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
> -        };
> -
> -        use std::io::{Error, ErrorKind};
> -        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
> -            return Err(Error::new(
> -                ErrorKind::Other,
> -                format!(
> -                    "seek is out of range {} ([0..{}])",
> -                    new_offset, self.archive_size
> -                ),
> -            ));
> -        }
> -        self.read_offset = new_offset as u64;
> -
> -        Ok(self.read_offset)
> +        let new_data = self.buffer_chunk(idx).await?;
> +        Ok(copy_to_buf(&new_data))
>       }
>   }
> 

Thread overview: 5+ messages
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
2020-07-21  9:37   ` Dominik Csapak [this message]
2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter