public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings
@ 2020-07-22 13:56 Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
                   ` (5 more replies)
  0 siblings, 6 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

When using the PBS blockdriver with qemu-nbd (for example), it can happen that
enough read requests are issued to saturate the tokio thread pool. Not an issue
in general, but as Wolfgang and I painstakingly discovered a while back, it does
break block_on, which is used in BufferedFixedReader. This means that reading
larger amounts of data would hang the QEMU process [0].

Fix this by replacing the BufferedFixedReader with an AsyncIndexReader,
implementing AsyncSeek for it in the process. This makes the entire API async,
requiring no block_on anymore.

Incidentally, this also gave me my best benchmark results yet, coming in at
above 1.6 Gb/s read speed via NBD on my local machine.

Additionally, I discovered a separate bug (fixed by patch 5), wherein read
requests that were not aligned to the chunk size would return bogus data. This
too only seems to happen with non-VM connections (e.g. NBD).

v2:
* Remove BufferedFixedReader entirely, use AsyncIndexReader instead
* Implement AsyncSeek for AsyncIndexReader
* Fix the second bug in Rust instead of QEMU C


[0] ...and since the NBD kernel driver appears to be horribly broken, this
often also crashes most of the system, but that's a different story. If you
ever get into this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to
force-disconnect the device ('qemu-nbd -d' intelligently issues a read before
disconnecting, thus hanging before getting anything done...)


backup: Stefan Reiter (3):
  add and implement chunk_from_offset for IndexFile
  implement AsyncSeek for AsyncIndexReader
  remove BufferedFixedReader interface

 src/backup/async_index_reader.rs | 116 ++++++++++++++++++----
 src/backup/dynamic_index.rs      |  18 ++++
 src/backup/fixed_index.rs        | 165 +++----------------------------
 src/backup/index.rs              |   4 +
 4 files changed, 129 insertions(+), 174 deletions(-)

backup-qemu: Stefan Reiter (2):
  use AsyncIndexReader for read_image_at
  read_image_at: iterate until buffer is filled

 current-api.h  |  4 ++--
 src/lib.rs     |  4 ++--
 src/restore.rs | 35 +++++++++++++++++++++++------------
 3 files changed, 27 insertions(+), 16 deletions(-)

-- 
2.20.1




^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 14:16   ` Thomas Lamprecht
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

Necessary for byte-wise seeking through chunks in an index.
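For the fixed-chunk-size case this boils down to a division and a remainder; a
minimal stand-alone sketch (the free function chunk_from_offset_fixed is a made-up
name for illustration, the real method lives on the IndexFile trait):

```rust
/// Illustrative stand-alone version of chunk_from_offset for a fixed
/// index: with a constant chunk size, the chunk index is the quotient
/// and the intra-chunk offset is the remainder.
fn chunk_from_offset_fixed(offset: u64, chunk_size: u64, total_size: u64) -> Option<(usize, u64)> {
    if offset >= total_size {
        // past the end of the image: no chunk contains this byte
        return None;
    }
    Some(((offset / chunk_size) as usize, offset % chunk_size))
}
```

With 4 MiB chunks and a 10 MiB image, byte 5 MiB falls into chunk 1 at
intra-chunk offset 1 MiB. The dynamic index has no closed-form mapping, since
chunks are variable-sized, so it binary-searches the chunk end offsets instead.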

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/dynamic_index.rs | 18 ++++++++++++++++++
 src/backup/fixed_index.rs   | 11 +++++++++++
 src/backup/index.rs         |  3 +++
 3 files changed, 32 insertions(+)

diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
index 4907fe1f..887b7cf3 100644
--- a/src/backup/dynamic_index.rs
+++ b/src/backup/dynamic_index.rs
@@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
             digest: self.index[pos].digest.clone(),
         })
     }
+
+    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
+        let end_idx = self.index.len() - 1;
+        let end = self.chunk_end(end_idx);
+        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
+        let found_idx = match found_idx {
+            Ok(i) => i,
+            Err(_) => return None
+        };
+
+        let found_start = if found_idx == 0 {
+            0
+        } else {
+            self.chunk_end(found_idx - 1)
+        };
+
+        Some((found_idx, offset - found_start))
+    }
 }
 
 struct CachedChunk {
diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index 73d0dad0..b7e785d6 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
 
         (csum, chunk_end)
     }
+
+    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
+        if offset >= self.size {
+            return None;
+        }
+
+        Some((
+            (offset / self.chunk_size as u64) as usize,
+            offset % self.chunk_size as u64
+        ))
+    }
 }
 
 pub struct FixedIndexWriter {
diff --git a/src/backup/index.rs b/src/backup/index.rs
index efdf3b54..2eab8524 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -22,6 +22,9 @@ pub trait IndexFile {
     fn index_bytes(&self) -> u64;
     fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
 
+    /// Get the chunk index and the relative offset within it for a byte offset
+    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
+
     /// Compute index checksum and size
     fn compute_csum(&self) -> ([u8; 32], u64);
 
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

Requires updating the AsyncRead implementation to cope with byte-wise
seeks to intra-chunk positions.

Uses chunk_from_offset to locate positions within chunks, but avoids it
for sequential reads so as not to regress performance.

AsyncSeek stores the target in the temporary seek_to_pos, so the
position is left unchanged if an invalid seek is requested; the error
is then reported in poll_complete.
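
The two-step seek can be sketched synchronously; this simplified helper
(resolve_seek is a hypothetical name) mirrors what start_seek computes and
poll_complete validates, keeping the tentative target as an i64 so a negative
result can be rejected without the real position having been touched:

```rust
use std::io::SeekFrom;

// Simplified, synchronous sketch of the start_seek/poll_complete pair:
// resolve the SeekFrom variant against the current position and total
// length, reject negative targets, clamp targets past the end.
fn resolve_seek(pos: SeekFrom, current: u64, len: u64) -> Result<u64, String> {
    let target = match pos {
        SeekFrom::Start(offset) => offset as i64,
        SeekFrom::End(offset) => len as i64 + offset,
        SeekFrom::Current(offset) => current as i64 + offset,
    };
    if target < 0 {
        return Err("cannot seek to negative values".to_string());
    }
    // seeking beyond the end clamps to the end, like the patch does
    Ok((target as u64).min(len))
}
```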

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/async_index_reader.rs | 116 +++++++++++++++++++++++++------
 src/backup/index.rs              |   1 +
 2 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
index 0911375e..98372aa1 100644
--- a/src/backup/async_index_reader.rs
+++ b/src/backup/async_index_reader.rs
@@ -1,30 +1,35 @@
 use std::future::Future;
 use std::task::{Poll, Context};
 use std::pin::Pin;
+use std::io::SeekFrom;
 
 use anyhow::Error;
 use futures::future::FutureExt;
 use futures::ready;
-use tokio::io::AsyncRead;
+use tokio::io::{AsyncRead, AsyncSeek};
 
 use proxmox::sys::error::io_err_other;
 use proxmox::io_format_err;
 
 use super::IndexFile;
 use super::read_chunk::AsyncReadChunk;
+use super::index::ChunkReadInfo;
 
 enum AsyncIndexReaderState<S> {
     NoData,
     WaitForData(Pin<Box<dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static>>),
-    HaveData(usize),
+    HaveData,
 }
 
 pub struct AsyncIndexReader<S, I: IndexFile> {
     store: Option<S>,
     index: I,
     read_buffer: Vec<u8>,
+    current_chunk_offset: u64,
     current_chunk_idx: usize,
-    current_chunk_digest: [u8; 32],
+    current_chunk_info: Option<ChunkReadInfo>,
+    position: u64,
+    seek_to_pos: i64,
     state: AsyncIndexReaderState<S>,
 }
 
@@ -37,8 +42,11 @@ impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
             store: Some(store),
             index,
             read_buffer: Vec::with_capacity(1024 * 1024),
+            current_chunk_offset: 0,
             current_chunk_idx: 0,
-            current_chunk_digest: [0u8; 32],
+            current_chunk_info: None,
+            position: 0,
+            seek_to_pos: 0,
             state: AsyncIndexReaderState::NoData,
         }
     }
@@ -58,23 +66,41 @@ where
         loop {
             match &mut this.state {
                 AsyncIndexReaderState::NoData => {
-                    if this.current_chunk_idx >= this.index.index_count() {
+                    let (idx, offset) = if this.current_chunk_info.is_some() &&
+                        this.position == this.current_chunk_info.as_ref().unwrap().range.end
+                    {
+                        // optimization for sequential chunk read
+                        let next_idx = this.current_chunk_idx + 1;
+                        (next_idx, 0)
+                    } else {
+                        match this.index.chunk_from_offset(this.position) {
+                            Some(res) => res,
+                            None => return Poll::Ready(Ok(0))
+                        }
+                    };
+
+                    if idx >= this.index.index_count() {
                         return Poll::Ready(Ok(0));
                     }
 
-                    let digest = this
+                    let info = this
                         .index
-                        .index_digest(this.current_chunk_idx)
-                        .ok_or(io_format_err!("could not get digest"))?
-                        .clone();
+                        .chunk_info(idx)
+                        .ok_or(io_format_err!("could not get digest"))?;
 
-                    if digest == this.current_chunk_digest {
-                        this.state = AsyncIndexReaderState::HaveData(0);
-                        continue;
+                    this.current_chunk_offset = offset;
+                    this.current_chunk_idx = idx;
+                    let old_info = this.current_chunk_info.replace(info.clone());
+
+                    if let Some(old_info) = old_info {
+                        if old_info.digest == info.digest {
+                            // hit, chunk is currently in cache
+                            this.state = AsyncIndexReaderState::HaveData;
+                            continue;
+                        }
                     }
 
-                    this.current_chunk_digest = digest;
-
+                    // miss, need to download new chunk
                     let store = match this.store.take() {
                         Some(store) => store,
                         None => {
@@ -83,7 +109,7 @@ where
                     };
 
                     let future = async move {
-                        store.read_chunk(&digest)
+                        store.read_chunk(&info.digest)
                             .await
                             .map(move |x| (store, x))
                     };
@@ -95,7 +121,7 @@ where
                         Ok((store, mut chunk_data)) => {
                             this.read_buffer.clear();
                             this.read_buffer.append(&mut chunk_data);
-                            this.state = AsyncIndexReaderState::HaveData(0);
+                            this.state = AsyncIndexReaderState::HaveData;
                             this.store = Some(store);
                         }
                         Err(err) => {
@@ -103,8 +129,8 @@ where
                         }
                     };
                 }
-                AsyncIndexReaderState::HaveData(offset) => {
-                    let offset = *offset;
+                AsyncIndexReaderState::HaveData => {
+                    let offset = this.current_chunk_offset as usize;
                     let len = this.read_buffer.len();
                     let n = if len - offset < buf.len() {
                         len - offset
@@ -113,11 +139,13 @@ where
                     };
 
                     buf[0..n].copy_from_slice(&this.read_buffer[offset..(offset + n)]);
+                    this.position += n as u64;
+
                     if offset + n == len {
                         this.state = AsyncIndexReaderState::NoData;
-                        this.current_chunk_idx += 1;
                     } else {
-                        this.state = AsyncIndexReaderState::HaveData(offset + n);
+                        this.current_chunk_offset += n as u64;
+                        this.state = AsyncIndexReaderState::HaveData;
                     }
 
                     return Poll::Ready(Ok(n));
@@ -126,3 +154,51 @@ where
         }
     }
 }
+
+impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
+where
+    S: AsyncReadChunk + Unpin + Sync + 'static,
+    I: IndexFile + Unpin,
+{
+    fn start_seek(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<tokio::io::Result<()>> {
+        let this = Pin::get_mut(self);
+        this.seek_to_pos = match pos {
+            SeekFrom::Start(offset) => {
+                offset as i64
+            },
+            SeekFrom::End(offset) => {
+                this.index.index_bytes() as i64 + offset
+            },
+            SeekFrom::Current(offset) => {
+                this.position as i64 + offset
+            }
+        };
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<tokio::io::Result<u64>> {
+        let this = Pin::get_mut(self);
+
+        let index_bytes = this.index.index_bytes();
+        if this.seek_to_pos < 0 {
+            return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
+        } else if this.seek_to_pos > index_bytes as i64 {
+            this.position = index_bytes;
+        } else {
+            this.position = this.seek_to_pos as u64;
+        }
+
+        // even if seeking within one chunk, we need to go to NoData to
+        // recalculate the current_chunk_offset (data is cached anyway)
+        this.state = AsyncIndexReaderState::NoData;
+
+        Poll::Ready(Ok(this.position))
+    }
+}
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 2eab8524..c6bab56e 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::ops::Range;
 
+#[derive(Clone)]
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
     pub digest: [u8; 32],
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

replaced by AsyncIndexReader

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/backup/fixed_index.rs | 154 --------------------------------------
 1 file changed, 154 deletions(-)

diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
index b7e785d6..59f63d2e 100644
--- a/src/backup/fixed_index.rs
+++ b/src/backup/fixed_index.rs
@@ -13,7 +13,6 @@ use std::os::unix::io::AsRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use super::read_chunk::*;
 use super::ChunkInfo;
 
 use proxmox::tools::io::ReadExt;
@@ -146,20 +145,6 @@ impl FixedIndexReader {
         Ok(())
     }
 
-    #[inline]
-    fn chunk_end(&self, pos: usize) -> u64 {
-        if pos >= self.index_length {
-            panic!("chunk index out of range");
-        }
-
-        let end = ((pos + 1) * self.chunk_size) as u64;
-        if end > self.size {
-            self.size
-        } else {
-            end
-        }
-    }
-
     pub fn print_info(&self) {
         println!("Size: {}", self.size);
         println!("ChunkSize: {}", self.chunk_size);
@@ -476,142 +461,3 @@ impl FixedIndexWriter {
         Ok(())
     }
 }
-
-pub struct BufferedFixedReader<S> {
-    store: S,
-    index: FixedIndexReader,
-    archive_size: u64,
-    read_buffer: Vec<u8>,
-    buffered_chunk_idx: usize,
-    buffered_chunk_start: u64,
-    read_offset: u64,
-}
-
-impl<S: ReadChunk> BufferedFixedReader<S> {
-    pub fn new(index: FixedIndexReader, store: S) -> Self {
-        let archive_size = index.size;
-        Self {
-            store,
-            index,
-            archive_size,
-            read_buffer: Vec::with_capacity(1024 * 1024),
-            buffered_chunk_idx: 0,
-            buffered_chunk_start: 0,
-            read_offset: 0,
-        }
-    }
-
-    pub fn archive_size(&self) -> u64 {
-        self.archive_size
-    }
-
-    fn buffer_chunk(&mut self, idx: usize) -> Result<(), Error> {
-        let index = &self.index;
-        let info = match index.chunk_info(idx) {
-            Some(info) => info,
-            None => bail!("chunk index out of range"),
-        };
-
-        // fixme: avoid copy
-
-        let data = self.store.read_chunk(&info.digest)?;
-        let size = info.range.end - info.range.start;
-        if size != data.len() as u64 {
-            bail!("read chunk with wrong size ({} != {}", size, data.len());
-        }
-
-        self.read_buffer.clear();
-        self.read_buffer.extend_from_slice(&data);
-
-        self.buffered_chunk_idx = idx;
-
-        self.buffered_chunk_start = info.range.start as u64;
-        Ok(())
-    }
-}
-
-impl<S: ReadChunk> crate::tools::BufferedRead for BufferedFixedReader<S> {
-    fn buffered_read(&mut self, offset: u64) -> Result<&[u8], Error> {
-        if offset == self.archive_size {
-            return Ok(&self.read_buffer[0..0]);
-        }
-
-        let buffer_len = self.read_buffer.len();
-        let index = &self.index;
-
-        // optimization for sequential read
-        if buffer_len > 0
-            && ((self.buffered_chunk_idx + 1) < index.index_length)
-            && (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let next_idx = self.buffered_chunk_idx + 1;
-            let next_end = index.chunk_end(next_idx);
-            if offset < next_end {
-                self.buffer_chunk(next_idx)?;
-                let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-                return Ok(&self.read_buffer[buffer_offset..]);
-            }
-        }
-
-        if (buffer_len == 0)
-            || (offset < self.buffered_chunk_start)
-            || (offset >= (self.buffered_chunk_start + (self.read_buffer.len() as u64)))
-        {
-            let idx = (offset / index.chunk_size as u64) as usize;
-            self.buffer_chunk(idx)?;
-        }
-
-        let buffer_offset = (offset - self.buffered_chunk_start) as usize;
-        Ok(&self.read_buffer[buffer_offset..])
-    }
-}
-
-impl<S: ReadChunk> std::io::Read for BufferedFixedReader<S> {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        use crate::tools::BufferedRead;
-        use std::io::{Error, ErrorKind};
-
-        let data = match self.buffered_read(self.read_offset) {
-            Ok(v) => v,
-            Err(err) => return Err(Error::new(ErrorKind::Other, err.to_string())),
-        };
-
-        let n = if data.len() > buf.len() {
-            buf.len()
-        } else {
-            data.len()
-        };
-
-        unsafe {
-            std::ptr::copy_nonoverlapping(data.as_ptr(), buf.as_mut_ptr(), n);
-        }
-
-        self.read_offset += n as u64;
-
-        Ok(n)
-    }
-}
-
-impl<S: ReadChunk> Seek for BufferedFixedReader<S> {
-    fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
-        let new_offset = match pos {
-            SeekFrom::Start(start_offset) => start_offset as i64,
-            SeekFrom::End(end_offset) => (self.archive_size as i64) + end_offset,
-            SeekFrom::Current(offset) => (self.read_offset as i64) + offset,
-        };
-
-        use std::io::{Error, ErrorKind};
-        if (new_offset < 0) || (new_offset > (self.archive_size as i64)) {
-            return Err(Error::new(
-                ErrorKind::Other,
-                format!(
-                    "seek is out of range {} ([0..{}])",
-                    new_offset, self.archive_size
-                ),
-            ));
-        }
-        self.read_offset = new_offset as u64;
-
-        Ok(self.read_offset)
-    }
-}
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
                   ` (2 preceding siblings ...)
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
  2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

Replaces the removed BufferedFixedReader and makes the API fully async
(no block_in_place or block_on, which could break with many concurrent
requests).

A tokio::sync::Mutex needs to be used for exclusive access, since a
regular std Mutex guard cannot be held across async/await calls.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
 src/restore.rs | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 3e37066..2be0295 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,13 +1,14 @@
 use std::sync::{Arc, Mutex};
 use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
+use std::io::SeekFrom;
 use std::convert::TryInto;
 
 use anyhow::{format_err, bail, Error};
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
+use tokio::prelude::*;
 
-use proxmox_backup::tools::runtime::{get_runtime_with_builder, block_in_place};
+use proxmox_backup::tools::runtime::get_runtime_with_builder;
 use proxmox_backup::backup::*;
 use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, RemoteChunkReader};
 
@@ -16,7 +17,7 @@ use crate::registry::Registry;
 use crate::capi_types::DataPointer;
 
 struct ImageAccessInfo {
-    reader: Arc<Mutex<BufferedFixedReader<RemoteChunkReader>>>,
+    reader: Arc<tokio::sync::Mutex<AsyncIndexReader<RemoteChunkReader, FixedIndexReader>>>,
     _archive_name: String,
     archive_size: u64,
 }
@@ -231,12 +232,12 @@ impl RestoreTask {
         let index = client.download_fixed_index(&manifest, &archive_name).await?;
         let archive_size = index.index_bytes();
 
-        let reader = BufferedFixedReader::new(index, chunk_reader);
+        let reader = AsyncIndexReader::new(index, chunk_reader);
 
         let info = ImageAccessInfo {
             archive_size,
             _archive_name: archive_name, /// useful to debug
-            reader: Arc::new(Mutex::new(reader)),
+            reader: Arc::new(tokio::sync::Mutex::new(reader)),
         };
 
         (*self.image_registry.lock().unwrap()).register(info)
@@ -260,12 +261,10 @@ impl RestoreTask {
             bail!("read index {} out of bounds {}", offset, image_size);
         }
 
-        let bytes = block_in_place(|| {
-            let mut reader = reader.lock().unwrap();
-            reader.seek(SeekFrom::Start(offset))?;
-            let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
-            reader.read(buf)
-        })?;
+        let mut reader = reader.lock().await;
+        reader.seek(SeekFrom::Start(offset)).await?;
+        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
+        let bytes = reader.read(buf).await?;
 
         Ok(bytes.try_into()?)
     }
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
                   ` (3 preceding siblings ...)
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
@ 2020-07-22 13:56 ` Stefan Reiter
  2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
  5 siblings, 0 replies; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 13:56 UTC (permalink / raw)
  To: pbs-devel

QEMU will always assume EOF when fewer bytes than requested are
returned by a block driver's 'read' interface, so we need to fill the
buffer up to 'size' if possible.
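
This is the classic read-until-filled loop; a minimal sketch over std::io::Read
(the helper name read_full is illustrative only, the patch does the same with
AsyncRead plus a seek per iteration):

```rust
use std::io::Read;

// A single read() may legally return fewer bytes than requested, so
// keep reading until the buffer is full or the source reports EOF
// (a read of 0 bytes).
fn read_full<R: Read>(mut src: R, buf: &mut [u8]) -> std::io::Result<usize> {
    let mut read = 0;
    while read < buf.len() {
        let n = src.read(&mut buf[read..])?;
        if n == 0 {
            break; // EOF before the buffer was filled
        }
        read += n;
    }
    Ok(read)
}
```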

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

v2 note: this was previously a QEMU patch, but honestly that's stupid so let's
do it in Rust instead.

 current-api.h  |  4 ++--
 src/lib.rs     |  4 ++--
 src/restore.rs | 20 ++++++++++++++++----
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/current-api.h b/current-api.h
index 15bb275..d77eff6 100644
--- a/current-api.h
+++ b/current-api.h
@@ -364,8 +364,8 @@ int proxmox_restore_read_image_at(ProxmoxRestoreHandle *handle,
  * Note: The data pointer needs to be valid until the async
  * opteration is finished.
  *
- * Note: It is not an error for a successful call to transfer fewer
- * bytes than requested.
+ * Note: The call will only ever transfer less than 'size' bytes if
+ * the end of the file has been reached.
  */
 void proxmox_restore_read_image_at_async(ProxmoxRestoreHandle *handle,
                                          uint8_t aid,
diff --git a/src/lib.rs b/src/lib.rs
index d4b9370..3346be8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -934,8 +934,8 @@ pub extern "C" fn proxmox_restore_read_image_at(
 /// Note: The data pointer needs to be valid until the async
 /// opteration is finished.
 ///
-/// Note: It is not an error for a successful call to transfer fewer
-/// bytes than requested.
+/// Note: The call will only ever transfer less than 'size' bytes if
+/// the end of the file has been reached.
 #[no_mangle]
 #[allow(clippy::not_unsafe_ptr_arg_deref)]
 pub extern "C" fn proxmox_restore_read_image_at_async(
diff --git a/src/restore.rs b/src/restore.rs
index 2be0295..e43d040 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -262,10 +262,22 @@ impl RestoreTask {
         }
 
         let mut reader = reader.lock().await;
-        reader.seek(SeekFrom::Start(offset)).await?;
-        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
-        let bytes = reader.read(buf).await?;
 
-        Ok(bytes.try_into()?)
+        let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
+        let mut read = 0;
+
+        while read < size {
+            reader.seek(SeekFrom::Start(offset + read)).await?;
+            let bytes = reader.read(&mut buf[read as usize..]).await?;
+
+            if bytes == 0 {
+                // EOF
+                break;
+            }
+
+            read += bytes as u64;
+        }
+
+        Ok(read.try_into()?)
     }
 }
-- 
2.20.1





^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
@ 2020-07-22 14:16   ` Thomas Lamprecht
  2020-07-22 14:24     ` Stefan Reiter
  0 siblings, 1 reply; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-22 14:16 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 22.07.20 15:56, Stefan Reiter wrote:
> Necessary for byte-wise seeking through chunks in an index.
> 
> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
> ---
>  src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>  src/backup/fixed_index.rs   | 11 +++++++++++
>  src/backup/index.rs         |  3 +++
>  3 files changed, 32 insertions(+)
> 
> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
> index 4907fe1f..887b7cf3 100644
> --- a/src/backup/dynamic_index.rs
> +++ b/src/backup/dynamic_index.rs
> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>              digest: self.index[pos].digest.clone(),
>          })
>      }
> +
> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
> +        let end_idx = self.index.len() - 1;
> +        let end = self.chunk_end(end_idx);
> +        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
> +        let found_idx = match found_idx {
> +            Ok(i) => i,
> +            Err(_) => return None
> +        };
> +
> +        let found_start = if found_idx == 0 {
> +            0
> +        } else {
> +            self.chunk_end(found_idx - 1)
> +        };
> +
> +        Some((found_idx, offset - found_start))
> +    }
>  }
>  
>  struct CachedChunk {
> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
> index 73d0dad0..b7e785d6 100644
> --- a/src/backup/fixed_index.rs
> +++ b/src/backup/fixed_index.rs
> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>  
>          (csum, chunk_end)
>      }
> +
> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
> +        if offset >= self.size {
> +            return None;
> +        }
> +
> +        Some((
> +            (offset / self.chunk_size as u64) as usize,
> +            offset % self.chunk_size as u64

modulo is really slow, but isn't chunk_size always a 2^x and thus we can
do the same here as we do in other places:

offset & (self.chunk_size - 1)
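
That equivalence is easy to check in isolation (the helper names below are
illustrative only): for any power-of-two chunk_size, the shift/mask form gives
the same result as division and modulo:

```rust
// Division/modulo form, as in the patch.
fn chunk_pos_div(offset: u64, chunk_size: u64) -> (u64, u64) {
    (offset / chunk_size, offset % chunk_size)
}

// Shift/mask form, valid only for power-of-two chunk sizes.
fn chunk_pos_mask(offset: u64, chunk_size: u64) -> (u64, u64) {
    debug_assert!(chunk_size.is_power_of_two());
    (offset >> chunk_size.trailing_zeros(), offset & (chunk_size - 1))
}
```

Note that when the divisor is a compile-time constant the compiler performs
this strength reduction itself; here chunk_size is a runtime field, so the
manual form can actually matter.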

> +        ))
> +    }
>  }
>  
>  pub struct FixedIndexWriter {
> diff --git a/src/backup/index.rs b/src/backup/index.rs
> index efdf3b54..2eab8524 100644
> --- a/src/backup/index.rs
> +++ b/src/backup/index.rs
> @@ -22,6 +22,9 @@ pub trait IndexFile {
>      fn index_bytes(&self) -> u64;
>      fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>  
> +    /// Get the chunk index and the relative offset within it for a byte offset
> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
> +
>      /// Compute index checksum and size
>      fn compute_csum(&self) -> ([u8; 32], u64);
>  
> 





^ permalink raw reply	[flat|nested] 10+ messages in thread

* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 14:16   ` Thomas Lamprecht
@ 2020-07-22 14:24     ` Stefan Reiter
  2020-07-22 14:41       ` Thomas Lamprecht
  0 siblings, 1 reply; 10+ messages in thread
From: Stefan Reiter @ 2020-07-22 14:24 UTC (permalink / raw)
  To: Thomas Lamprecht, Proxmox Backup Server development discussion

On 7/22/20 4:16 PM, Thomas Lamprecht wrote:
> On 22.07.20 15:56, Stefan Reiter wrote:
>> Necessary for byte-wise seeking through chunks in an index.
>>
>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>> ---
>>   src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>>   src/backup/fixed_index.rs   | 11 +++++++++++
>>   src/backup/index.rs         |  3 +++
>>   3 files changed, 32 insertions(+)
>>
>> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
>> index 4907fe1f..887b7cf3 100644
>> --- a/src/backup/dynamic_index.rs
>> +++ b/src/backup/dynamic_index.rs
>> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>>               digest: self.index[pos].digest.clone(),
>>           })
>>       }
>> +
>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>> +        let end_idx = self.index.len() - 1;
>> +        let end = self.chunk_end(end_idx);
>> +        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
>> +        let found_idx = match found_idx {
>> +            Ok(i) => i,
>> +            Err(_) => return None
>> +        };
>> +
>> +        let found_start = if found_idx == 0 {
>> +            0
>> +        } else {
>> +            self.chunk_end(found_idx - 1)
>> +        };
>> +
>> +        Some((found_idx, offset - found_start))
>> +    }
>>   }
>>   
>>   struct CachedChunk {
>> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
>> index 73d0dad0..b7e785d6 100644
>> --- a/src/backup/fixed_index.rs
>> +++ b/src/backup/fixed_index.rs
>> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>>   
>>           (csum, chunk_end)
>>       }
>> +
>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>> +        if offset >= self.size {
>> +            return None;
>> +        }
>> +
>> +        Some((
>> +            (offset / self.chunk_size as u64) as usize,
>> +            offset % self.chunk_size as u64
> 
> modulo is really slow, but isn't chunk_size always a 2^x and thus we can
> do the same here as we do in other places:
> 
> offset & (self.chunk_size - 1)
> 

I found it more readable this way and I don't think it's hot-path enough 
to make a real difference in performance.

But I don't mind, could even replace the div as well. Maybe an 
assert!(chunk_size.is_power_of_two()) might be good somewhere though.

>> +        ))
>> +    }
>>   }
>>   
>>   pub struct FixedIndexWriter {
>> diff --git a/src/backup/index.rs b/src/backup/index.rs
>> index efdf3b54..2eab8524 100644
>> --- a/src/backup/index.rs
>> +++ b/src/backup/index.rs
>> @@ -22,6 +22,9 @@ pub trait IndexFile {
>>       fn index_bytes(&self) -> u64;
>>       fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>>   
>> +    /// Get the chunk index and the relative offset within it for a byte offset
>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
>> +
>>       /// Compute index checksum and size
>>       fn compute_csum(&self) -> ([u8; 32], u64);
>>   
>>
> 
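For the dynamic index quoted above no such arithmetic shortcut exists, since chunk sizes vary; the lookup has to search the sorted chunk-end offsets, as the patch does with binary_search. A minimal standalone sketch of that idea (illustrative names, operating on a plain slice of exclusive end offsets rather than the actual index type):

```rust
// Sketch: find (chunk index, offset within chunk) in a dynamic index,
// where ends[i] is the exclusive end offset of chunk i (ascending).
fn chunk_from_offset(ends: &[u64], offset: u64) -> Option<(usize, u64)> {
    if offset >= *ends.last()? {
        return None;
    }
    // First index whose end offset lies strictly past `offset`.
    let idx = ends.partition_point(|&end| end <= offset);
    let start = if idx == 0 { 0 } else { ends[idx - 1] };
    Some((idx, offset - start))
}
```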


* Re: [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile
  2020-07-22 14:24     ` Stefan Reiter
@ 2020-07-22 14:41       ` Thomas Lamprecht
  0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-22 14:41 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 22.07.20 16:24, Stefan Reiter wrote:
> On 7/22/20 4:16 PM, Thomas Lamprecht wrote:
>> On 22.07.20 15:56, Stefan Reiter wrote:
>>> Necessary for byte-wise seeking through chunks in an index.
>>>
>>> Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
>>> ---
>>>   src/backup/dynamic_index.rs | 18 ++++++++++++++++++
>>>   src/backup/fixed_index.rs   | 11 +++++++++++
>>>   src/backup/index.rs         |  3 +++
>>>   3 files changed, 32 insertions(+)
>>>
>>> diff --git a/src/backup/dynamic_index.rs b/src/backup/dynamic_index.rs
>>> index 4907fe1f..887b7cf3 100644
>>> --- a/src/backup/dynamic_index.rs
>>> +++ b/src/backup/dynamic_index.rs
>>> @@ -216,6 +216,24 @@ impl IndexFile for DynamicIndexReader {
>>>               digest: self.index[pos].digest.clone(),
>>>           })
>>>       }
>>> +
>>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>>> +        let end_idx = self.index.len() - 1;
>>> +        let end = self.chunk_end(end_idx);
>>> +        let found_idx = self.binary_search(0, 0, end_idx, end, offset);
>>> +        let found_idx = match found_idx {
>>> +            Ok(i) => i,
>>> +            Err(_) => return None
>>> +        };
>>> +
>>> +        let found_start = if found_idx == 0 {
>>> +            0
>>> +        } else {
>>> +            self.chunk_end(found_idx - 1)
>>> +        };
>>> +
>>> +        Some((found_idx, offset - found_start))
>>> +    }
>>>   }
>>>     struct CachedChunk {
>>> diff --git a/src/backup/fixed_index.rs b/src/backup/fixed_index.rs
>>> index 73d0dad0..b7e785d6 100644
>>> --- a/src/backup/fixed_index.rs
>>> +++ b/src/backup/fixed_index.rs
>>> @@ -219,6 +219,17 @@ impl IndexFile for FixedIndexReader {
>>>             (csum, chunk_end)
>>>       }
>>> +
>>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)> {
>>> +        if offset >= self.size {
>>> +            return None;
>>> +        }
>>> +
>>> +        Some((
>>> +            (offset / self.chunk_size as u64) as usize,
>>> +            offset % self.chunk_size as u64
>>
>> modulo is really slow, but isn't chunk_size always a 2^x and thus we can
>> do the same here as we do in other places:
>>
>> offset & (self.chunk_size - 1)
>>
> 
> I found it more readable this way and I don't think it's hot-path enough to make a real difference in performance.

then why not add a comment instead? This is some orders of magnitude slower,
not just 2 or 3% ...

> 
> But I don't mind, could even replace the div as well. Maybe an assert!(chunk_size.is_power_of_two()) might be good somewhere though.

Yeah, somewhere in an initial entry point to such code an assert wouldn't hurt.

> 
>>> +        ))
>>> +    }
>>>   }
>>>     pub struct FixedIndexWriter {
>>> diff --git a/src/backup/index.rs b/src/backup/index.rs
>>> index efdf3b54..2eab8524 100644
>>> --- a/src/backup/index.rs
>>> +++ b/src/backup/index.rs
>>> @@ -22,6 +22,9 @@ pub trait IndexFile {
>>>       fn index_bytes(&self) -> u64;
>>>       fn chunk_info(&self, pos: usize) -> Option<ChunkReadInfo>;
>>>   +    /// Get the chunk index and the relative offset within it for a byte offset
>>> +    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, u64)>;
>>> +
>>>       /// Compute index checksum and size
>>>       fn compute_csum(&self) -> ([u8; 32], u64);
>>>  
>>


* [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings
  2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
                   ` (4 preceding siblings ...)
  2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
@ 2020-07-23  8:31 ` Thomas Lamprecht
  5 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2020-07-23  8:31 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Stefan Reiter

On 22.07.20 15:56, Stefan Reiter wrote:
> When using the PBS blockdriver with qemu-nbd (for example), it can happen that
> enough read requests are issued to saturate the tokio thread pool. Not an issue
> in general, but as Wolfgang and I painstakingly discovered a while back, it does
> break block_on, which is used in BufferedFixedReader. This means that reading
> larger amounts of data would hang the QEMU process [0].
> 
> Fix this by replacing the BufferedFixedReader with an AsyncIndexReader,
> implementing AsyncSeek for it in the process. This makes the entire API async,
> requiring no block_on anymore.
> 
> Incidentally, this also gave me my best benchmark results yet, coming in at
> above 1.6 Gb/s read speed via NBD on my local machine.
> 
> Additionally I discovered a separate bug (fixed by patch 5), wherein read
> requests that were not aligned to the chunk size would return bogus data. This
> too only seems to happen in non-VM connections (e.g. nbd, etc...).
> 
> v2:
> * Remove BufferedFixedReader entirely, use AsyncIndexReader instead
> * Implement AsyncSeek for AsyncIndexReader
> * Fix the second bug in Rust instead of QEMU C
> 
> 
> [0] ...and since the NBD kernel driver appears to be horribly broken, this often
> also crashes most of the system, but that's a different story. If you ever get
> in this situation, 'nbd-client -d /dev/nbdX' works (sometimes) to force
> disconnect the device ('qemu-nbd -d' intelligently issues a read before
> disconnecting, thus hanging before getting anything done...)
> 
> 
> backup: Stefan Reiter (3):
>   add and implement chunk_from_offset for IndexFile
>   implement AsyncSeek for AsyncIndexReader
>   remove BufferedFixedReader interface
> 
>  src/backup/async_index_reader.rs | 116 ++++++++++++++++++----
>  src/backup/dynamic_index.rs      |  18 ++++
>  src/backup/fixed_index.rs        | 165 +++----------------------------
>  src/backup/index.rs              |   4 +
>  4 files changed, 129 insertions(+), 174 deletions(-)
> 
> backup-qemu: Stefan Reiter (2):
>   use AsyncIndexReader for read_image_at
>   read_image_at: iterate until buffer is filled
> 
>  current-api.h  |  4 ++--
>  src/lib.rs     |  4 ++--
>  src/restore.rs | 35 +++++++++++++++++++++++------------
>  3 files changed, 27 insertions(+), 16 deletions(-)
> 
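The "iterate until buffer is filled" fix from patch 5 can be sketched in isolation (the `read_at` callback here is a hypothetical short-read API, not the actual restore.rs code): a single read may legitimately stop early, e.g. at a chunk boundary, so the caller must loop rather than treat one call as complete.

```rust
// Sketch: fill `buf` starting at `offset` by repeatedly calling a
// read primitive that may return fewer bytes than requested.
// Returns the total bytes read (less than buf.len() only at EOF).
fn read_exact_at(
    read_at: &mut dyn FnMut(&mut [u8], u64) -> std::io::Result<usize>,
    buf: &mut [u8],
    mut offset: u64,
) -> std::io::Result<usize> {
    let mut done = 0;
    while done < buf.len() {
        let n = read_at(&mut buf[done..], offset)?;
        if n == 0 {
            break; // EOF: return what we have
        }
        done += n;
        offset += n as u64;
    }
    Ok(done)
}
```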

applied series, thanks!

using the noload mount option is really the key here, for ext4 at least ^^
# mount -o noload /dev/nbd0p1 /mnt/foo


end of thread, other threads:[~2020-07-23  8:31 UTC | newest]

Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
2020-07-22 13:56 [pbs-devel] [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 1/5] add and implement chunk_from_offset for IndexFile Stefan Reiter
2020-07-22 14:16   ` Thomas Lamprecht
2020-07-22 14:24     ` Stefan Reiter
2020-07-22 14:41       ` Thomas Lamprecht
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 2/5] implement AsyncSeek for AsyncIndexReader Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup 3/5] remove BufferedFixedReader interface Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 4/5] use AsyncIndexReader for read_image_at Stefan Reiter
2020-07-22 13:56 ` [pbs-devel] [PATCH v2 backup-qemu 5/5] read_image_at: iterate until buffer is filled Stefan Reiter
2020-07-23  8:31 ` [pbs-devel] applied-series: [PATCH v2 0/5] Fix PBS blockdriver for non-VM settings Thomas Lamprecht
