* [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings
@ 2020-07-20 15:02 Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
` (2 more replies)
0 siblings, 3 replies; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
To: pbs-devel
When using the PBS blockdriver with qemu-nbd (for example), it can happen that
enough read requests are issued to saturate the tokio thread pool. Not an issue
in general, but as me and Wolfgang painstakenly discovered a while back, it does
break block_on, which is used in BufferedFixedReader. This means that reading
larger amounts of data would hang the QEMU process [0].
Fix this by making the entire BufferedFixedReader API async, thus not requiring
a block_on.
Additionally I discovered a seperate bug (fixed by patch 3), wherein read
requests that we're not aligned to the chunk size would return bogus data. This
too only seems to happen in non-VM connections (e.g. nbd, etc...).
[0] ...and since the NBD kernel driver appears to be horribly broken, this often
also crashes most of the system, but that's a different story. If you ever get
in this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
disconnect the device ('qemu-nbd -d' intelligently issues a read before
disconnecting, thus hanging before getting anything done...)
backup: Stefan Reiter (1):
make BufferedFixedReader async
src/backup/fixed_index.rs | 145 ++++++++++----------------------------
1 file changed, 37 insertions(+), 108 deletions(-)
backup-qemu: Stefan Reiter (1):
use new async BufferedFixedReader API
src/restore.rs | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
qemu: Stefan Reiter (1):
PVE: PBS: iterate read_image_at until all data is available
block/pbs.c | 31 ++++++++++++++++++++++---------
1 file changed, 22 insertions(+), 9 deletions(-)
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
@ 2020-07-20 15:02 ` Stefan Reiter
2020-07-21 9:37 ` Dominik Csapak
2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter
2 siblings, 1 reply; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
To: pbs-devel
It's currently only used in proxmox-backup-qemu, so no users in this
package need to be changed.
This also allows simplifying the interface to just what is needed in
proxmox-backup-qemu, i.e. by making it async we need to remove the
std::io::Read trait, and that allows to get rid of the seeking
workaround.
Cache locking is done with a reader-writer lock, though multiple
requests for one non-cached chunk are still possible (cannot hold a lock
during async HTTP request) - though in testing, performance did not
regress.
The "sequential read" optimization is removed, since it didn't serve any
purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
I'm unsure about the approach here, happy to hear feedback from anyone more
familiar with Rust. I couldn't find a way to combine the "read" trait impl with
async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
user I don't think this is an issue, but I don't know if this interface was put
here deliberately, even if unused elsewhere.
src/backup/fixed_index.rs | 145 ++++++++++----------------------------
1 file changed, 37 insertions(+), 108 deletions(-)
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..3f5fde2a 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -1,5 +1,6 @@
use anyhow::{bail, format_err, Error};
use std::io::{Seek, SeekFrom};
+use std::cmp::min;
use super::chunk_stat::*;
use super::chunk_store::*;
@@ -11,7 +12,7 @@ use std::fs::File;
use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::sync::{Arc,RwLock};
use super::read_chunk::*;
use super::ChunkInfo;
@@ -146,20 +147,6 @@ impl FixedIndexReader {
Ok(())
}
- #[inline]
- fn chunk_end(&self, pos: usize) -> u64 {
- if pos >= self.index_length {
- panic!("chunk index out of range");
- }
-
- let end = ((pos + 1) * self.chunk_size) as u64;
- if end > self.size {
- self.size
- } else {
- end
- }
- }
-
pub fn print_info(&self) {
println!("Size: {}", self.size);
println!("ChunkSize: {}", self.chunk_size);
@@ -466,27 +453,29 @@ impl FixedIndexWriter {
}
}
+struct ChunkBuffer {
+ data: Vec<u8>,
+ chunk_idx: usize,
+}
+
pub struct BufferedFixedReader<S> {
store: S,
index: FixedIndexReader,
archive_size: u64,
- read_buffer: Vec<u8>,
- buffered_chunk_idx: usize,
- buffered_chunk_start: u64,
- read_offset: u64,
+ buffer: RwLock<ChunkBuffer>,
}
-impl<S: ReadChunk> BufferedFixedReader<S> {
+impl<S: AsyncReadChunk> BufferedFixedReader<S> {
pub fn new(index: FixedIndexReader, store: S) -> Self {
let archive_size = index.size;
Self {
store,
index,
archive_size,
- read_buffer: Vec::with_capacity(1024 * 1024),
- buffered_chunk_idx: 0,
- buffered_chunk_start: 0,
- read_offset: 0,
+ buffer: RwLock::new(ChunkBuffer {
+ data: Vec::with_capacity(1024 * 1024 * 4),
+ chunk_idx: 0,
+ }),
}
}
@@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
self.archive_size
}
- fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
+ async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
let index = &self.index;
let info = match index.chunk_info(idx) {
Some(info) => info,
@@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
// fixme: avoid copy
- let data = self.store.read_chunk(&info.digest)?;
+ let data = self.store.read_chunk(&info.digest).await?;
let size = info.range.end - info.range.start;
if size != data.len() as u64 {
bail!("read chunk with wrong size ({} != {}", size, data.len());
}
- self.read_buffer.clear();
- self.read_buffer.extend_from_slice(&data);
+ let mut buffer = self.buffer.write().unwrap();
+ buffer.data.clear();
+ buffer.data.extend_from_slice(&data);
+ buffer.chunk_idx = idx;
- self.buffered_chunk_idx = idx;
-
- self.buffered_chunk_start = info.range.start as u64;
- Ok(())
+ Ok(data)
}
-}
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
- fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
+ pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
if offset == self.archive_size {
- return Ok(&self.read_buffer[0..0]);
+ return Ok(0);
}
- let buffer_len = self.read_buffer.len();
- let index = &self.index;
+ let idx = (offset / self.index.chunk_size as u64) as usize;
+
+ let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
+ let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
+ let data_len = min(from.len() - buffer_offset, size);
+ unsafe {
+ std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
+ }
+ data_len as _
+ };
- // optimization for sequential read
- if buffer_len > 0
- && ((self.buffered_chunk_idx + 1) < index.index_length)
- && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
{
- let next_idx = self.buffered_chunk_idx + 1;
- let next_end = index.chunk_end(next_idx);
- if offset < next_end {
- self.buffer_chunk(next_idx)?;
- let buffer_offset = (offset - self.buffered_chunk_start) as usize;
- return Ok(&self.read_buffer[buffer_offset..]);
+ let buffer = self.buffer.read().unwrap();
+ if buffer.data.len() != 0 && buffer.chunk_idx == idx {
+ return Ok(copy_to_buf(&buffer.data));
}
}
- if (buffer_len == 0)
- || (offset < self.buffered_chunk_start)
- || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
- {
- let idx = (offset / index.chunk_size as u64) as usize;
- self.buffer_chunk(idx)?;
- }
-
- let buffer_offset = (offset - self.buffered_chunk_start) as usize;
- Ok(&self.read_buffer[buffer_offset..])
- }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
- use crate::tools::BufferedRead;
- use std::io::{Error, ErrorKind};
-
- let data = match self.buffered_read(self.read_offset) {
- Ok(v) => v,
- Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
- };
-
- let n = if data.len() > buf.len() {
- buf.len()
- } else {
- data.len()
- };
-
- unsafe {
- std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
- }
-
- self.read_offset += n as u64;
-
- Ok(n)
- }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
- fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
- let new_offset = match pos {
- SeekFrom::Start(start_offset) => start_offset as i64,
- SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
- SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
- };
-
- use std::io::{Error, ErrorKind};
- if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
- return Err(Error::new(
- ErrorKind::Other,
- format!(
- "seek is out of range {} ([0..{}])",
- new_offset, self.archive_size
- ),
- ));
- }
- self.read_offset = new_offset as u64;
-
- Ok(self.read_offset)
+ let new_data = self.buffer_chunk(idx).await?;
+ Ok(copy_to_buf(&new_data))
}
}
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
@ 2020-07-20 15:02 ` Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter
2 siblings, 0 replies; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
Requires dependency bump on proxmox-backup, and while at it maybe the proxmox
crate to 0.2 as well? (no issues in my testing, I've been building with 0.2.0
for a while now, since it's the one in the repos).
src/restore.rs | 16 ++++++----------
1 file changed, 6 insertions(+), 10 deletions(-)
diff --git a/src/restore.rs b/src/restore.rs
index 3e37066..9995743 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,13 +1,12 @@
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
use std::convert::TryInto;
use anyhow::{format_err, bail, Error};
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
-use proxmox_backup::tools::runtime::{get_runtime_with_builder, block_in_place};
+use proxmox_backup::tools::runtime::get_runtime_with_builder;
use proxmox_backup::backup::*;
use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
@@ -16,7 +15,7 @@ use crate::registry::Registry;
use crate::capi_types::DataPointer;
struct ImageAccessInfo {
- reader: Arc<Mutex<BufferedFixedReader<RemoteChunkReader>>>,
+ reader: Arc<BufferedFixedReader<RemoteChunkReader>>,
_archive_name: String,
archive_size: u64,
}
@@ -236,7 +235,7 @@ impl RestoreTask {
let info = ImageAccessInfo {
archive_size,
_archive_name: archive_name, /// useful to debug
- reader: Arc::new(Mutex::new(reader)),
+ reader: Arc::new(reader),
};
(*self.image_registry.lock().unwrap()).register(info)
@@ -260,12 +259,9 @@ impl RestoreTask {
bail!("read index {} out of bounds {}", offset, image_size);
}
- let bytes = block_in_place(|| {
- let mut reader = reader.lock().unwrap();
- reader.seek(SeekFrom::Start(offset))?;
- let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
- reader.read(buf)
- })?;
+ let size = size as usize;
+ let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size)};
+ let bytes = reader.async_buffered_read(buf, size, offset).await?;
Ok(bytes.try_into()?)
}
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
@ 2020-07-20 15:02 ` Stefan Reiter
2 siblings, 0 replies; 5+ messages in thread
From: Stefan Reiter @ 2020-07-20 15:02 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
Sent as seperate patch for review, but can also be squashed into the PBS bdrv
patch easily.
block/pbs.c | 31 ++++++++++++++++++++++---------
1 file changed, 22 insertions(+), 9 deletions(-)
diff --git a/block/pbs.c b/block/pbs.c
index 1481a2bfd1..0083406ad9 100644
--- a/block/pbs.c
+++ b/block/pbs.c
@@ -198,7 +198,7 @@ static coroutine_fn int pbs_co_preadv(BlockDriverState *bs,
QEMUIOVector *qiov, int flags)
{
BDRVPBSState *s = bs->opaque;
- int ret;
+ int ret, cur = 0;
char *pbs_error = NULL;
uint8_t *buf = malloc(bytes);
@@ -207,18 +207,31 @@ static coroutine_fn int pbs_co_preadv(BlockDriverState *bs,
.ctx = qemu_get_current_aio_context(),
};
- proxmox_restore_read_image_at_async(s->conn, s->aid, buf, offset, bytes,
- read_callback, (void *) &rcb, &ret, &pbs_error);
+ /* we need to retry until we have either read all requested data or hit the
+ * end of the file - QEMU does not re-call this function in case the
+ * returned value is not equal to 'bytes', it just assumes there is no data
+ * to be read, which breaks unaligned reads */
+ while (cur < bytes) {
+ proxmox_restore_read_image_at_async(s->conn, s->aid, buf + cur, offset + cur, bytes - cur,
+ read_callback, (void *) &rcb, &ret, &pbs_error);
+ qemu_coroutine_yield();
- qemu_coroutine_yield();
+ if (ret < 0) {
+ fprintf(stderr, "error during PBS read: %s\n", pbs_error ? pbs_error : "unknown error");
+ if (pbs_error) proxmox_backup_free_error(pbs_error);
+ free(buf);
+ return -EIO;
+ }
- if (ret < 0) {
- fprintf(stderr, "error during PBS read: %s\n", pbs_error ? pbs_error : "unknown error");
- if (pbs_error) proxmox_backup_free_error(pbs_error);
- return -EIO;
+ // EOF
+ if (ret == 0) {
+ break;
+ }
+
+ cur += ret;
}
- qemu_iovec_from_buf(qiov, 0, buf, bytes);
+ qemu_iovec_from_buf(qiov, 0, buf, cur);
free(buf);
return ret;
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* Re: [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
@ 2020-07-21 9:37 ` Dominik Csapak
0 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2020-07-21 9:37 UTC (permalink / raw)
To: pbs-devel
On 7/20/20 5:02 PM, Stefan Reiter wrote:
> It's currently only used in proxmox-backup-qemu, so no users in this
> package need to be changed.
>
> This also allows simplifying the interface to just what is needed in
> proxmox-backup-qemu, i.e. by making it async we need to remove the
> std::io::Read trait, and that allows to get rid of the seeking
> workaround.
>
> Cache locking is done with a reader-writer lock, though multiple
> requests for one non-cached chunk are still possible (cannot hold a lock
> during async HTTP request) - though in testing, performance did not
> regress.
>
> The "sequential read" optimization is removed, since it didn't serve any
> purpose AFAICT. chunk_end was not used anywhere else (gave a warning).
>
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>
> I'm unsure about the approach here, happy to hear feedback from anyone more
> familiar with Rust. I couldn't find a way to combine the "read" trait impl with
> async, so I removed it entirely. Since proxmox-backup-qemu seems to be the only
> user I don't think this is an issue, but I don't know if this interface was put
> here deliberately, even if unused elsewhere.
high level comment:
maybe it would be nicer to implement AsyncSeek[0] for our existing
AsyncIndexReader (wich also buffers a single chunk, but works
for fixed + dynamic index). then we could simply keep the
behaviour we have in proxmox-backup-qemu (just add '.await')
for implementing this, we would have to extend the Index trait
to provide a function fn get_chunk_from_offset(offset) -> (chunk_idx,
offset_in_chunk)
since a 'seek' would then be 'instant' (at least there is nothing we
could await),
we would just have a 'psuedo-async' interface to implement
it would then 'magically' work also for a dynamic index
alternatively, we could also implement the 'async_buffered_read' simply
for our AsyncIndexReader also and still drop this here entirely
more comments inline
0: https://docs.rs/tokio/0.2.9/tokio/io/trait.AsyncSeek.html
>
> src/backup/fixed_index.rs | 145 ++++++++++----------------------------
> 1 file changed, 37 insertions(+), 108 deletions(-)
>
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..3f5fde2a 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -1,5 +1,6 @@
> use anyhow::{bail, format_err, Error};
> use std::io::{Seek, SeekFrom};
> +use std::cmp::min;
no need to use this see comment further below
>
> use super::chunk_stat::*;
> use super::chunk_store::*;
> @@ -11,7 +12,7 @@ use std::fs::File;
> use std::io::Write;
> use std::os::unix::io::AsRawFd;
> use std::path::{Path, PathBuf};
> -use std::sync::Arc;
> +use std::sync::{Arc,RwLock};
>
> use super::read_chunk::*;
> use super::ChunkInfo;
> @@ -146,20 +147,6 @@ impl FixedIndexReader {
> Ok(())
> }
>
> - #[inline]
> - fn chunk_end(&self, pos: usize) -> u64 {
> - if pos >= self.index_length {
> - panic!("chunk index out of range");
> - }
> -
> - let end = ((pos + 1) * self.chunk_size) as u64;
> - if end > self.size {
> - self.size
> - } else {
> - end
> - }
> - }
> -
> pub fn print_info(&self) {
> println!("Size: {}", self.size);
> println!("ChunkSize: {}", self.chunk_size);
> @@ -466,27 +453,29 @@ impl FixedIndexWriter {
> }
> }
>
> +struct ChunkBuffer {
> + data: Vec<u8>,
> + chunk_idx: usize,
> +}
> +
> pub struct BufferedFixedReader<S> {
> store: S,
> index: FixedIndexReader,
> archive_size: u64,
> - read_buffer: Vec<u8>,
> - buffered_chunk_idx: usize,
> - buffered_chunk_start: u64,
> - read_offset: u64,
> + buffer: RwLock<ChunkBuffer>,
> }
>
> -impl<S: ReadChunk> BufferedFixedReader<S> {
> +impl<S: AsyncReadChunk> BufferedFixedReader<S> {
> pub fn new(index: FixedIndexReader, store: S) -> Self {
> let archive_size = index.size;
> Self {
> store,
> index,
> archive_size,
> - read_buffer: Vec::with_capacity(1024 * 1024),
> - buffered_chunk_idx: 0,
> - buffered_chunk_start: 0,
> - read_offset: 0,
> + buffer: RwLock::new(ChunkBuffer {
> + data: Vec::with_capacity(1024 * 1024 * 4),
> + chunk_idx: 0,
> + }),
> }
> }
>
> @@ -494,7 +483,7 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
> self.archive_size
> }
>
> - fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
> + async fn buffer_chunk(&self, idx: usize) -> Result<Vec<u8>, Error> {
> let index = &self.index;
> let info = match index.chunk_info(idx) {
> Some(info) => info,
> @@ -503,104 +492,44 @@ impl<S: ReadChunk> BufferedFixedReader<S> {
>
> // fixme: avoid copy
>
> - let data = self.store.read_chunk(&info.digest)?;
> + let data = self.store.read_chunk(&info.digest).await?;
> let size = info.range.end - info.range.start;
> if size != data.len() as u64 {
> bail!("read chunk with wrong size ({} != {}", size, data.len());
> }
>
> - self.read_buffer.clear();
> - self.read_buffer.extend_from_slice(&data);
> + let mut buffer = self.buffer.write().unwrap();
> + buffer.data.clear();
> + buffer.data.extend_from_slice(&data);
> + buffer.chunk_idx = idx;
>
> - self.buffered_chunk_idx = idx;
> -
> - self.buffered_chunk_start = info.range.start as u64;
> - Ok(())
> + Ok(data)
> }
> -}
>
> -impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
> - fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
> + pub async fn async_buffered_read(&self, buf: &mut [u8], size: usize, offset: u64) -> Result<u64, Error> {
> if offset == self.archive_size {
> - return Ok(&self.read_buffer[0..0]);
> + return Ok(0);
> }
>
> - let buffer_len = self.read_buffer.len();
> - let index = &self.index;
> + let idx = (offset / self.index.chunk_size as u64) as usize;
> +
> + let mut copy_to_buf = move |from: &Vec<u8>| -> u64 {
> + let buffer_offset = (offset % self.index.chunk_size as u64) as usize;
> + let data_len = min(from.len() - buffer_offset, size);
instead of using std::cmp::min, you can simply use min on a number:
let data_len = size.min(from.len() - buffer_offset);
> + unsafe {
> + std::ptr::copy_nonoverlapping(from.as_ptr().add(buffer_offset), buf.as_mut_ptr(), data_len);
> + }
here you can avoid unsafe code by writing:
buf[0..data_len].copy_from_slice(from[buffer_offset..buffer_offset+data_len]);
(does internally exactly the same, but checks the lengths and panics in
error case instead of corrupting memory/segfaulting)
> + data_len as _
> + };
>
> - // optimization for sequential read
> - if buffer_len > 0
> - && ((self.buffered_chunk_idx + 1) < index.index_length)
> - && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> {
> - let next_idx = self.buffered_chunk_idx + 1;
> - let next_end = index.chunk_end(next_idx);
> - if offset < next_end {
> - self.buffer_chunk(next_idx)?;
> - let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> - return Ok(&self.read_buffer[buffer_offset..]);
> + let buffer = self.buffer.read().unwrap();
> + if buffer.data.len() != 0 && buffer.chunk_idx == idx {
> + return Ok(copy_to_buf(&buffer.data));
> }
> }
>
> - if (buffer_len == 0)
> - || (offset < self.buffered_chunk_start)
> - || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
> - {
> - let idx = (offset / index.chunk_size as u64) as usize;
> - self.buffer_chunk(idx)?;
> - }
> -
> - let buffer_offset = (offset - self.buffered_chunk_start) as usize;
> - Ok(&self.read_buffer[buffer_offset..])
> - }
> -}
> -
> -impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
> - fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
> - use crate::tools::BufferedRead;
> - use std::io::{Error, ErrorKind};
> -
> - let data = match self.buffered_read(self.read_offset) {
> - Ok(v) => v,
> - Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
> - };
> -
> - let n = if data.len() > buf.len() {
> - buf.len()
> - } else {
> - data.len()
> - };
> -
> - unsafe {
> - std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
> - }
> -
> - self.read_offset += n as u64;
> -
> - Ok(n)
> - }
> -}
> -
> -impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
> - fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
> - let new_offset = match pos {
> - SeekFrom::Start(start_offset) => start_offset as i64,
> - SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
> - SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
> - };
> -
> - use std::io::{Error, ErrorKind};
> - if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
> - return Err(Error::new(
> - ErrorKind::Other,
> - format!(
> - "seek is out of range {} ([0..{}])",
> - new_offset, self.archive_size
> - ),
> - ));
> - }
> - self.read_offset = new_offset as u64;
> -
> - Ok(self.read_offset)
> + let new_data = self.buffer_chunk(idx).await?;
> + Ok(copy_to_buf(&new_data))
> }
> }
>
^ permalink raw reply [flat|nested] 5+ messages in thread
end of thread, other threads:[~2020-07-21 9:38 UTC | newest]
Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-07-20 15:02 [pbs-devel] [PATCH 0/3] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH backup 1/3] make BufferedFixedReader async Stefan Reiter
2020-07-21 9:37 ` Dominik Csapak
2020-07-20 15:02 ` [pbs-devel] [PATCH backup-qemu 2/3] use new async BufferedFixedReader API Stefan Reiter
2020-07-20 15:02 ` [pbs-devel] [PATCH qemu 3/3] PVE: PBS: iterate read_image_at until all data is available Stefan Reiter
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.