* [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings
@ 2020-07-22 13:56 Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
                   ` (5 more replies)
  0 siblings, 6 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

When using the PBS blockdriver with qemu-nbd (for example), it can happen that
enough read requests are issued to saturate the tokio thread pool. Not an issue
in general, but as Wolfgang and I painstakingly discovered a while back, it does
break block_on, which is used in BufferedFixedReader. This means that reading
larger amounts of data would hang the QEMU process [0].

Fix this by replacing the BufferedFixedReader with an AsyncIndexReader,
implementing AsyncSeek for it in the process. This makes the entire API async,
requiring no block_on anymore.
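
As a rough illustration (a sketch built only on tokio's extension traits, not
the actual driver code): with the reader fully async, the read path can simply
await a seek and a read instead of re-entering the runtime via block_on:

    use std::io::SeekFrom;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

    // Hypothetical helper: seek to `offset` and read into `buf`, yielding to
    // the executor while waiting instead of blocking a worker thread.
    async fn read_at<R>(r: &mut R, offset: u64, buf: &mut [u8]) -> std::io::Result<usize>
    where
        R: AsyncRead + AsyncSeek + Unpin,
    {
        r.seek(SeekFrom::Start(offset)).await?;
        r.read(buf).await
    }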

Incidentally, this also gave me my best benchmark results yet, coming in at
above 1.6 Gb/s read speed via NBD on my local machine.

Additionally I discovered a separate bug (fixed by patch 5), wherein read
requests that were not aligned to the chunk size would return bogus data. This
too only seems to happen with non-VM connections (e.g. NBD).

v2:
* Remove BufferedFixedReader entirely, use AsyncIndexReader instead
* Implement AsyncSeek for AsyncIndexReader
* Fix the second bug in Rust instead of QEMU C


[0] ...and since the NBD kernel driver appears to be horribly broken, this often
also crashes most of the system, but that's a different story. If you ever get
in this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
disconnect the device ('qemu-nbd -d' intelligently issues a read before
disconnecting, thus hanging before getting anything done...)


backup: Stefan Reiter (3):
  add and implement chunk_from_offset for IndexFile
  implement AsyncSeek for AsyncIndexReader
  remove BufferedFixedReader interface

 src/backup/async_index_reader.rs | 116 ++++++++++++++++++----
 src/backup/dynamic_index.rs      |  18 ++++
 src/backup/fixed_index.rs        | 165 +++----------------------------
 src/backup/index.rs              |   4 +
 4 files changed, 129 insertions(+), 174 deletions(-)

backup-qemu: Stefan Reiter (2):
  use AsyncIndexReader for read_image_at
  read_image_at: iterate until buffer is filled

 current-api.h  |  4 ++--
 src/lib.rs     |  4 ++--
 src/restore.rs | 35 +++++++++++++++++++++++------------
 3 files changed, 27 insertions(+), 16 deletions(-)

-- 
2.20.1





* [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 14:16   ` Thomas Lamprecht
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

Necessary for byte-wise seeking through chunks in an index.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
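
Illustration only (a hypothetical helper written against the trait as added
below, with the backup::index types in scope): the returned pair is the chunk
that has to be loaded plus the byte position inside that chunk.

    // Hypothetical usage: find which chunk holds `offset` and how far into it.
    fn locate<I: IndexFile>(index: &I, offset: u64) -> Option<(ChunkReadInfo, u64)> {
        let (chunk_idx, intra_chunk) = index.chunk_from_offset(offset)?;
        let info = index.chunk_info(chunk_idx)?; // range + digest of that chunk
        debug_assert!(info.range.start + intra_chunk < info.range.end);
        Some((info, intra_chunk))
    }
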
 src/backup/dynamic_index.rs | 18 ++++++++++++++++++
 src/backup/fixed_index.rs   | 11 +++++++++++
 src/backup/index.rs         |  3 +++
 3 files changed, 32 insertions(+)

diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
index 4907fe1f..887b7cf3 100644
--- a/src/backup/dynamic_index.rs
+++ b/src/backup/dynamic_index.rs
@@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
             digest: self.index[pos].digest.clone(),
         })
     }
+
+    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
+        let end_idx = self.index.len() - 1;
+        let end = self.chunk_end(end_idx);
+        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
+        let found_idx = match found_idx {
+            Ok(i) => i,
+            Err(_) => return None
+        };
+
+        let found_start = if found_idx == 0 {
+            0
+        } else {
+            self.chunk_end(found_idx - 1)
+        };
+
+        Some((found_idx, offset - found_start))
+    }
 }
 
 struct CachedChunk {
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..b7e785d6 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
 
         (csum, chunk_end)
     }
+
+    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
+        if offset >= self.size {
+            return None;
+        }
+
+        Some((
+            (offset / self.chunk_size as u64) as usize,
+            offset % self.chunk_size as u64
+        ))
+    }
 }
 
 pub struct FixedIndexWriter {
diff --git a/src/backup/index.rs b/src/backup/index.rs
index efdf3b54..2eab8524 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -22,6 +22,9 @@ pub trait IndexFile {
     fn index_bytes(&self) -> u64;
     fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
 
+    /// Get the chunk index and the relative offset within it for a byte offset
+    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
+
     /// Compute index checksum and size
     fn compute_csum(&self) -> ([u8; 32], u64);
 
-- 
2.20.1






* [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.

Uses chunk_from_offset to locate positions within chunks, but avoids it
for sequential reads so performance does not regress.

AsyncSeek stores the requested target in the temporary seek_to_pos, so the
actual position stays unchanged when an invalid seek is given and
poll_complete has to return an error.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
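
A rough test sketch of the intended seek semantics (using tokio's
AsyncSeekExt/AsyncReadExt, values made up): an invalid seek errors in
poll_complete but leaves the position untouched, which is what the temporary
seek_to_pos is for.

    use std::io::SeekFrom;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

    async fn check_seek<R>(mut r: R) -> std::io::Result<()>
    where
        R: AsyncRead + AsyncSeek + Unpin,
    {
        let before = r.seek(SeekFrom::Current(0)).await?;
        // seeking to a negative absolute position must fail in poll_complete...
        assert!(r.seek(SeekFrom::End(i64::MIN / 2)).await.is_err());
        // ...but must not have moved the reader's position
        assert_eq!(r.seek(SeekFrom::Current(0)).await?, before);
        let mut buf = [0u8; 4096];
        let _n = r.read(&mut buf).await?; // reading continues from `before`
        Ok(())
    }
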
 src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
 src/backup/index.rs              |   1 +
 2 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
 use std::future::Future;
 use std::task::{Poll, Context};
 use std::pin::Pin;
+use std::io::SeekFrom;
 
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
 
 use super::IndexFile;
 use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
 
 enum AsyncIndexReaderState<S> {
     NoData,
     WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
-    HaveData(usize),
+    HaveData,
 }
 
 pub struct AsyncIndexReader<S, I: IndexFile> {
     store: Option<S>,
     index: I,
     read_buffer: Vec<u8>,
+    current_chunk_offset: u64,
     current_chunk_idx: usize,
-    current_chunk_digest: [u8; 32],
+    current_chunk_info: Option<ChunkReadInfo>,
+    position: u64,
+    seek_to_pos: i64,
     state: AsyncIndexReaderState<S>,
 }
 
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
             store: Some(store),
             index,
             read_buffer: Vec::with_capacity(1024 * 1024),
+            current_chunk_offset: 0,
             current_chunk_idx: 0,
-            current_chunk_digest: [0u8; 32],
+            current_chunk_info: None,
+            position: 0,
+            seek_to_pos: 0,
             state: AsyncIndexReaderState::NoData,
         }
     }
@@ -58,23 +66,41 @@ where
         loop {
             match &mut this.state {
                 AsyncIndexReaderState::NoData => {
-                    if this.current_chunk_idx >= this.index.index_count() {
+                    let (idx, offset) = if this.current_chunk_info.is_some() &&
+                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
+                    {
+                        // optimization for sequential chunk read
+                        let next_idx = this.current_chunk_idx + 1;
+                        (next_idx, 0)
+                    } else {
+                        match this.index.chunk_from_offset(this.position) {
+                            Some(res) => res,
+                            None => return Poll::Ready(Ok(0))
+                        }
+                    };
+
+                    if idx >= this.index.index_count() {
                         return Poll::Ready(Ok(0));
                     }
 
-                    let digest = this
+                    let info = this
                         .index
-                        .index_digest(this.current_chunk_idx)
-                        .ok_or(io_format_err!("could not get digest"))?
-                        .clone();
+                        .chunk_info(idx)
+                        .ok_or(io_format_err!("could not get digest"))?;
 
-                    if digest == this.current_chunk_digest {
-                        this.state = AsyncIndexReaderState::HaveData(0);
-                        continue;
+                    this.current_chunk_offset = offset;
+                    this.current_chunk_idx = idx;
+                    let old_info = this.current_chunk_info.replace(info.clone());
+
+                    if let Some(old_info) = old_info {
+                        if old_info.digest == info.digest {
+                            // hit, chunk is currently in cache
+                            this.state = AsyncIndexReaderState::HaveData;
+                            continue;
+                        }
                     }
 
-                    this.current_chunk_digest = digest;
-
+                    // miss, need to download new chunk
                     let store = match this.store.take() {
                         Some(store) => store,
                         None => {
@@ -83,7 +109,7 @@ where
                     };
 
                     let future = async move {
-                        store.read_chunk(&digest)
+                        store.read_chunk(&info.digest)
                             .await
                             .map(move |x| (store, x))
                     };
@@ -95,7 +121,7 @@ where
                         Ok((store, mut chunk_data)) => {
                             this.read_buffer.clear();
                             this.read_buffer.append(&mut chunk_data);
-                            this.state = AsyncIndexReaderState::HaveData(0);
+                            this.state = AsyncIndexReaderState::HaveData;
                             this.store = Some(store);
                         }
                         Err(err) => {
@@ -103,8 +129,8 @@ where
                         }
                     };
                 }
-                AsyncIndexReaderState::HaveData(offset) => {
-                    let offset = *offset;
+                AsyncIndexReaderState::HaveData => {
+                    let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
                     let n = if len - offset < buf.len() {
                         len - offset
@@ -113,11 +139,13 @@ where
                     };
 
                     buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    this.position += n as u64;
+
                     if offset + n == len {
                         this.state = AsyncIndexReaderState::NoData;
-                        this.current_chunk_idx += 1;
                     } else {
-                        this.state = AsyncIndexReaderState::HaveData(offset + n);
+                        this.current_chunk_offset += n as u64;
+                        this.state = AsyncIndexReaderState::HaveData;
                     }
 
                     return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
         }
     }
 }
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => {
+                offset as i64
+            },
+            SeekFrom::End(offset) => {
+                this.index.index_bytes() as i64 + offset
+            },
+            SeekFrom::Current(offset) => {
+                this.position as i64 + offset
+            }
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index.index_bytes();
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        // even if seeking within one chunk, we need to go to NoData to
+        // recalculate the current_chunk_offset (data is cached anyway)
+        this.state = AsyncIndexReaderState::NoData;
+
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::Range;
 
+#[derive(Clone)]
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
     pub digest: [u8; 32],
-- 
2.20.1






* [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

replaced by AsyncIndexReader

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/fixed_index.rs | 154 --------------------------------------
 1 file changed, 154 deletions(-)

diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index b7e785d6..59f63d2e 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -13,7 +13,6 @@ use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use super::read_chunk::*;
 use super::ChunkInfo;
 
 use proxmox::tools::io::ReadExt;
@@ -146,20 +145,6 @@ impl FixedIndexReader {
         Ok(())
     }
 
-    #[inline]
-    fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_length {
-            panic!("chunk index out of range");
-        }
-
-        let end = ((pos + 1) * self.chunk_size) as u64;
-        if end > self.size {
-            self.size
-        } else {
-            end
-        }
-    }
-
     pub fn print_info(&self) {
         println!("Size: {}", self.size);
         println!("ChunkSize: {}", self.chunk_size);
@@ -476,142 +461,3 @@ impl FixedIndexWriter {
         Ok(())
     }
 }
-
-pub struct BufferedFixedReader<S> {
-    store: S,
-    index: FixedIndexReader,
-    archive_size: u64,
-    read_buffer: Vec<u8>,
-    buffered_chunk_idx: usize,
-    buffered_chunk_start: u64,
-    read_offset: u64,
-}
-
-impl<S: ReadChunk> BufferedFixedReader<S> {
-    pub fn new(index: FixedIndexReader, store: S) -> Self {
-        let archive_size = index.size;
-        Self {
-            store,
-            index,
-            archive_size,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            buffered_chunk_idx: 0,
-            buffered_chunk_start: 0,
-            read_offset: 0,
-        }
-    }
-
-    pub fn archive_size(&self) -> u64 {
-        self.archive_size
-    }
-
-    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
-        let index = &self.index;
-        let info = match index.chunk_info(idx) {
-            Some(info) => info,
-            None => bail!("chunk index out of range"),
-        };
-
-        // fixme: avoid copy
-
-        let data = self.store.read_chunk(&info.digest)?;
-        let size = info.range.end - info.range.start;
-        if size != data.len() as u64 {
-            bail!("read chunk with wrong size ({} != {}", size, data.len());
-        }
-
-        self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
-
-        self.buffered_chunk_idx = idx;
-
-        self.buffered_chunk_start = info.range.start as u64;
-        Ok(())
-    }
-}
-
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
-    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
-        if offset == self.archive_size {
-            return Ok(&self.read_buffer[0..0]);
-        }
-
-        let buffer_len = self.read_buffer.len();
-        let index = &self.index;
-
-        // optimization for sequential read
-        if buffer_len > 0
-            && ((self.buffered_chunk_idx + 1) < index.index_length)
-            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let next_idx = self.buffered_chunk_idx + 1;
-            let next_end = index.chunk_end(next_idx);
-            if offset < next_end {
-                self.buffer_chunk(next_idx)?;
-                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-                return Ok(&self.read_buffer[buffer_offset..]);
-            }
-        }
-
-        if (buffer_len == 0)
-            || (offset < self.buffered_chunk_start)
-            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let idx = (offset / index.chunk_size as u64) as usize;
-            self.buffer_chunk(idx)?;
-        }
-
-        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-        Ok(&self.read_buffer[buffer_offset..])
-    }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        use crate::tools::BufferedRead;
-        use std::io::{Error, ErrorKind};
-
-        let data = match self.buffered_read(self.read_offset) {
-            Ok(v) => v,
-            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
-        };
-
-        let n = if data.len() > buf.len() {
-            buf.len()
-        } else {
-            data.len()
-        };
-
-        unsafe {
-            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
-        }
-
-        self.read_offset += n as u64;
-
-        Ok(n)
-    }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-        let new_offset = match pos {
-            SeekFrom::Start(start_offset) => start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
-            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
-        };
-
-        use std::io::{Error, ErrorKind};
-        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!(
-                    "seek is out of range {} ([0..{}])",
-                    new_offset, self.archive_size
-                ),
-            ));
-        }
-        self.read_offset = new_offset as u64;
-
-        Ok(self.read_offset)
-    }
-}
-- 
2.20.1






* [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
                   ` (2 preceding siblings ...)
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
  2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

Replaces the removed BufferedFixedReader and makes the API fully async
(no block_in_place or block_on, which could break with many requests at
once).

A tokio::sync::Mutex is needed for exclusive access, since a regular
std::sync::Mutex guard cannot be held across await points.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
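
Minimal sketch of the Mutex reasoning (types simplified, nothing from the
actual code): a std::sync::MutexGuard cannot be held across an .await point in
a task that may move between threads, while tokio's async Mutex is made for
exactly that.

    use std::sync::Arc;

    async fn chunk_len(data: &[u8]) -> usize { data.len() } // stand-in async operation

    async fn read_locked(reader: Arc<tokio::sync::Mutex<Vec<u8>>>) -> usize {
        let guard = reader.lock().await; // waiting yields instead of blocking the executor
        chunk_len(&guard).await          // the guard stays held across this await point
    }
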
 src/restore.rs | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 3e37066..2be0295 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,13 +1,14 @@
 use std::sync::{Arc, Mutex};
 use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::SeekFrom;
 use std::convert::TryInto;
 
 use anyhow::{format_err, bail, Error};
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
+use tokio::prelude::*;
 
-use proxmox_backup::tools::runtime::{get_runtime_with_builder, block_in_place};
+use proxmox_backup::tools::runtime::get_runtime_with_builder;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -16,7 +17,7 @@ use crate::registry::Registry;
 use crate::capi_types::DataPointer;
 
 struct ImageAccessInfo {
-    reader: Arc<Mutex<BufferedFixedReader<RemoteChunkReader>>>,
+    reader: Arc<tokio::sync::Mutex<AsyncIndexReader<RemoteChunkReader, FixedIndexReader>>>,
     _archive_name: String,
     archive_size: u64,
 }
@@ -231,12 +232,12 @@ impl RestoreTask {
         let index = client.download_fixed_index(&manifest, &archive_name).await?;
         let archive_size = index.index_bytes();
 
-        let reader = BufferedFixedReader::new(index, chunk_reader);
+        let reader = AsyncIndexReader::new(index, chunk_reader);
 
         let info = ImageAccessInfo {
             archive_size,
             _archive_name: archive_name, /// useful to debug
-            reader: Arc::new(Mutex::new(reader)),
+            reader: Arc::new(tokio::sync::Mutex::new(reader)),
         };
 
         (*self.image_registry.lock().unwrap()).register(info)
@@ -260,12 +261,10 @@ impl RestoreTask {
             bail!("read index {} out of bounds {}", offset, image_size);
         }
 
-        let bytes = block_in_place(|| {
-            let mut reader = reader.lock().unwrap();
-            reader.seek(SeekFrom::Start(offset))?;
-            let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
-            reader.read(buf)
-        })?;
+        let mut reader = reader.lock().await;
+        reader.seek(SeekFrom::Start(offset)).await?;
+        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
+        let bytes = reader.read(buf).await?;
 
         Ok(bytes.try_into()?)
     }
-- 
2.20.1






* [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
                   ` (3 preceding siblings ...)
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

QEMU will always assume EOF when fewer bytes than requested are returned
by a block driver's 'read' interface, so we need to fill the buffer up to
'size' if possible.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

v2 note: this was previously a QEMU patch, but honestly that's stupid so let's
do it in Rust instead.

 current-api.h  |  4 ++--
 src/lib.rs     |  4 ++--
 src/restore.rs | 20 ++++++++++++++++----
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/current-api.h b/current-api.h
index 15bb275..d77eff6 100644
--- a/current-api.h
+++ b/current-api.h
@@ -364,8 +364,8 @@ int proxmox_restore_read_image_at(ProxmoxRestoreHandle *handle,
  * Note: The data pointer needs to be valid until the async
  * opteration is finished.
  *
- * Note: It is not an error for a successful call to transfer fewer
- * bytes than requested.
+ * Note: The call will only ever transfer less than 'size' bytes if
+ * the end of the file has been reached.
  */
 void proxmox_restore_read_image_at_async(ProxmoxRestoreHandle *handle,
                                          uint8_t aid,
diff --git a/src/lib.rs b/src/lib.rs
index d4b9370..3346be8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -934,8 +934,8 @@ pub extern "C" fn proxmox_restore_read_image_at(
 /// Note: The data pointer needs to be valid until the async
 /// opteration is finished.
 ///
-/// Note: It is not an error for a successful call to transfer fewer
-/// bytes than requested.
+/// Note: The call will only ever transfer less than 'size' bytes if
+/// the end of the file has been reached.
 #[no_mangle]
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_restore_read_image_at_async(
diff --git a/src/restore.rs b/src/restore.rs
index 2be0295..e43d040 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -262,10 +262,22 @@ impl RestoreTask {
         }
 
         let mut reader = reader.lock().await;
-        reader.seek(SeekFrom::Start(offset)).await?;
-        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
-        let bytes = reader.read(buf).await?;
 
-        Ok(bytes.try_into()?)
+        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
+        let mut read = 0;
+
+        while read < size {
+            reader.seek(SeekFrom::Start(offset + read)).await?;
+            let bytes = reader.read(&mut buf[read as usize..]).await?;
+
+            if bytes == 0 {
+                // EOF
+                break;
+            }
+
+            read += bytes as u64;
+        }
+
+        Ok(read.try_into()?)
     }
 }
-- 
2.20.1






* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
@ 2020-07-22 14:16   ` Thomas Lamprecht
  2020-07-22 14:24     ` Stefan Reiter
  0 siblings, 1 reply; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-22 14:16 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 22.07.20 15:56, Stefan Reiter wrote:
> Necessary for byte-wise seeking through chunks in an index.
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>  src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>  src/backup/fixed_index.rs   | 11 +++++++++++
>  src/backup/index.rs         |  3 +++
>  3 files changed, 32 insertions(+)
> 
> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
> index 4907fe1f..887b7cf3 100644
> --- a/src/backup/dynamic_index.rs
> +++ b/src/backup/dynamic_index.rs
> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>              digest: self.index[pos].digest.clone(),
>          })
>      }
> +
> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
> +        let end_idx = self.index.len() - 1;
> +        let end = self.chunk_end(end_idx);
> +        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
> +        let found_idx = match found_idx {
> +            Ok(i) => i,
> +            Err(_) => return None
> +        };
> +
> +        let found_start = if found_idx == 0 {
> +            0
> +        } else {
> +            self.chunk_end(found_idx - 1)
> +        };
> +
> +        Some((found_idx, offset - found_start))
> +    }
>  }
>  
>  struct CachedChunk {
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..b7e785d6 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>  
>          (csum, chunk_end)
>      }
> +
> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
> +        if offset >= self.size {
> +            return None;
> +        }
> +
> +        Some((
> +            (offset / self.chunk_size as u64) as usize,
> +            offset % self.chunk_size as u64

modulo is really slow, but isn't chunk_size always a 2^x and thus we can
do the same here as we do in other places:

offset & (self.chunk_size - 1)
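
e.g. (standalone sketch, assuming chunk_size is always a power of two, as we
do elsewhere):

    fn intra_chunk_offset(offset: u64, chunk_size: u64) -> u64 {
        debug_assert!(chunk_size.is_power_of_two());
        let masked = offset & (chunk_size - 1);
        debug_assert_eq!(masked, offset % chunk_size); // same result for 2^x sizes
        masked
    }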

> +        ))
> +    }
>  }
>  
>  pub struct FixedIndexWriter {
> diff --git a/src/backup/index.rs b/src/backup/index.rs
> index efdf3b54..2eab8524 100644
> --- a/src/backup/index.rs
> +++ b/src/backup/index.rs
> @@ -22,6 +22,9 @@ pub trait IndexFile {
>      fn index_bytes(&self) -> u64;
>      fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>  
> +    /// Get the chunk index and the relative offset within it for a byte offset
> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
> +
>      /// Compute index checksum and size
>      fn compute_csum(&self) -> ([u8; 32], u64);
>  
> 






* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 14:16   ` Thomas Lamprecht
@ 2020-07-22 14:24     ` Stefan Reiter
  2020-07-22 14:41       ` Thomas Lamprecht
  0 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 14:24 UTC (permalink / raw)
  To: Thomas Lamprecht, Proxmox Backup Server development discussion

On 7/22/20 4:16 PM, Thomas Lamprecht wrote:
> On 22.07.20 15:56, Stefan Reiter wrote:
>> Necessary for byte-wise seeking through chunks in an index.
>>
>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>> ---
>>   src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>>   src/backup/fixed_index.rs   | 11 +++++++++++
>>   src/backup/index.rs         |  3 +++
>>   3 files changed, 32 insertions(+)
>>
>> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
>> index 4907fe1f..887b7cf3 100644
>> --- a/src/backup/dynamic_index.rs
>> +++ b/src/backup/dynamic_index.rs
>> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>>               digest: self.index[pos].digest.clone(),
>>           })
>>       }
>> +
>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>> +        let end_idx = self.index.len() - 1;
>> +        let end = self.chunk_end(end_idx);
>> +        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
>> +        let found_idx = match found_idx {
>> +            Ok(i) => i,
>> +            Err(_) => return None
>> +        };
>> +
>> +        let found_start = if found_idx == 0 {
>> +            0
>> +        } else {
>> +            self.chunk_end(found_idx - 1)
>> +        };
>> +
>> +        Some((found_idx, offset - found_start))
>> +    }
>>   }
>>   
>>   struct CachedChunk {
>> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
>> index 73d0dad0..b7e785d6 100644
>> --- a/src/backup/fixed_index.rs
>> +++ b/src/backup/fixed_index.rs
>> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>>   
>>           (csum, chunk_end)
>>       }
>> +
>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>> +        if offset >= self.size {
>> +            return None;
>> +        }
>> +
>> +        Some((
>> +            (offset / self.chunk_size as u64) as usize,
>> +            offset % self.chunk_size as u64
> 
> modulo is really slow, but isn't chunk_size always a 2^x and thus we can
> do the same here as we do in other places:
> 
> offset & (self.chunk_size - 1)
> 

I found it more readable this way and I don't think it's hot-path enough 
to make a real difference in performance.

But I don't mind, could even replace the div as well. Maybe an 
assert!(chunk_size.is_power_of_two()) might be good somewhere though.

>> +        ))
>> +    }
>>   }
>>   
>>   pub struct FixedIndexWriter {
>> diff --git a/src/backup/index.rs b/src/backup/index.rs
>> index efdf3b54..2eab8524 100644
>> --- a/src/backup/index.rs
>> +++ b/src/backup/index.rs
>> @@ -22,6 +22,9 @@ pub trait IndexFile {
>>       fn index_bytes(&self) -> u64;
>>       fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>>   
>> +    /// Get the chunk index and the relative offset within it for a byte offset
>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
>> +
>>       /// Compute index checksum and size
>>       fn compute_csum(&self) -> ([u8; 32], u64);
>>   
>>
> 





* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 14:24     ` Stefan Reiter
@ 2020-07-22 14:41       ` Thomas Lamprecht
  0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-22 14:41 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 22.07.20 16:24, Stefan Reiter wrote:
> On 7/22/20 4:16 PM, Thomas Lamprecht wrote:
>> On 22.07.20 15:56, Stefan Reiter wrote:
>>> Necessary for byte-wise seeking through chunks in an index.
>>>
>>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>>> ---
>>>   src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>>>   src/backup/fixed_index.rs   | 11 +++++++++++
>>>   src/backup/index.rs         |  3 +++
>>>   3 files changed, 32 insertions(+)
>>>
>>> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
>>> index 4907fe1f..887b7cf3 100644
>>> --- a/src/backup/dynamic_index.rs
>>> +++ b/src/backup/dynamic_index.rs
>>> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>>>               digest: self.index[pos].digest.clone(),
>>>           })
>>>       }
>>> +
>>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>>> +        let end_idx = self.index.len() - 1;
>>> +        let end = self.chunk_end(end_idx);
>>> +        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
>>> +        let found_idx = match found_idx {
>>> +            Ok(i) => i,
>>> +            Err(_) => return None
>>> +        };
>>> +
>>> +        let found_start = if found_idx == 0 {
>>> +            0
>>> +        } else {
>>> +            self.chunk_end(found_idx - 1)
>>> +        };
>>> +
>>> +        Some((found_idx, offset - found_start))
>>> +    }
>>>   }
>>>     struct CachedChunk {
>>> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
>>> index 73d0dad0..b7e785d6 100644
>>> --- a/src/backup/fixed_index.rs
>>> +++ b/src/backup/fixed_index.rs
>>> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>>>             (csum, chunk_end)
>>>       }
>>> +
>>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>>> +        if offset >= self.size {
>>> +            return None;
>>> +        }
>>> +
>>> +        Some((
>>> +            (offset / self.chunk_size as u64) as usize,
>>> +            offset % self.chunk_size as u64
>>
>> modulo is really slow, but isn't chunk_size always a 2^x and thus we can
>> do the same here as we do in other places:
>>
>> offset & (self.chunk_size - 1)
>>
> 
> I found it more readable this way and I don't think it's hot-path enough to make a real difference in performance.

then why not add a comment instead? This is some orders of magnitude slower,
not just 2 or 3% ...

> 
> But I don't mind, could even replace the div as well. Maybe an assert!(chunk_size.is_power_of_two()) might be good somewhere though.

Yeah, somewhere in an initial entry point to such code an assert wouldn't hurt.

> 
>>> +        ))
>>> +    }
>>>   }
>>>     pub struct FixedIndexWriter {
>>> diff --git a/src/backup/index.rs b/src/backup/index.rs
>>> index efdf3b54..2eab8524 100644
>>> --- a/src/backup/index.rs
>>> +++ b/src/backup/index.rs
>>> @@ -22,6 +22,9 @@ pub trait IndexFile {
>>>       fn index_bytes(&self) -> u64;
>>>       fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>>>   +    /// Get the chunk index and the relative offset within it for a byte offset
>>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
>>> +
>>>       /// Compute index checksum and size
>>>       fn compute_csum(&self) -> ([u8; 32], u64);
>>>  
>>







* [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
                   ` (4 preceding siblings ...)
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
@ 2020-07-23  8:31 ` Thomas Lamprecht
  5 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-23  8:31 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 22.07.20 15:56, Stefan Reiter wrote:
> When using the PBS blockdriver with qemu-nbd (for example), it can happen that
> enough read requests are issued to saturate the tokio thread pool. Not an issue
> in general, but as Wolfgang and I painstakingly discovered a while back, it does
> break block_on, which is used in BufferedFixedReader. This means that reading
> larger amounts of data would hang the QEMU process [0].
> 
> Fix this by replacing the BufferedFixedReader with an AsyncIndexReader,
> implementing AsyncSeek for it in the process. This makes the entire API async,
> requiring no block_on anymore.
> 
> Incidentally, this also gave me my best benchmark results yet, coming in at
> above 1.6 Gb/s read speed via NBD on my local machine.
> 
> Additionally I discovered a separate bug (fixed by patch 5), wherein read
> requests that were not aligned to the chunk size would return bogus data. This
> too only seems to happen with non-VM connections (e.g. NBD).
> 
> v2:
> * Remove BufferedFixedReader entirely, use AsyncIndexReader instead
> * Implement AsyncSeek for AsyncIndexReader
> * Fix the second bug in Rust instead of QEMU C
> 
> 
> [0] ...and since the NBD kernel driver appears to be horribly broken, this often
> also crashes most of the system, but that's a different story. If you ever get
> in this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
> disconnect the device ('qemu-nbd -d' intelligently issues a read before
> disconnecting, thus hanging before getting anything done...)
> 
> 
> backup: Stefan Reiter (3):
>   add and implement chunk_from_offset for IndexFile
>   implement AsyncSeek for AsyncIndexReader
>   remove BufferedFixedReader interface
> 
>  src/backup/async_index_reader.rs | 116 ++++++++++++++++++----
>  src/backup/dynamic_index.rs      |  18 ++++
>  src/backup/fixed_index.rs        | 165 +++----------------------------
>  src/backup/index.rs              |   4 +
>  4 files changed, 129 insertions(+), 174 deletions(-)
> 
> backup-qemu: Stefan Reiter (2):
>   use AsyncIndexReader for read_image_at
>   read_image_at: iterate until buffer is filled
> 
>  current-api.h  |  4 ++--
>  src/lib.rs     |  4 ++--
>  src/restore.rs | 35 +++++++++++++++++++++++------------
>  3 files changed, 27 insertions(+), 16 deletions(-)
> 

applied series, thanks!

using the noload mount option is really the key here, for ext4 at least ^^
# mount -o noload /dev/nbd0p1 /mnt/foo





end of thread

Thread overview: 10+ messages
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
2020-07-22 14:16   ` Thomas Lamprecht
2020-07-22 14:24     ` Stefan Reiter
2020-07-22 14:41       ` Thomas Lamprecht
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
