public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings
@ 2020-07-20 15:02 Stefan Reiter
  2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
                   ` (2 more replies)
  0 siblings, 3 replies; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
  To: pbs-devel

When using the PBS blockdriver with qemu-nbd (for example), it can happen that
enough read requests are issued to saturate the tokio thread pool. Not an issue
in general, but as Wolfgang and I painstakingly discovered a while back, it does
break block_on, which is used in BufferedFixedReader. This means that reading
larger amounts of data would hang the QEMU process [0].

Fix this by making the entire BufferedFixedReader API async, thus not requiring
a block_on.

Additionally I discovered a separate bug (fixed by patch 3), wherein read
requests that were not aligned to the chunk size would return bogus data. This
too only seems to happen in non-VM connections (e.g. nbd, etc.).
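
To illustrate the alignment issue, here is a minimal sketch (a hypothetical
helper, not code from this series) of how a byte offset maps to a fixed-index
chunk plus an offset within it; an unaligned read must skip into the buffered
chunk by that inner offset, otherwise data from the chunk start is returned:

```rust
// Hypothetical helper: map an absolute byte offset to (chunk index,
// offset within that chunk) for a fixed chunk size.
fn chunk_for_offset(chunk_size: u64, offset: u64) -> (usize, u64) {
    ((offset / chunk_size) as usize, offset % chunk_size)
}

fn main() {
    let chunk_size = 4 * 1024 * 1024; // 4 MiB fixed chunks
    // aligned read: starts exactly at the beginning of chunk 2
    assert_eq!(chunk_for_offset(chunk_size, 2 * chunk_size), (2, 0));
    // unaligned read: same chunk, but 512 bytes into it
    assert_eq!(chunk_for_offset(chunk_size, 2 * chunk_size + 512), (2, 512));
    println!("ok");
}
```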


[0] ...and since the NBD kernel driver appears to be horribly broken, this often
also crashes most of the system, but that's a different story. If you ever get
into this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
disconnect the device ('qemu-nbd -d' intelligently issues a read before
disconnecting, thus hanging before getting anything done...)


backup: Stefan Reiter (1):
  make BufferedFixedReader async

 src/backup/fixed_index.rs | 145 ++++++++++----------------------------
 1 file changed, 37 insertions(+), 108 deletions(-)

backup-qemu: Stefan Reiter (1):
  use new async BufferedFixedReader API

 src/restore.rs | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

qemu: Stefan Reiter (1):
  PVE: PBS: iterate read_image_at until all data is available

 block/pbs.c | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

-- 
2.20.1




^ permalink raw reply	[flat|nested] 5+ messages in thread

* [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
  2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
@ 2020-07-20 15:02 ` Stefan Reiter
  2020-07-21  9:37   ` Dominik Csapak
  2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
  2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter
  2 siblings, 1 reply; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
  To: pbs-devel

It's currently only used in proxmox-backup-qemu, so no users in this
package need to be changed.

This also allows simplifying the interface to just what is needed in
proxmox-backup-qemu: making it async means removing the std::io::Read
trait impl, which in turn allows getting rid of the seeking workaround.

Cache locking is done with a reader-writer lock, though multiple
requests for one non-cached chunk are still possible (we cannot hold the
lock during the async HTTP request); in testing, performance did not
regress.

The "sequential read" optimization is removed, since it didn't serve any
purpose AFAICT. chunk_end was not used anywhere else (gave a warning).

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

I'm unsure about the approach here, happy to hear feedback from anyone more
familiar with Rust. I couldn't find a way to combine the "read" trait impl with
async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
user I don't think this is an issue, but I don't know if this interface was put
here deliberately, even if unused elsewhere.
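
As a rough sketch of the locking scheme from the commit message (simplified,
with a synchronous closure standing in for the async chunk fetch): take the
read lock for the cached fast path, drop it before fetching, and take the
write lock only to install the new chunk. Two tasks may still fetch the same
chunk concurrently, but the cache stays consistent:

```rust
use std::sync::RwLock;

struct ChunkBuffer {
    chunk_idx: usize,
    data: Vec<u8>,
}

// 'fetch' stands in for the async read_chunk() call; neither lock is
// held while it runs, so a racing task may fetch the same chunk twice -
// harmless, just redundant work.
fn cached_read(cache: &RwLock<ChunkBuffer>, idx: usize, fetch: impl Fn(usize) -> Vec<u8>) -> Vec<u8> {
    {
        let buf = cache.read().unwrap();
        if !buf.data.is_empty() && buf.chunk_idx == idx {
            return buf.data.clone(); // fast path: chunk already buffered
        }
    } // read lock dropped here, before the (potentially slow) fetch
    let data = fetch(idx);
    let mut buf = cache.write().unwrap();
    buf.chunk_idx = idx;
    buf.data = data.clone();
    data
}

fn main() {
    let cache = RwLock::new(ChunkBuffer { chunk_idx: 0, data: Vec::new() });
    assert_eq!(cached_read(&cache, 1, |i| vec![i as u8; 3]), vec![1, 1, 1]);
    // second read of the same chunk hits the cache (fetch never called)
    assert_eq!(cached_read(&cache, 1, |_| unreachable!()), vec![1, 1, 1]);
    println!("ok");
}
```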

 src/backup/fixed_index.rs | 145 ++++++++++----------------------------
 1 file changed, 37 insertions(+), 108 deletions(-)

diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..3f5fde2a 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -1,5 +1,6 @@
 use anyhow::{bail, format_err, Error};
 use std::io::{Seek, SeekFrom};
+use std::cmp::min;
 
 use super::chunk_stat::*;
 use super::chunk_store::*;
@@ -11,7 +12,7 @@ use std::fs::File;
 use std::io::Write;
 use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc,RwLock};
 
 use super::read_chunk::*;
 use super::ChunkInfo;
@@ -146,20 +147,6 @@ impl FixedIndexReader {
         Ok(())
     }
 
-    #[inline]
-    fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_length {
-            panic!("chunk index out of range");
-        }
-
-        let end = ((pos + 1) * self.chunk_size) as u64;
-        if end > self.size {
-            self.size
-        } else {
-            end
-        }
-    }
-
     pub fn print_info(&self) {
         println!("Size: {}", self.size);
         println!("ChunkSize: {}", self.chunk_size);
@@ -466,27 +453,29 @@ impl FixedIndexWriter {
     }
 }
 
+struct ChunkBuffer {
+    data: Vec<u8>,
+    chunk_idx: usize,
+}
+
 pub struct BufferedFixedReader<S> {
     store: S,
     index: FixedIndexReader,
     archive_size: u64,
-    read_buffer: Vec<u8>,
-    buffered_chunk_idx: usize,
-    buffered_chunk_start: u64,
-    read_offset: u64,
+    buffer: RwLock<ChunkBuffer>,
 }
 
-impl<S: ReadChunk> BufferedFixedReader<S> {
+impl<S: AsyncReadChunk> BufferedFixedReader<S> {
     pub fn new(index: FixedIndexReader, store: S) -> Self {
         let archive_size = index.size;
         Self {
             store,
             index,
             archive_size,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            buffered_chunk_idx: 0,
-            buffered_chunk_start: 0,
-            read_offset: 0,
+            buffer: RwLock::new(ChunkBuffer {
+                data: Vec::with_capacity(1024 * 1024 * 4),
+                chunk_idx: 0,
+            }),
         }
     }
 
@@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
         self.archive_size
     }
 
-    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
         let index = &self.index;
         let info = match index.chunk_info(idx) {
             Some(info) => info,
@@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
 
         // fixme: avoid copy
 
-        let data = self.store.read_chunk(&info.digest)?;
+        let data = self.store.read_chunk(&info.digest).await?;
         let size = info.range.end - info.range.start;
         if size != data.len() as u64 {
             bail!("read chunk with wrong size ({} != {}", size, data.len());
         }
 
-        self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
+        let mut buffer = self.buffer.write().unwrap();
+        buffer.data.clear();
+        buffer.data.extend_from_slice(&data);
+        buffer.chunk_idx = idx;
 
-        self.buffered_chunk_idx = idx;
-
-        self.buffered_chunk_start = info.range.start as u64;
-        Ok(())
+        Ok(data)
     }
-}
 
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
-    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
+    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
         if offset == self.archive_size {
-            return Ok(&self.read_buffer[0..0]);
+            return Ok(0);
         }
 
-        let buffer_len = self.read_buffer.len();
-        let index = &self.index;
+        let idx = (offset / self.index.chunk_size as u64) as usize;
+
+        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
+            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
+            let data_len = min(from.len() - buffer_offset, size);
+            unsafe {
+                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
+            }
+            data_len as _
+        };
 
-        // optimization for sequential read
-        if buffer_len > 0
-            && ((self.buffered_chunk_idx + 1) < index.index_length)
-            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
         {
-            let next_idx = self.buffered_chunk_idx + 1;
-            let next_end = index.chunk_end(next_idx);
-            if offset < next_end {
-                self.buffer_chunk(next_idx)?;
-                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-                return Ok(&self.read_buffer[buffer_offset..]);
+            let buffer = self.buffer.read().unwrap();
+            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
+                return Ok(copy_to_buf(&buffer.data));
             }
         }
 
-        if (buffer_len == 0)
-            || (offset < self.buffered_chunk_start)
-            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let idx = (offset / index.chunk_size as u64) as usize;
-            self.buffer_chunk(idx)?;
-        }
-
-        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-        Ok(&self.read_buffer[buffer_offset..])
-    }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        use crate::tools::BufferedRead;
-        use std::io::{Error, ErrorKind};
-
-        let data = match self.buffered_read(self.read_offset) {
-            Ok(v) => v,
-            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
-        };
-
-        let n = if data.len() > buf.len() {
-            buf.len()
-        } else {
-            data.len()
-        };
-
-        unsafe {
-            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
-        }
-
-        self.read_offset += n as u64;
-
-        Ok(n)
-    }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-        let new_offset = match pos {
-            SeekFrom::Start(start_offset) => start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
-            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
-        };
-
-        use std::io::{Error, ErrorKind};
-        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!(
-                    "seek is out of range {} ([0..{}])",
-                    new_offset, self.archive_size
-                ),
-            ));
-        }
-        self.read_offset = new_offset as u64;
-
-        Ok(self.read_offset)
+        let new_data = self.buffer_chunk(idx).await?;
+        Ok(copy_to_buf(&new_data))
     }
 }
-- 
2.20.1






* [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API
  2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
  2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
@ 2020-07-20 15:02 ` Stefan Reiter
  2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter
  2 siblings, 0 replies; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

Requires a dependency bump on proxmox-backup, and while at it maybe bump the
proxmox crate to 0.2 as well? (no issues in my testing, I've been building with
0.2.0 for a while now, since it's the one in the repos).

 src/restore.rs | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 3e37066..9995743 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,13 +1,12 @@
 use std::sync::{Arc, Mutex};
 use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
 use std::convert::TryInto;
 
 use anyhow::{format_err, bail, Error};
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
 
-use proxmox_backup::tools::runtime::{get_runtime_with_builder, block_in_place};
+use proxmox_backup::tools::runtime::get_runtime_with_builder;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -16,7 +15,7 @@ use crate::registry::Registry;
 use crate::capi_types::DataPointer;
 
 struct ImageAccessInfo {
-    reader: Arc<Mutex<BufferedFixedReader<RemoteChunkReader>>>,
+    reader: Arc<BufferedFixedReader<RemoteChunkReader>>,
     _archive_name: String,
     archive_size: u64,
 }
@@ -236,7 +235,7 @@ impl RestoreTask {
         let info = ImageAccessInfo {
             archive_size,
             _archive_name: archive_name, /// useful to debug
-            reader: Arc::new(Mutex::new(reader)),
+            reader: Arc::new(reader),
         };
 
         (*self.image_registry.lock().unwrap()).register(info)
@@ -260,12 +259,9 @@ impl RestoreTask {
             bail!("read index {} out of bounds {}", offset, image_size);
         }
 
-        let bytes = block_in_place(|| {
-            let mut reader = reader.lock().unwrap();
-            reader.seek(SeekFrom::Start(offset))?;
-            let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
-            reader.read(buf)
-        })?;
+        let size = size as usize;
+        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size)};
+        let bytes = reader.async_buffered_read(buf, size, offset).await?;
 
         Ok(bytes.try_into()?)
     }
-- 
2.20.1






* [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available
  2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
  2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
  2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
@ 2020-07-20 15:02 ` Stefan Reiter
  2 siblings, 0 replies; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

Sent as a separate patch for review, but can also be squashed into the PBS bdrv
patch easily.
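
The retry logic this patch adds can be sketched as follows, with a
hypothetical read_at callback standing in for
proxmox_restore_read_image_at_async (negative return means error, 0 means
EOF, positive is the number of bytes read):

```rust
// Keep re-issuing the read until all requested bytes arrive or the
// reader reports EOF (0) or an error (< 0); the caller will not come
// back for the remainder itself, so short reads must be handled here.
fn read_all(mut read_at: impl FnMut(u64, usize) -> isize, offset: u64, bytes: usize) -> isize {
    let mut cur = 0usize;
    while cur < bytes {
        let ret = read_at(offset + cur as u64, bytes - cur);
        if ret < 0 {
            return ret; // propagate the error
        }
        if ret == 0 {
            break; // EOF: return however much we got
        }
        cur += ret as usize;
    }
    cur as isize
}

fn main() {
    // a reader that returns at most 3 bytes per call still satisfies
    // the full 10-byte request after several iterations
    assert_eq!(read_all(|_, n| n.min(3) as isize, 0, 10), 10);
    // a reader that hits EOF after 4 bytes returns a short count
    let mut left = 4isize;
    assert_eq!(read_all(|_, n| { let r = left.min(n as isize); left -= r; r }, 0, 10), 4);
    println!("ok");
}
```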

 block/pbs.c | 31 ++++++++++++++++++++++---------
 1 file changed, 22 insertions(+), 9 deletions(-)

diff --git a/block/pbs.c b/block/pbs.c
index 1481a2bfd1..0083406ad9 100644
--- a/block/pbs.c
+++ b/block/pbs.c
@@ -198,7 +198,7 @@ static coroutine_fn int pbs_co_preadv(BlockDriverState *bs,
                                       QEMUIOVector *qiov, int flags)
 {
     BDRVPBSState *s = bs->opaque;
-    int ret;
+    int ret, cur = 0;
     char *pbs_error = NULL;
     uint8_t *buf = malloc(bytes);
 
@@ -207,18 +207,31 @@ static coroutine_fn int pbs_co_preadv(BlockDriverState *bs,
         .ctx = qemu_get_current_aio_context(),
     };
 
-    proxmox_restore_read_image_at_async(s->conn, s->aid, buf, offset, bytes,
-                                        read_callback, (void *) &rcb, &ret, &pbs_error);
+    /* we need to retry until we have either read all requested data or hit the
+     * end of the file - QEMU does not re-call this function in case the
+     * returned value is not equal to 'bytes', it just assumes there is no data
+     * to be read, which breaks unaligned reads */
+    while (cur < bytes) {
+        proxmox_restore_read_image_at_async(s->conn, s->aid, buf + cur, offset + cur, bytes - cur,
+                                            read_callback, (void *) &rcb, &ret, &pbs_error);
+        qemu_coroutine_yield();
 
-    qemu_coroutine_yield();
+        if (ret < 0) {
+            fprintf(stderr, "error during PBS read: %s\n", pbs_error ? pbs_error : "unknown error");
+            if (pbs_error) proxmox_backup_free_error(pbs_error);
+            free(buf);
+            return -EIO;
+        }
 
-    if (ret < 0) {
-        fprintf(stderr, "error during PBS read: %s\n", pbs_error ? pbs_error : "unknown error");
-        if (pbs_error) proxmox_backup_free_error(pbs_error);
-        return -EIO;
+        // EOF
+        if (ret == 0) {
+            break;
+        }
+
+        cur += ret;
     }
 
-    qemu_iovec_from_buf(qiov, 0, buf, bytes);
+    qemu_iovec_from_buf(qiov, 0, buf, cur);
     free(buf);
 
     return ret;
-- 
2.20.1






* Re: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
  2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
@ 2020-07-21  9:37   ` Dominik Csapak
  0 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-07-21  9:37 UTC (permalink / raw)
  To: pbs-devel

On 7/20/20 5:02 PM, Stefan Reiter wrote:
> It's currently only used in proxmox-backup-qemu, so no users in this
> package need to be changed.
> 
> This also allows simplifying the interface to just what is needed in
> proxmox-backup-qemu: making it async means removing the std::io::Read
> trait impl, which in turn allows getting rid of the seeking workaround.
> 
> Cache locking is done with a reader-writer lock, though multiple
> requests for one non-cached chunk are still possible (we cannot hold the
> lock during the async HTTP request); in testing, performance did not
> regress.
> 
> The "sequential read" optimization is removed, since it didn't serve any
> purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
> 
> I'm unsure about the approach here, happy to hear feedback from anyone more
> familiar with Rust. I couldn't find a way to combine the "read" trait impl with
> async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
> user I don't think this is an issue, but I don't know if this interface was put
> here deliberately, even if unused elsewhere.

high level comment:

maybe it would be nicer to implement AsyncSeek[0] for our existing
AsyncIndexReader (wich also buffers a single chunk, but works
for fixed + dynamic index). then we could simply keep the
behaviour we have in proxmox-backup-qemu (just add '.await')

for implementing this, we would have to extend the Index trait
to provide a function fn get_chunk_from_offset(offset) -> (chunk_idx, 
offset_in_chunk)

since a 'seek' would then be 'instant' (at least there is nothing we 
could await),
we would just have a 'pseudo-async' interface to implement

it would then 'magically' work also for a dynamic index

alternatively, we could also implement the 'async_buffered_read' simply
for our AsyncIndexReader also and still drop this here entirely

more comments inline

0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html
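
A sketch of the proposed get_chunk_from_offset for the dynamic-index case,
assuming (hypothetically) the index exposes a sorted slice of chunk end
offsets; this lookup is pure arithmetic, which is what would make the seek
'instant':

```rust
// For a dynamic index the chunk boundaries are irregular, so instead of
// dividing by a fixed chunk size we binary-search the sorted chunk end
// offsets. Returns None when the offset is past the end of the archive.
fn get_chunk_from_offset(chunk_ends: &[u64], offset: u64) -> Option<(usize, u64)> {
    if offset >= *chunk_ends.last()? {
        return None;
    }
    // index of the first chunk whose end offset lies beyond 'offset'
    let chunk_idx = chunk_ends.partition_point(|&end| end <= offset);
    let chunk_start = if chunk_idx == 0 { 0 } else { chunk_ends[chunk_idx - 1] };
    Some((chunk_idx, offset - chunk_start))
}

fn main() {
    // three chunks covering [0,10), [10,25), [25,30)
    let ends = [10u64, 25, 30];
    assert_eq!(get_chunk_from_offset(&ends, 0), Some((0, 0)));
    assert_eq!(get_chunk_from_offset(&ends, 12), Some((1, 2)));
    assert_eq!(get_chunk_from_offset(&ends, 30), None);
    println!("ok");
}
```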

> 
>   src/backup/fixed_index.rs | 145 ++++++++++----------------------------
>   1 file changed, 37 insertions(+), 108 deletions(-)
> 
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..3f5fde2a 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -1,5 +1,6 @@
>   use anyhow::{bail, format_err, Error};
>   use std::io::{Seek, SeekFrom};
> +use std::cmp::min;

no need to use this see comment further below

>   
>   use super::chunk_stat::*;
>   use super::chunk_store::*;
> @@ -11,7 +12,7 @@ use std::fs::File;
>   use std::io::Write;
>   use std::os::unix::io::AsRawFd;
>   use std::path::{Path, PathBuf};
> -use std::sync::Arc;
> +use std::sync::{Arc,RwLock};
>   
>   use super::read_chunk::*;
>   use super::ChunkInfo;
> @@ -146,20 +147,6 @@ impl FixedIndexReader {
>           Ok(())
>       }
>   
> -    #[inline]
> -    fn chunk_end(&self, pos: usize) -> u64 {
> -        if pos >= self.index_length {
> -            panic!("chunk index out of range");
> -        }
> -
> -        let end = ((pos + 1) * self.chunk_size) as u64;
> -        if end > self.size {
> -            self.size
> -        } else {
> -            end
> -        }
> -    }
> -
>       pub fn print_info(&self) {
>           println!("Size: {}", self.size);
>           println!("ChunkSize: {}", self.chunk_size);
> @@ -466,27 +453,29 @@ impl FixedIndexWriter {
>       }
>   }
>   
> +struct ChunkBuffer {
> +    data: Vec<u8>,
> +    chunk_idx: usize,
> +}
> +
>   pub struct BufferedFixedReader<S> {
>       store: S,
>       index: FixedIndexReader,
>       archive_size: u64,
> -    read_buffer: Vec<u8>,
> -    buffered_chunk_idx: usize,
> -    buffered_chunk_start: u64,
> -    read_offset: u64,
> +    buffer: RwLock<ChunkBuffer>,
>   }
>   
> -impl<S: ReadChunk> BufferedFixedReader<S> {
> +impl<S: AsyncReadChunk> BufferedFixedReader<S> {
>       pub fn new(index: FixedIndexReader, store: S) -> Self {
>           let archive_size = index.size;
>           Self {
>               store,
>               index,
>               archive_size,
> -            read_buffer: Vec::with_capacity(1024 * 1024),
> -            buffered_chunk_idx: 0,
> -            buffered_chunk_start: 0,
> -            read_offset: 0,
> +            buffer: RwLock::new(ChunkBuffer {
> +                data: Vec::with_capacity(1024 * 1024 * 4),
> +                chunk_idx: 0,
> +            }),
>           }
>       }
>   
> @@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>           self.archive_size
>       }
>   
> -    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
> +    async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
>           let index = &self.index;
>           let info = match index.chunk_info(idx) {
>               Some(info) => info,
> @@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>   
>           // fixme: avoid copy
>   
> -        let data = self.store.read_chunk(&info.digest)?;
> +        let data = self.store.read_chunk(&info.digest).await?;
>           let size = info.range.end - info.range.start;
>           if size != data.len() as u64 {
>               bail!("read chunk with wrong size ({} != {}", size, data.len());
>           }
>   
> -        self.read_buffer.clear();
> -        self.read_buffer.extend_from_slice(&data);
> +        let mut buffer = self.buffer.write().unwrap();
> +        buffer.data.clear();
> +        buffer.data.extend_from_slice(&data);
> +        buffer.chunk_idx = idx;
>   
> -        self.buffered_chunk_idx = idx;
> -
> -        self.buffered_chunk_start = info.range.start as u64;
> -        Ok(())
> +        Ok(data)
>       }
> -}
>   
> -impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
> -    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
> +    pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
>           if offset == self.archive_size {
> -            return Ok(&self.read_buffer[0..0]);
> +            return Ok(0);
>           }
>   
> -        let buffer_len = self.read_buffer.len();
> -        let index = &self.index;
> +        let idx = (offset / self.index.chunk_size as u64) as usize;
> +
> +        let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
> +            let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
> +            let data_len = min(from.len() - buffer_offset, size);

instead of using std::cmp::min, you can simply use min on a number:
let data_len = size.min(from.len() - buffer_offset);

> +            unsafe {
> +                std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
> +            }

here you can avoid unsafe code by writing:

buf[0..data_len].copy_from_slice(&from[buffer_offset..buffer_offset + data_len]);

(does internally exactly the same, but checks the lengths and panics in 
error case instead of corrupting memory/segfaulting)

> +            data_len as _
> +        };
>   
> -        // optimization for sequential read
> -        if buffer_len > 0
> -            && ((self.buffered_chunk_idx + 1) < index.index_length)
> -            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
>           {
> -            let next_idx = self.buffered_chunk_idx + 1;
> -            let next_end = index.chunk_end(next_idx);
> -            if offset < next_end {
> -                self.buffer_chunk(next_idx)?;
> -                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -                return Ok(&self.read_buffer[buffer_offset..]);
> +            let buffer = self.buffer.read().unwrap();
> +            if buffer.data.len() != 0 && buffer.chunk_idx == idx {
> +                return Ok(copy_to_buf(&buffer.data));
>               }
>           }
>   
> -        if (buffer_len == 0)
> -            || (offset < self.buffered_chunk_start)
> -            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> -        {
> -            let idx = (offset / index.chunk_size as u64) as usize;
> -            self.buffer_chunk(idx)?;
> -        }
> -
> -        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> -        Ok(&self.read_buffer[buffer_offset..])
> -    }
> -}
> -
> -impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
> -    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
> -        use crate::tools::BufferedRead;
> -        use std::io::{Error, ErrorKind};
> -
> -        let data = match self.buffered_read(self.read_offset) {
> -            Ok(v) => v,
> -            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
> -        };
> -
> -        let n = if data.len() > buf.len() {
> -            buf.len()
> -        } else {
> -            data.len()
> -        };
> -
> -        unsafe {
> -            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
> -        }
> -
> -        self.read_offset += n as u64;
> -
> -        Ok(n)
> -    }
> -}
> -
> -impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
> -    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
> -        let new_offset = match pos {
> -            SeekFrom::Start(start_offset) => start_offset as i64,
> -            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
> -            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
> -        };
> -
> -        use std::io::{Error, ErrorKind};
> -        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
> -            return Err(Error::new(
> -                ErrorKind::Other,
> -                format!(
> -                    "seek is out of range {} ([0..{}])",
> -                    new_offset, self.archive_size
> -                ),
> -            ));
> -        }
> -        self.read_offset = new_offset as u64;
> -
> -        Ok(self.read_offset)
> +        let new_data = self.buffer_chunk(idx).await?;
> +        Ok(copy_to_buf(&new_data))
>       }
>   }
> 






end of thread, other threads:[~2020-07-21  9:38 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
2020-07-21  9:37   ` Dominik Csapak
2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter
