all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
Date: Mon, 20 Jul 2020 17:02:18 +0200	[thread overview]
Message-ID: <20200720150220.22996-2-s.reiter@proxmox.com> (raw)
In-Reply-To: <20200720150220.22996-1-s.reiter@proxmox.com>

It's currently only used in proxmox-backup-qemu, so no users in this
package need to be changed.

This also allows simplifying the interface to just what is needed in
proxmox-backup-qemu, i.e. by making it async we need to remove the
std::io::Read trait, and that allows us to get rid of the seeking
workaround.

Cache locking is done with a reader-writer lock, though multiple
requests for one non-cached chunk are still possible (cannot hold a lock
during async HTTP request) - though in testing, performance did not
regress.

The "sequential read" optimization is removed, since it didn't serve any
purpose AFAICT. chunk_end was not used anywhere else (gave a warning).

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

I'm unsure about the approach here, happy to hear feedback from anyone more
familiar with Rust. I couldn't find a way to combine the "read" trait impl with
async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
user I don't think this is an issue, but I don't know if this interface was put
here deliberately, even if unused elsewhere.

 src/backup/fixed_index.rs | 145 ++++++++++----------------------------
 1 file changed, 37 insertions(+), 108 deletions(-)

diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..3f5fde2a 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -1,5 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use std::io::{Seek, SeekFrom};
+use std::cmp::min;
 
 use super::chunk_stat::*;
 use super::chunk_store::*;
@@ -11,7 +12,7 @@ use std::fs::File;
 use std::io::Write;
 use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc,RwLock};
 
 use super::read_chunk::*;
 use super::ChunkInfo;
@@ -146,20 +147,6 @@ impl FixedIndexReader {
         Ok(())
     }
 
-    #[inline]
-    fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_length {
-            panic!("chunk index out of range");
-        }
-
-        let end = ((pos + 1) * self.chunk_size) as u64;
-        if end > self.size {
-            self.size
-        } else {
-            end
-        }
-    }
-
     pub fn print_info(&self) {
         println!("Size: {}", self.size);
         println!("ChunkSize: {}", self.chunk_size);
@@ -466,27 +453,29 @@ impl FixedIndexWriter {
     }
 }
 
+struct ChunkBuffer {
+    data: Vec<u8>,
+    chunk_idx: usize,
+}
+
 pub struct BufferedFixedReader<S> {
     store: S,
     index: FixedIndexReader,
     archive_size: u64,
-    read_buffer: Vec<u8>,
-    buffered_chunk_idx: usize,
-    buffered_chunk_start: u64,
-    read_offset: u64,
+    buffer: RwLock<ChunkBuffer>,
 }
 
-impl<S: ReadChunk> BufferedFixedReader<S> {
+impl<S: AsyncReadChunk> BufferedFixedReader<S> {
     pub fn new(index: FixedIndexReader, store: S) -> Self {
         let archive_size = index.size;
         Self {
             store,
             index,
             archive_size,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            buffered_chunk_idx: 0,
-            buffered_chunk_start: 0,
-            read_offset: 0,
+            buffer: RwLock::new(ChunkBuffer {
+                data: Vec::with_capacity(1024 * 1024 * 4),
+                chunk_idx: 0,
+            }),
         }
     }
 
@@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
         self.archive_size
     }
 
-    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
         let index = &self.index;
         let info = match index.chunk_info(idx) {
             Some(info) => info,
@@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
 
         // fixme: avoid copy
 
-        let data = self.store.read_chunk(&info.digest)?;
+        let data = self.store.read_chunk(&info.digest).await?;
         let size = info.range.end - info.range.start;
         if size != data.len() as u64 {
             bail!("read chunk with wrong size ({} != {}", size, data.len());
         }
 
-        self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
+        let mut buffer = self.buffer.write().unwrap();
+        buffer.data.clear();
+        buffer.data.extend_from_slice(&data);
+        buffer.chunk_idx = idx;
 
-        self.buffered_chunk_idx = idx;
-
-        self.buffered_chunk_start = info.range.start as u64;
-        Ok(())
+        Ok(data)
     }
-}
 
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
-    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
+    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
         if offset == self.archive_size {
-            return Ok(&self.read_buffer[0..0]);
+            return Ok(0);
         }
 
-        let buffer_len = self.read_buffer.len();
-        let index = &self.index;
+        let idx = (offset / self.index.chunk_size as u64) as usize;
+
+        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
+            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
+            let data_len = min(from.len() - buffer_offset, size);
+            unsafe {
+                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
+            }
+            data_len as _
+        };
 
-        // optimization for sequential read
-        if buffer_len > 0
-            && ((self.buffered_chunk_idx + 1) < index.index_length)
-            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
         {
-            let next_idx = self.buffered_chunk_idx + 1;
-            let next_end = index.chunk_end(next_idx);
-            if offset < next_end {
-                self.buffer_chunk(next_idx)?;
-                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-                return Ok(&self.read_buffer[buffer_offset..]);
+            let buffer = self.buffer.read().unwrap();
+            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
+                return Ok(copy_to_buf(&buffer.data));
             }
         }
 
-        if (buffer_len == 0)
-            || (offset < self.buffered_chunk_start)
-            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let idx = (offset / index.chunk_size as u64) as usize;
-            self.buffer_chunk(idx)?;
-        }
-
-        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-        Ok(&self.read_buffer[buffer_offset..])
-    }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        use crate::tools::BufferedRead;
-        use std::io::{Error, ErrorKind};
-
-        let data = match self.buffered_read(self.read_offset) {
-            Ok(v) => v,
-            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
-        };
-
-        let n = if data.len() > buf.len() {
-            buf.len()
-        } else {
-            data.len()
-        };
-
-        unsafe {
-            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
-        }
-
-        self.read_offset += n as u64;
-
-        Ok(n)
-    }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-        let new_offset = match pos {
-            SeekFrom::Start(start_offset) => start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
-            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
-        };
-
-        use std::io::{Error, ErrorKind};
-        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!(
-                    "seek is out of range {} ([0..{}])",
-                    new_offset, self.archive_size
-                ),
-            ));
-        }
-        self.read_offset = new_offset as u64;
-
-        Ok(self.read_offset)
+        let new_data = self.buffer_chunk(idx).await?;
+        Ok(copy_to_buf(&new_data))
     }
 }
-- 
2.20.1





  reply	other threads:[~2020-07-20 15:02 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-20 15:02 ` Stefan Reiter [this message]
2020-07-21  9:37   ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Dominik Csapak
2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200720150220.22996-2-s.reiter@proxmox.com \
    --to=s.reiter@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal