* [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
2020-07-22 14:16 ` Thomas Lamprecht
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
` (4 subsequent siblings)
5 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
To: pbs-devel
Necessary for byte-wise seeking through chunks in an index.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup/dynamic_index.rs | 18 ++++++++++++++++++
src/backup/fixed_index.rs | 11 +++++++++++
src/backup/index.rs | 3 +++
3 files changed, 32 insertions(+)
diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
index 4907fe1f..887b7cf3 100644
--- a/src/backup/dynamic_index.rs
+++ b/src/backup/dynamic_index.rs
@@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
digest: self.index[pos].digest.clone(),
})
}
+
+ fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
+ let end_idx = self.index.len() - 1;
+ let end = self.chunk_end(end_idx);
+ let found_idx = self.binary_search(0, 0, end_idx, end, offset);
+ let found_idx = match found_idx {
+ Ok(i) => i,
+ Err(_) => return None
+ };
+
+ let found_start = if found_idx == 0 {
+ 0
+ } else {
+ self.chunk_end(found_idx - 1)
+ };
+
+ Some((found_idx, offset - found_start))
+ }
}
struct CachedChunk {
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..b7e785d6 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
(csum, chunk_end)
}
+
+ fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
+ if offset >= self.size {
+ return None;
+ }
+
+ Some((
+ (offset / self.chunk_size as u64) as usize,
+ offset % self.chunk_size as u64
+ ))
+ }
}
pub struct FixedIndexWriter {
diff --git a/src/backup/index.rs b/src/backup/index.rs
index efdf3b54..2eab8524 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -22,6 +22,9 @@ pub trait IndexFile {
fn index_bytes(&self) -> u64;
fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
+ /// Get the chunk index and the relative offset within it for a byte offset
+ fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
+
/// Compute index checksum and size
fn compute_csum(&self) -> ([u8; 32], u64);
--
2.20.1
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
@ 2020-07-22 14:16 ` Thomas Lamprecht
2020-07-22 14:24 ` Stefan Reiter
0 siblings, 1 reply; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-22 14:16 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Stefan Reiter
On 22.07.20 15:56, Stefan Reiter wrote:
> Necessary for byte-wise seeking through chunks in an index.
>
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
> src/backup/dynamic_index.rs | 18 ++++++++++++++++++
> src/backup/fixed_index.rs | 11 +++++++++++
> src/backup/index.rs | 3 +++
> 3 files changed, 32 insertions(+)
>
> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
> index 4907fe1f..887b7cf3 100644
> --- a/src/backup/dynamic_index.rs
> +++ b/src/backup/dynamic_index.rs
> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
> digest: self.index[pos].digest.clone(),
> })
> }
> +
> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
> + let end_idx = self.index.len() - 1;
> + let end = self.chunk_end(end_idx);
> + let found_idx = self.binary_search(0, 0, end_idx, end, offset);
> + let found_idx = match found_idx {
> + Ok(i) => i,
> + Err(_) => return None
> + };
> +
> + let found_start = if found_idx == 0 {
> + 0
> + } else {
> + self.chunk_end(found_idx - 1)
> + };
> +
> + Some((found_idx, offset - found_start))
> + }
> }
>
> struct CachedChunk {
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..b7e785d6 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>
> (csum, chunk_end)
> }
> +
> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
> + if offset >= self.size {
> + return None;
> + }
> +
> + Some((
> + (offset / self.chunk_size as u64) as usize,
> + offset % self.chunk_size as u64
modulo is really slow, but isn't chunk_size always a 2^x and thus we can
do the same here as we do in other places:
offset & (self.chunk_size - 1)
> + ))
> + }
> }
>
> pub struct FixedIndexWriter {
> diff --git a/src/backup/index.rs b/src/backup/index.rs
> index efdf3b54..2eab8524 100644
> --- a/src/backup/index.rs
> +++ b/src/backup/index.rs
> @@ -22,6 +22,9 @@ pub trait IndexFile {
> fn index_bytes(&self) -> u64;
> fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>
> + /// Get the chunk index and the relative offset within it for a byte offset
> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
> +
> /// Compute index checksum and size
> fn compute_csum(&self) -> ([u8; 32], u64);
>
>
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
2020-07-22 14:16 ` Thomas Lamprecht
@ 2020-07-22 14:24 ` Stefan Reiter
2020-07-22 14:41 ` Thomas Lamprecht
0 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 14:24 UTC (permalink / raw)
To: Thomas Lamprecht, Proxmox Backup Server development discussion
On 7/22/20 4:16 PM, Thomas Lamprecht wrote:
> On 22.07.20 15:56, Stefan Reiter wrote:
>> Necessary for byte-wise seeking through chunks in an index.
>>
>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>> ---
>> src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>> src/backup/fixed_index.rs | 11 +++++++++++
>> src/backup/index.rs | 3 +++
>> 3 files changed, 32 insertions(+)
>>
>> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
>> index 4907fe1f..887b7cf3 100644
>> --- a/src/backup/dynamic_index.rs
>> +++ b/src/backup/dynamic_index.rs
>> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>> digest: self.index[pos].digest.clone(),
>> })
>> }
>> +
>> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>> + let end_idx = self.index.len() - 1;
>> + let end = self.chunk_end(end_idx);
>> + let found_idx = self.binary_search(0, 0, end_idx, end, offset);
>> + let found_idx = match found_idx {
>> + Ok(i) => i,
>> + Err(_) => return None
>> + };
>> +
>> + let found_start = if found_idx == 0 {
>> + 0
>> + } else {
>> + self.chunk_end(found_idx - 1)
>> + };
>> +
>> + Some((found_idx, offset - found_start))
>> + }
>> }
>>
>> struct CachedChunk {
>> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
>> index 73d0dad0..b7e785d6 100644
>> --- a/src/backup/fixed_index.rs
>> +++ b/src/backup/fixed_index.rs
>> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>>
>> (csum, chunk_end)
>> }
>> +
>> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>> + if offset >= self.size {
>> + return None;
>> + }
>> +
>> + Some((
>> + (offset / self.chunk_size as u64) as usize,
>> + offset % self.chunk_size as u64
>
> modulo is really slow, but isn't chunk_size always a 2^x and thus we can
> do the same here as we do in other places:
>
> offset & (self.chunk_size - 1)
>
I found it more readable this way and I don't think it's hot-path enough
to make a real difference in performance.
But I don't mind, could even replace the div as well. Maybe an
assert!(chunk_size.is_power_of_two()) might be good somewhere though.
>> + ))
>> + }
>> }
>>
>> pub struct FixedIndexWriter {
>> diff --git a/src/backup/index.rs b/src/backup/index.rs
>> index efdf3b54..2eab8524 100644
>> --- a/src/backup/index.rs
>> +++ b/src/backup/index.rs
>> @@ -22,6 +22,9 @@ pub trait IndexFile {
>> fn index_bytes(&self) -> u64;
>> fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>>
>> + /// Get the chunk index and the relative offset within it for a byte offset
>> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
>> +
>> /// Compute index checksum and size
>> fn compute_csum(&self) -> ([u8; 32], u64);
>>
>>
>
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
2020-07-22 14:24 ` Stefan Reiter
@ 2020-07-22 14:41 ` Thomas Lamprecht
0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-22 14:41 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Stefan Reiter
On 22.07.20 16:24, Stefan Reiter wrote:
> On 7/22/20 4:16 PM, Thomas Lamprecht wrote:
>> On 22.07.20 15:56, Stefan Reiter wrote:
>>> Necessary for byte-wise seeking through chunks in an index.
>>>
>>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>>> ---
>>> src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>>> src/backup/fixed_index.rs | 11 +++++++++++
>>> src/backup/index.rs | 3 +++
>>> 3 files changed, 32 insertions(+)
>>>
>>> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
>>> index 4907fe1f..887b7cf3 100644
>>> --- a/src/backup/dynamic_index.rs
>>> +++ b/src/backup/dynamic_index.rs
>>> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>>> digest: self.index[pos].digest.clone(),
>>> })
>>> }
>>> +
>>> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>>> + let end_idx = self.index.len() - 1;
>>> + let end = self.chunk_end(end_idx);
>>> + let found_idx = self.binary_search(0, 0, end_idx, end, offset);
>>> + let found_idx = match found_idx {
>>> + Ok(i) => i,
>>> + Err(_) => return None
>>> + };
>>> +
>>> + let found_start = if found_idx == 0 {
>>> + 0
>>> + } else {
>>> + self.chunk_end(found_idx - 1)
>>> + };
>>> +
>>> + Some((found_idx, offset - found_start))
>>> + }
>>> }
>>> struct CachedChunk {
>>> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
>>> index 73d0dad0..b7e785d6 100644
>>> --- a/src/backup/fixed_index.rs
>>> +++ b/src/backup/fixed_index.rs
>>> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>>> (csum, chunk_end)
>>> }
>>> +
>>> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>>> + if offset >= self.size {
>>> + return None;
>>> + }
>>> +
>>> + Some((
>>> + (offset / self.chunk_size as u64) as usize,
>>> + offset % self.chunk_size as u64
>>
>> modulo is really slow, but isn't chunk_size always a 2^x and thus we can
>> do the same here as we do in other places:
>>
>> offset & (self.chunk_size - 1)
>>
>
> I found it more readable this way and I don't think it's hot-path enough to make a real difference in performance.
then why not add a comment instead? This is some orders of magnitude slower,
not just 2 or 3% ...
>
> But I don't mind, could even replace the div as well. Maybe an assert!(chunk_size.is_power_of_two()) might be good somewhere though.
Yeah, somewhere in an initial entry point to such code an assert wouldn't hurt.
>
>>> + ))
>>> + }
>>> }
>>> pub struct FixedIndexWriter {
>>> diff --git a/src/backup/index.rs b/src/backup/index.rs
>>> index efdf3b54..2eab8524 100644
>>> --- a/src/backup/index.rs
>>> +++ b/src/backup/index.rs
>>> @@ -22,6 +22,9 @@ pub trait IndexFile {
>>> fn index_bytes(&self) -> u64;
>>> fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>>> + /// Get the chunk index and the relative offset within it for a byte offset
>>> + fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
>>> +
>>> /// Compute index checksum and size
>>> fn compute_csum(&self) -> ([u8; 32], u64);
>>>
>>
^ permalink raw reply [flat|nested] 10+ messages in thread
* [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
` (3 subsequent siblings)
5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
To: pbs-devel
Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.
Uses chunk_from_offset to get locations within chunks, but tries to
avoid it for sequential read to not reduce performance from before.
AsyncSeek needs to use the temporary seek_to_pos to avoid changing the
position in case an invalid seek is given and it needs to error in
poll_complete.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
src/backup/index.rs | 1 +
2 files changed, 97 insertions(+), 20 deletions(-)
diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
use std::future::Future;
use std::task::{Poll, Context};
use std::pin::Pin;
+use std::io::SeekFrom;
use anyhow::Error;
use futures::future::FutureExt;
use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
use proxmox::sys::error::io_err_other;
use proxmox::io_format_err;
use super::IndexFile;
use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
enum AsyncIndexReaderState<S> {
NoData,
WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
- HaveData(usize),
+ HaveData,
}
pub struct AsyncIndexReader<S, I: IndexFile> {
store: Option<S>,
index: I,
read_buffer: Vec<u8>,
+ current_chunk_offset: u64,
current_chunk_idx: usize,
- current_chunk_digest: [u8; 32],
+ current_chunk_info: Option<ChunkReadInfo>,
+ position: u64,
+ seek_to_pos: i64,
state: AsyncIndexReaderState<S>,
}
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
store: Some(store),
index,
read_buffer: Vec::with_capacity(1024 * 1024),
+ current_chunk_offset: 0,
current_chunk_idx: 0,
- current_chunk_digest: [0u8; 32],
+ current_chunk_info: None,
+ position: 0,
+ seek_to_pos: 0,
state: AsyncIndexReaderState::NoData,
}
}
@@ -58,23 +66,41 @@ where
loop {
match &mut this.state {
AsyncIndexReaderState::NoData => {
- if this.current_chunk_idx >= this.index.index_count() {
+ let (idx, offset) = if this.current_chunk_info.is_some() &&
+ this.position == this.current_chunk_info.as_ref().unwrap().range.end
+ {
+ // optimization for sequential chunk read
+ let next_idx = this.current_chunk_idx + 1;
+ (next_idx, 0)
+ } else {
+ match this.index.chunk_from_offset(this.position) {
+ Some(res) => res,
+ None => return Poll::Ready(Ok(0))
+ }
+ };
+
+ if idx >= this.index.index_count() {
return Poll::Ready(Ok(0));
}
- let digest = this
+ let info = this
.index
- .index_digest(this.current_chunk_idx)
- .ok_or(io_format_err!("could not get digest"))?
- .clone();
+ .chunk_info(idx)
+ .ok_or(io_format_err!("could not get digest"))?;
- if digest == this.current_chunk_digest {
- this.state = AsyncIndexReaderState::HaveData(0);
- continue;
+ this.current_chunk_offset = offset;
+ this.current_chunk_idx = idx;
+ let old_info = this.current_chunk_info.replace(info.clone());
+
+ if let Some(old_info) = old_info {
+ if old_info.digest == info.digest {
+ // hit, chunk is currently in cache
+ this.state = AsyncIndexReaderState::HaveData;
+ continue;
+ }
}
- this.current_chunk_digest = digest;
-
+ // miss, need to download new chunk
let store = match this.store.take() {
Some(store) => store,
None => {
@@ -83,7 +109,7 @@ where
};
let future = async move {
- store.read_chunk(&digest)
+ store.read_chunk(&info.digest)
.await
.map(move |x| (store, x))
};
@@ -95,7 +121,7 @@ where
Ok((store, mut chunk_data)) => {
this.read_buffer.clear();
this.read_buffer.append(&mut chunk_data);
- this.state = AsyncIndexReaderState::HaveData(0);
+ this.state = AsyncIndexReaderState::HaveData;
this.store = Some(store);
}
Err(err) => {
@@ -103,8 +129,8 @@ where
}
};
}
- AsyncIndexReaderState::HaveData(offset) => {
- let offset = *offset;
+ AsyncIndexReaderState::HaveData => {
+ let offset = this.current_chunk_offset as usize;
let len = this.read_buffer.len();
let n = if len - offset < buf.len() {
len - offset
@@ -113,11 +139,13 @@ where
};
buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+ this.position += n as u64;
+
if offset + n == len {
this.state = AsyncIndexReaderState::NoData;
- this.current_chunk_idx += 1;
} else {
- this.state = AsyncIndexReaderState::HaveData(offset + n);
+ this.current_chunk_offset += n as u64;
+ this.state = AsyncIndexReaderState::HaveData;
}
return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
}
}
}
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+ S: AsyncReadChunk + Unpin + Sync + 'static,
+ I: IndexFile + Unpin,
+{
+ fn start_seek(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<tokio::io::Result<()>> {
+ let this = Pin::get_mut(self);
+ this.seek_to_pos = match pos {
+ SeekFrom::Start(offset) => {
+ offset as i64
+ },
+ SeekFrom::End(offset) => {
+ this.index.index_bytes() as i64 + offset
+ },
+ SeekFrom::Current(offset) => {
+ this.position as i64 + offset
+ }
+ };
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_complete(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<tokio::io::Result<u64>> {
+ let this = Pin::get_mut(self);
+
+ let index_bytes = this.index.index_bytes();
+ if this.seek_to_pos < 0 {
+ return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+ } else if this.seek_to_pos > index_bytes as i64 {
+ this.position = index_bytes;
+ } else {
+ this.position = this.seek_to_pos as u64;
+ }
+
+ // even if seeking within one chunk, we need to go to NoData to
+ // recalculate the current_chunk_offset (data is cached anyway)
+ this.state = AsyncIndexReaderState::NoData;
+
+ Poll::Ready(Ok(this.position))
+ }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::ops::Range;
+#[derive(Clone)]
pub struct ChunkReadInfo {
pub range: Range<u64>,
pub digest: [u8; 32],
--
2.20.1
^ permalink raw reply [flat|nested] 10+ messages in thread
* [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
` (2 subsequent siblings)
5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
To: pbs-devel
replaced by AsyncIndexReader
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup/fixed_index.rs | 154 --------------------------------------
1 file changed, 154 deletions(-)
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index b7e785d6..59f63d2e 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -13,7 +13,6 @@ use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use super::read_chunk::*;
use super::ChunkInfo;
use proxmox::tools::io::ReadExt;
@@ -146,20 +145,6 @@ impl FixedIndexReader {
Ok(())
}
- #[inline]
- fn chunk_end(&self, pos: usize) -> u64 {
- if pos >= self.index_length {
- panic!("chunk index out of range");
- }
-
- let end = ((pos + 1) * self.chunk_size) as u64;
- if end > self.size {
- self.size
- } else {
- end
- }
- }
-
pub fn print_info(&self) {
println!("Size: {}", self.size);
println!("ChunkSize: {}", self.chunk_size);
@@ -476,142 +461,3 @@ impl FixedIndexWriter {
Ok(())
}
}
-
-pub struct BufferedFixedReader<S> {
- store: S,
- index: FixedIndexReader,
- archive_size: u64,
- read_buffer: Vec<u8>,
- buffered_chunk_idx: usize,
- buffered_chunk_start: u64,
- read_offset: u64,
-}
-
-impl<S: ReadChunk> BufferedFixedReader<S> {
- pub fn new(index: FixedIndexReader, store: S) -> Self {
- let archive_size = index.size;
- Self {
- store,
- index,
- archive_size,
- read_buffer: Vec::with_capacity(1024 * 1024),
- buffered_chunk_idx: 0,
- buffered_chunk_start: 0,
- read_offset: 0,
- }
- }
-
- pub fn archive_size(&self) -> u64 {
- self.archive_size
- }
-
- fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
- let index = &self.index;
- let info = match index.chunk_info(idx) {
- Some(info) => info,
- None => bail!("chunk index out of range"),
- };
-
- // fixme: avoid copy
-
- let data = self.store.read_chunk(&info.digest)?;
- let size = info.range.end - info.range.start;
- if size != data.len() as u64 {
- bail!("read chunk with wrong size ({} != {}", size, data.len());
- }
-
- self.read_buffer.clear();
- self.read_buffer.extend_from_slice(&data);
-
- self.buffered_chunk_idx = idx;
-
- self.buffered_chunk_start = info.range.start as u64;
- Ok(())
- }
-}
-
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
- fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
- if offset == self.archive_size {
- return Ok(&self.read_buffer[0..0]);
- }
-
- let buffer_len = self.read_buffer.len();
- let index = &self.index;
-
- // optimization for sequential read
- if buffer_len > 0
- && ((self.buffered_chunk_idx + 1) < index.index_length)
- && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
- {
- let next_idx = self.buffered_chunk_idx + 1;
- let next_end = index.chunk_end(next_idx);
- if offset < next_end {
- self.buffer_chunk(next_idx)?;
- let buffer_offset = (offset - self.buffered_chunk_start) as usize;
- return Ok(&self.read_buffer[buffer_offset..]);
- }
- }
-
- if (buffer_len == 0)
- || (offset < self.buffered_chunk_start)
- || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
- {
- let idx = (offset / index.chunk_size as u64) as usize;
- self.buffer_chunk(idx)?;
- }
-
- let buffer_offset = (offset - self.buffered_chunk_start) as usize;
- Ok(&self.read_buffer[buffer_offset..])
- }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
- use crate::tools::BufferedRead;
- use std::io::{Error, ErrorKind};
-
- let data = match self.buffered_read(self.read_offset) {
- Ok(v) => v,
- Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
- };
-
- let n = if data.len() > buf.len() {
- buf.len()
- } else {
- data.len()
- };
-
- unsafe {
- std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
- }
-
- self.read_offset += n as u64;
-
- Ok(n)
- }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
- fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
- let new_offset = match pos {
- SeekFrom::Start(start_offset) => start_offset as i64,
- SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
- SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
- };
-
- use std::io::{Error, ErrorKind};
- if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
- return Err(Error::new(
- ErrorKind::Other,
- format!(
- "seek is out of range {} ([0..{}])",
- new_offset, self.archive_size
- ),
- ));
- }
- self.read_offset = new_offset as u64;
-
- Ok(self.read_offset)
- }
-}
--
2.20.1
^ permalink raw reply [flat|nested] 10+ messages in thread
* [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
` (2 preceding siblings ...)
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
2020-07-23 8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
To: pbs-devel
Replaces the removed BufferedFixedReader and makes the API fully async
(not block_in_place or block_on which could break with many requests at
once).
A tokio::sync::Mutex needs to be used for exclusive access, since a
regular one would not work with async/await calls.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/restore.rs | 21 ++++++++++-----------
1 file changed, 10 insertions(+), 11 deletions(-)
diff --git a/src/restore.rs b/src/restore.rs
index 3e37066..2be0295 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,13 +1,14 @@
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::SeekFrom;
use std::convert::TryInto;
use anyhow::{format_err, bail, Error};
use once_cell::sync::OnceCell;
use tokio::runtime::Runtime;
+use tokio::prelude::*;
-use proxmox_backup::tools::runtime::{get_runtime_with_builder, block_in_place};
+use proxmox_backup::tools::runtime::get_runtime_with_builder;
use proxmox_backup::backup::*;
use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
@@ -16,7 +17,7 @@ use crate::registry::Registry;
use crate::capi_types::DataPointer;
struct ImageAccessInfo {
- reader: Arc<Mutex<BufferedFixedReader<RemoteChunkReader>>>,
+ reader: Arc<tokio::sync::Mutex<AsyncIndexReader<RemoteChunkReader, FixedIndexReader>>>,
_archive_name: String,
archive_size: u64,
}
@@ -231,12 +232,12 @@ impl RestoreTask {
let index = client.download_fixed_index(&manifest, &archive_name).await?;
let archive_size = index.index_bytes();
- let reader = BufferedFixedReader::new(index, chunk_reader);
+ let reader = AsyncIndexReader::new(index, chunk_reader);
let info = ImageAccessInfo {
archive_size,
_archive_name: archive_name, /// useful to debug
- reader: Arc::new(Mutex::new(reader)),
+ reader: Arc::new(tokio::sync::Mutex::new(reader)),
};
(*self.image_registry.lock().unwrap()).register(info)
@@ -260,12 +261,10 @@ impl RestoreTask {
bail!("read index {} out of bounds {}", offset, image_size);
}
- let bytes = block_in_place(|| {
- let mut reader = reader.lock().unwrap();
- reader.seek(SeekFrom::Start(offset))?;
- let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
- reader.read(buf)
- })?;
+ let mut reader = reader.lock().await;
+ reader.seek(SeekFrom::Start(offset)).await?;
+ let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
+ let bytes = reader.read(buf).await?;
Ok(bytes.try_into()?)
}
--
2.20.1
^ permalink raw reply [flat|nested] 10+ messages in thread
* [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
` (3 preceding siblings ...)
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
2020-07-23 8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
To: pbs-devel
QEMU will always assume EOF when less bytes than requested are returned
by a block drivers 'read' interface, so we need to fill the buffer up to
'size' if possible.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
v2 note: this was previously a QEMU patch, but honestly that's stupid so let's
do it in Rust instead.
current-api.h | 4 ++--
src/lib.rs | 4 ++--
src/restore.rs | 20 ++++++++++++++++----
3 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/current-api.h b/current-api.h
index 15bb275..d77eff6 100644
--- a/current-api.h
+++ b/current-api.h
@@ -364,8 +364,8 @@ int proxmox_restore_read_image_at(ProxmoxRestoreHandle *handle,
* Note: The data pointer needs to be valid until the async
* opteration is finished.
*
- * Note: It is not an error for a successful call to transfer fewer
- * bytes than requested.
+ * Note: The call will only ever transfer less than 'size' bytes if
+ * the end of the file has been reached.
*/
void proxmox_restore_read_image_at_async(ProxmoxRestoreHandle *handle,
uint8_t aid,
diff --git a/src/lib.rs b/src/lib.rs
index d4b9370..3346be8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -934,8 +934,8 @@ pub extern "C" fn proxmox_restore_read_image_at(
/// Note: The data pointer needs to be valid until the async
/// opteration is finished.
///
-/// Note: It is not an error for a successful call to transfer fewer
-/// bytes than requested.
+/// Note: The call will only ever transfer less than 'size' bytes if
+/// the end of the file has been reached.
#[no_mangle]
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub extern "C" fn proxmox_restore_read_image_at_async(
diff --git a/src/restore.rs b/src/restore.rs
index 2be0295..e43d040 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -262,10 +262,22 @@ impl RestoreTask {
}
let mut reader = reader.lock().await;
- reader.seek(SeekFrom::Start(offset)).await?;
- let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
- let bytes = reader.read(buf).await?;
- Ok(bytes.try_into()?)
+ let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
+ let mut read = 0;
+
+ while read < size {
+ reader.seek(SeekFrom::Start(offset + read)).await?;
+ let bytes = reader.read(&mut buf[read as usize..]).await?;
+
+ if bytes == 0 {
+ // EOF
+ break;
+ }
+
+ read += bytes as u64;
+ }
+
+ Ok(read.try_into()?)
}
}
--
2.20.1
^ permalink raw reply [flat|nested] 10+ messages in thread
* [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
` (4 preceding siblings ...)
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
@ 2020-07-23 8:31 ` Thomas Lamprecht
5 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-23 8:31 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Stefan Reiter
On 22.07.20 15:56, Stefan Reiter wrote:
> When using the PBS blockdriver with qemu-nbd (for example), it can happen that
> enough read requests are issued to saturate the tokio thread pool. Not an issue
> in general, but as me and Wolfgang painstakenly discovered a while back, it does
> break block_on, which is used in BufferedFixedReader. This means that reading
> larger amounts of data would hang the QEMU process [0].
>
> Fix this by replacing the BufferedFixedReader with an AsyncIndexReader,
> implementing AsyncSeek for it in the process. This makes the entire API async,
> requiring no block_on anymore.
>
> Incidentally, this also gave me my best benchmark results yet, coming in at
> above 1.6 Gb/s read speed via NBD on my local machine.
>
> Additionally I discovered a seperate bug (fixed by patch 5), wherein read
> requests that we're not aligned to the chunk size would return bogus data. This
> too only seems to happen in non-VM connections (e.g. nbd, etc...).
>
> v2:
> * Remove BufferedFixedReader entirely, use AsyncIndexReader instead
> * Implement AsyncSeek for AsyncIndexReader
> * Fix the second bug in Rust instead of QEMU C
>
>
> [0] ...and since the NBD kernel driver appears to be horribly broken, this often
> also crashes most of the system, but that's a different story. If you ever get
> in this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
> disconnect the device ('qemu-nbd -d' intelligently issues a read before
> disconnecting, thus hanging before getting anything done...)
>
>
> backup: Stefan Reiter (3):
> add and implement chunk_from_offset for IndexFile
> implement AsyncSeek for AsyncIndexReader
> remove BufferedFixedReader interface
>
> src/backup/async_index_reader.rs | 116 ++++++++++++++++++----
> src/backup/dynamic_index.rs | 18 ++++
> src/backup/fixed_index.rs | 165 +++----------------------------
> src/backup/index.rs | 4 +
> 4 files changed, 129 insertions(+), 174 deletions(-)
>
> backup-qemu: Stefan Reiter (2):
> use AsyncIndexReader for read_image_at
> read_image_at: iterate until buffer is filled
>
> current-api.h | 4 ++--
> src/lib.rs | 4 ++--
> src/restore.rs | 35 +++++++++++++++++++++++------------
> 3 files changed, 27 insertions(+), 16 deletions(-)
>
applied series, thanks!
using the noload mount option is really the key here, for ext4 at least ^^
# mount -o noload /dev/nbd0p1 /mnt/foo
^ permalink raw reply [flat|nested] 10+ messages in thread