* [pve-devel] [PATCH v2 proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
` (8 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Explicitly test that data will stay available and can be retrieved
immediately via listen(), even if the future producing the data and
notifying the consumers was already run in the past.
Wasn't broken or anything, but helps with understanding IMO.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools/broadcast_future.rs | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/src/tools/broadcast_future.rs b/src/tools/broadcast_future.rs
index 88b7aaab..7bfd83b7 100644
--- a/src/tools/broadcast_future.rs
+++ b/src/tools/broadcast_future.rs
@@ -166,4 +166,15 @@ fn test_broadcast_future() {
let result = CHECKSUM.load(Ordering::SeqCst);
assert_eq!(result, 3);
+
+ // the result stays available until the BroadcastFuture is dropped
+ rt.block_on(sender.listen()
+ .map_ok(|res| {
+ CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
+ })
+ .map_err(|err| { panic!("got error {}", err); })
+ .map(|_| ()));
+
+ let result = CHECKSUM.load(Ordering::SeqCst);
+ assert_eq!(result, 7);
}
--
2.30.2
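For readers without the BroadcastFuture source at hand, here is a minimal sketch of the property the new test lines check: a listener that registers after the producer has already finished still gets the result. The `Broadcast` type below is a hypothetical, thread-based stand-in built on `Mutex` and `Condvar`, not the real async type, and the result value 1 is inferred from the asserted checksums (3 + 4 * res = 7).

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical thread-based stand-in for BroadcastFuture (not the real type):
/// one producer stores a value, any number of listeners read a clone of it.
#[derive(Clone)]
pub struct Broadcast<T: Clone> {
    slot: Arc<(Mutex<Option<T>>, Condvar)>,
}

impl<T: Clone> Broadcast<T> {
    pub fn new() -> Self {
        Self {
            slot: Arc::new((Mutex::new(None), Condvar::new())),
        }
    }

    /// Store the result and wake every listener that is currently waiting.
    pub fn complete(&self, value: T) {
        let (lock, cvar) = &*self.slot;
        *lock.lock().unwrap() = Some(value);
        cvar.notify_all();
    }

    /// Block until a result exists, then return a clone of it. If the result
    /// was stored earlier, this returns without waiting.
    pub fn listen(&self) -> T {
        let (lock, cvar) = &*self.slot;
        let guard = lock.lock().unwrap();
        let guard = cvar.wait_while(guard, |v| v.is_none()).unwrap();
        guard.as_ref().unwrap().clone()
    }
}

/// Mirrors the arithmetic of the added test lines with a late listener.
pub fn late_listener_checksum() -> u32 {
    let broadcast = Broadcast::new();
    let producer = broadcast.clone();
    // The producer runs to completion before the late listener exists.
    thread::spawn(move || producer.complete(1u32)).join().unwrap();

    let mut checksum = 3; // value asserted just before the added lines
    checksum += broadcast.listen() * 4; // the result is still available
    checksum
}
```

Calling `late_listener_checksum()` yields 7, matching the assertion in the patch. The result lives as long as some `Broadcast` handle does, which is the same lifetime rule the comment in the test states.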
* [pve-devel] [PATCH v2 proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
` (7 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Supports concurrent 'access' calls to the same key via a
BroadcastFuture. These are stored in a separate HashMap; the LruCache
underneath is only modified once a valid value has been retrieved.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools.rs | 1 +
src/tools/async_lru_cache.rs | 135 +++++++++++++++++++++++++++++++++++
2 files changed, 136 insertions(+)
create mode 100644 src/tools/async_lru_cache.rs
diff --git a/src/tools.rs b/src/tools.rs
index 65049b1e..59599339 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -42,6 +42,7 @@ pub mod json;
pub mod logrotate;
pub mod loopdev;
pub mod lru_cache;
+pub mod async_lru_cache;
pub mod nom;
pub mod runtime;
pub mod serde_filter;
diff --git a/src/tools/async_lru_cache.rs b/src/tools/async_lru_cache.rs
new file mode 100644
index 00000000..cc385ec9
--- /dev/null
+++ b/src/tools/async_lru_cache.rs
@@ -0,0 +1,135 @@
+//! An 'async'-safe layer on the existing sync LruCache implementation. Supports multiple
+//! concurrent requests to the same key.
+
+use anyhow::Error;
+
+use std::collections::HashMap;
+use std::future::Future;
+use std::sync::{Arc, Mutex};
+
+use super::lru_cache::LruCache;
+use super::BroadcastFuture;
+
+/// Interface for asynchronously getting values on cache misses.
+pub trait AsyncCacher<K, V: Clone>: Sync + Send {
+ /// Fetch a value for key on cache miss.
+ ///
+ /// Works similar to non-async lru_cache::Cacher, but if the key has already been requested
+ /// and the result is not cached yet, the 'fetch' function will not be called and instead the
+ /// result of the original request cloned and returned upon completion.
+ ///
+ /// The underlying LRU structure is not modified until the returned future resolves to an
+ /// Ok(Some(_)) value.
+ fn fetch(&self, key: K) -> Box<dyn Future<Output = Result<Option<V>, Error>> + Send>;
+}
+
+/// See tools::lru_cache::LruCache, this implements an async-safe variant of that with the help of
+/// AsyncCacher.
+#[derive(Clone)]
+pub struct AsyncLruCache<K, V> {
+ maps: Arc<Mutex<(LruCache<K, V>, HashMap<K, BroadcastFuture<Option<V>>>)>>,
+}
+
+impl<K: std::cmp::Eq + std::hash::Hash + Copy, V: Clone + Send + 'static> AsyncLruCache<K, V> {
+ /// Create a new AsyncLruCache with the given maximum capacity.
+ pub fn new(capacity: usize) -> Self {
+ Self {
+ maps: Arc::new(Mutex::new((LruCache::new(capacity), HashMap::new()))),
+ }
+ }
+
+ /// Access an item either via the cache or by calling cacher.fetch. A return value of Ok(None)
+ /// means the item requested has no representation, Err(_) means a call to fetch() failed,
+ /// regardless of whether it was initiated by this call or a previous one.
+ pub async fn access(&self, key: K, cacher: &dyn AsyncCacher<K, V>) -> Result<Option<V>, Error> {
+ let (owner, result_fut) = {
+ // check if already requested
+ let mut maps = self.maps.lock().unwrap();
+ if let Some(fut) = maps.1.get(&key) {
+ // wait for the already scheduled future to resolve
+ (false, fut.listen())
+ } else {
+ // check if value is cached in LRU
+ if let Some(val) = maps.0.get_mut(key) {
+ return Ok(Some(val.clone()));
+ }
+
+ // if neither, start broadcast future and put into map while we still have lock
+ let fut = cacher.fetch(key);
+ let broadcast = BroadcastFuture::new(fut);
+ let result_fut = broadcast.listen();
+ maps.1.insert(key, broadcast);
+ (true, result_fut)
+ }
+ // drop Mutex before awaiting any future
+ };
+
+ let result = result_fut.await;
+ match result {
+ Ok(Some(ref value)) if owner => {
+ // this call was the one initiating the request, put into LRU and remove from map
+ let mut maps = self.maps.lock().unwrap();
+ maps.0.insert(key, value.clone());
+ maps.1.remove(&key);
+ }
+ _ => {}
+ }
+ result
+ }
+}
+
+mod test {
+ use super::*;
+
+ struct TestAsyncCacher {
+ prefix: &'static str,
+ }
+
+ impl AsyncCacher<i32, String> for TestAsyncCacher {
+ fn fetch(
+ &self,
+ key: i32,
+ ) -> Box<dyn Future<Output = Result<Option<String>, Error>> + Send> {
+ let x = self.prefix;
+ Box::new(async move { Ok(Some(format!("{}{}", x, key))) })
+ }
+ }
+
+ #[test]
+ fn test_async_lru_cache() {
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(async move {
+ let cacher = TestAsyncCacher { prefix: "x" };
+ let cache: AsyncLruCache<i32, String> = AsyncLruCache::new(2);
+
+ assert_eq!(
+ cache.access(10, &cacher).await.unwrap(),
+ Some("x10".to_string())
+ );
+ assert_eq!(
+ cache.access(20, &cacher).await.unwrap(),
+ Some("x20".to_string())
+ );
+ assert_eq!(
+ cache.access(30, &cacher).await.unwrap(),
+ Some("x30".to_string())
+ );
+
+ for _ in 0..10 {
+ let c = cache.clone();
+ tokio::spawn(async move {
+ let cacher = TestAsyncCacher { prefix: "y" };
+ assert_eq!(
+ c.access(40, &cacher).await.unwrap(),
+ Some("y40".to_string())
+ );
+ });
+ }
+
+ assert_eq!(
+ cache.access(20, &cacher).await.unwrap(),
+ Some("x20".to_string())
+ );
+ });
+ }
+}
--
2.30.2
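The two-map scheme in AsyncLruCache can be hard to picture from the diff alone, so here is a rough blocking sketch of the same idea. Everything in it is hypothetical: `SingleFlightCache` uses threads instead of futures, a plain `HashMap` stands in for the LRU, and error handling is left out (a panicking fetch would leave waiters blocked).

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Slot that the owner of a request fills and that waiters block on.
type Pending<V> = Arc<(Mutex<Option<V>>, Condvar)>;

/// Hypothetical blocking analogue of AsyncLruCache.
/// Tuple field 0 holds finished values, field 1 holds requests in flight.
pub struct SingleFlightCache<K, V> {
    maps: Mutex<(HashMap<K, V>, HashMap<K, Pending<V>>)>,
}

impl<K: Eq + Hash + Copy, V: Clone> SingleFlightCache<K, V> {
    pub fn new() -> Self {
        Self {
            maps: Mutex::new((HashMap::new(), HashMap::new())),
        }
    }

    pub fn access(&self, key: K, fetch: impl FnOnce(K) -> V) -> V {
        let (owner, pending) = {
            let mut maps = self.maps.lock().unwrap();
            if let Some(p) = maps.1.get(&key) {
                // someone else is already fetching this key
                (false, Arc::clone(p))
            } else {
                if let Some(val) = maps.0.get(&key) {
                    return val.clone();
                }
                // register the request while still holding the lock
                let p: Pending<V> = Arc::new((Mutex::new(None), Condvar::new()));
                maps.1.insert(key, Arc::clone(&p));
                (true, p)
            }
            // the map lock is released here, before any slow work
        };

        if owner {
            let value = fetch(key);
            {
                // move the key from "in flight" to "cached" in one step
                let mut maps = self.maps.lock().unwrap();
                maps.0.insert(key, value.clone());
                maps.1.remove(&key);
            }
            let (slot, cvar) = &*pending;
            *slot.lock().unwrap() = Some(value.clone());
            cvar.notify_all();
            value
        } else {
            let (slot, cvar) = &*pending;
            let guard = slot.lock().unwrap();
            let guard = cvar.wait_while(guard, |v| v.is_none()).unwrap();
            guard.as_ref().unwrap().clone()
        }
    }
}

/// Ten threads request key 40 at once; returns (number of fetch calls, results).
pub fn fetch_count_for_concurrent_access() -> (usize, Vec<String>) {
    let cache = Arc::new(SingleFlightCache::<i32, String>::new());
    let fetches = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let cache = Arc::clone(&cache);
            let fetches = Arc::clone(&fetches);
            thread::spawn(move || {
                cache.access(40, |key| {
                    fetches.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(20)); // simulated slow fetch
                    format!("y{}", key)
                })
            })
        })
        .collect();
    let values: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (fetches.load(Ordering::SeqCst), values)
}
```

In this sketch the fetch closure runs exactly once, because every caller after the first finds the key either in the in-flight map or in the finished map. That is the behaviour the ten spawned tasks in `test_async_lru_cache` rely on.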
* [pve-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
` (6 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Provides a fast arbitrary read implementation with full async and
concurrency support.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
This is technically all that's needed for proxmox-backup-qemu to build and
function as intended, but I decided to also use this IMHO cleaner implementation
to replace the AsyncIndexReader with the following patches.
v2:
* drop ChunkCache type alias, not necessary and looked weird, since it couldn't
be constructed directly
* add comment to other unwrap in read_at
src/backup.rs | 3 ++
src/backup/cached_chunk_reader.rs | 89 +++++++++++++++++++++++++++++++
2 files changed, 92 insertions(+)
create mode 100644 src/backup/cached_chunk_reader.rs
diff --git a/src/backup.rs b/src/backup.rs
index ae937be0..5e1147b4 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -259,3 +259,6 @@ pub use catalog_shell::*;
mod async_index_reader;
pub use async_index_reader::*;
+
+mod cached_chunk_reader;
+pub use cached_chunk_reader::*;
diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
new file mode 100644
index 00000000..ff476e37
--- /dev/null
+++ b/src/backup/cached_chunk_reader.rs
@@ -0,0 +1,89 @@
+//! An async and concurrency safe data reader backed by a local LRU cache.
+
+use anyhow::Error;
+
+use std::future::Future;
+use std::sync::Arc;
+
+use crate::backup::{AsyncReadChunk, IndexFile};
+use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+
+struct AsyncChunkCacher<T> {
+ reader: Arc<T>,
+}
+
+impl<T: AsyncReadChunk + Send + Sync + 'static> AsyncCacher<[u8; 32], Arc<Vec<u8>>>
+ for AsyncChunkCacher<T>
+{
+ fn fetch(
+ &self,
+ key: [u8; 32],
+ ) -> Box<dyn Future<Output = Result<Option<Arc<Vec<u8>>>, Error>> + Send> {
+ let reader = Arc::clone(&self.reader);
+ Box::new(async move {
+ AsyncReadChunk::read_chunk(reader.as_ref(), &key)
+ .await
+ .map(|x| Some(Arc::new(x)))
+ })
+ }
+}
+
+/// Allows arbitrary data reads from an Index via an AsyncReadChunk implementation, using an LRU
+/// cache internally to cache chunks and provide support for multiple concurrent reads (potentially
+/// to the same chunk).
+pub struct CachedChunkReader<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> {
+ cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
+ cacher: AsyncChunkCacher<R>,
+ index: I,
+}
+
+impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<I, R> {
+ /// Create a new reader with a local LRU cache containing 'capacity' chunks.
+ pub fn new(reader: R, index: I, capacity: usize) -> Self {
+ let cache = Arc::new(AsyncLruCache::new(capacity));
+ Self::new_with_cache(reader, index, cache)
+ }
+
+ /// Create a new reader with a custom LRU cache. Use this to share a cache between multiple
+ /// readers.
+ pub fn new_with_cache(
+ reader: R,
+ index: I,
+ cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
+ ) -> Self {
+ Self {
+ cache,
+ cacher: AsyncChunkCacher {
+ reader: Arc::new(reader),
+ },
+ index,
+ }
+ }
+
+ /// Read data at a given byte offset into a variable size buffer. Returns the amount of bytes
+ /// read, which will always be the size of the buffer except when reaching EOF.
+ pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
+ let size = buf.len();
+ let mut read: usize = 0;
+ while read < size {
+ let cur_offset = offset + read as u64;
+ if let Some(chunk) = self.index.chunk_from_offset(cur_offset) {
+ // chunk indices retrieved from chunk_from_offset always resolve to Some(_)
+ let info = self.index.chunk_info(chunk.0).unwrap();
+
+ // will never be None, see AsyncChunkCacher
+ let data = self.cache.access(info.digest, &self.cacher).await?.unwrap();
+
+ let want_bytes = ((info.range.end - cur_offset) as usize).min(size - read);
+ let slice = &mut buf[read..(read + want_bytes)];
+ let intra_chunk = chunk.1 as usize;
+ slice.copy_from_slice(&data[intra_chunk..(intra_chunk + want_bytes)]);
+ read += want_bytes;
+ } else {
+ // EOF
+ break;
+ }
+ }
+ Ok(read)
+ }
+}
--
2.30.2
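The loop in `read_at` is easier to follow with the cache and the index stripped away. The sketch below is an assumption-laden simplification: `ChunkedData` is a hypothetical in-memory type with equally sized chunks, whereas the real code resolves offsets through `IndexFile` and loads chunk data through the cache.

```rust
/// Hypothetical in-memory "index": equally sized chunks, except possibly the last.
pub struct ChunkedData {
    chunk_size: usize,
    chunks: Vec<Vec<u8>>,
}

impl ChunkedData {
    pub fn from_bytes(data: &[u8], chunk_size: usize) -> Self {
        assert!(chunk_size > 0);
        Self {
            chunk_size,
            chunks: data.chunks(chunk_size).map(|c| c.to_vec()).collect(),
        }
    }

    /// Map a byte offset to (chunk index, offset inside that chunk).
    /// Returns None at or past the end of the data.
    fn chunk_from_offset(&self, offset: u64) -> Option<(usize, usize)> {
        let idx = (offset / self.chunk_size as u64) as usize;
        let intra = (offset % self.chunk_size as u64) as usize;
        let chunk = self.chunks.get(idx)?;
        if intra < chunk.len() {
            Some((idx, intra))
        } else {
            None
        }
    }

    /// Fill `buf` starting at `offset`, crossing chunk boundaries as needed.
    /// Returns the number of bytes read, which is short only at EOF.
    pub fn read_at(&self, buf: &mut [u8], offset: u64) -> usize {
        let mut read = 0;
        while read < buf.len() {
            let cur_offset = offset + read as u64;
            let (idx, intra) = match self.chunk_from_offset(cur_offset) {
                Some(pos) => pos,
                None => break, // EOF
            };
            let data = &self.chunks[idx];
            // take what is left in this chunk, but no more than the buffer needs
            let want = (data.len() - intra).min(buf.len() - read);
            buf[read..read + want].copy_from_slice(&data[intra..intra + want]);
            read += want;
        }
        read
    }
}
```

With the ten bytes `abcdefghij` split into chunks of four, a five-byte read at offset 2 spans two chunks and returns `cdefg`, while the same read at offset 8 returns only the two bytes `ij`.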
* [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (2 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
` (5 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Implemented as a separate struct SeekableCachedChunkReader that contains
the original as an Arc, since the read_at future captures the
CachedChunkReader, which would otherwise not work with the lifetimes
required by AsyncRead. This is also the reason we cannot use a shared
read buffer and have to allocate a new one for every read. It also means
that the struct items required for AsyncRead/Seek do not need to be
included in a regular CachedChunkReader.
This is intended as a replacement for AsyncIndexReader, so we have less
code duplication and can utilize the LRU cache there too (even though
actual request concurrency is not supported in these traits).
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
v2:
* drop 'seek_to_pos' from struct and implement error handling directly in
start_seek
* simplify future handling in poll_read with Option::get_or_insert_with
src/backup/cached_chunk_reader.rs | 104 +++++++++++++++++++++++++++++-
1 file changed, 102 insertions(+), 2 deletions(-)
diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
index ff476e37..c9ca4773 100644
--- a/src/backup/cached_chunk_reader.rs
+++ b/src/backup/cached_chunk_reader.rs
@@ -1,12 +1,19 @@
//! An async and concurrency safe data reader backed by a local LRU cache.
use anyhow::Error;
+use futures::future::Future;
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
-use std::future::Future;
+use std::io::SeekFrom;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
-use crate::backup::{AsyncReadChunk, IndexFile};
+use super::{AsyncReadChunk, IndexFile};
use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+use proxmox::io_format_err;
+use proxmox::sys::error::io_err_other;
struct AsyncChunkCacher<T> {
reader: Arc<T>,
@@ -87,3 +94,96 @@ impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<
Ok(read)
}
}
+
+impl<I: IndexFile + Send + Sync + 'static, R: AsyncReadChunk + Send + Sync + 'static>
+ CachedChunkReader<I, R>
+{
+ /// Returns a SeekableCachedChunkReader based on this instance, which implements AsyncSeek and
+ /// AsyncRead for use in interfaces which require that. Direct use of read_at is preferred
+ /// otherwise.
+ pub fn seekable(self) -> SeekableCachedChunkReader<I, R> {
+ SeekableCachedChunkReader {
+ index_bytes: self.index.index_bytes(),
+ reader: Arc::new(self),
+ position: 0,
+ read_future: None,
+ }
+ }
+}
+
+pub struct SeekableCachedChunkReader<
+ I: IndexFile + Send + Sync + 'static,
+ R: AsyncReadChunk + Send + Sync + 'static,
+> {
+ reader: Arc<CachedChunkReader<I, R>>,
+ index_bytes: u64,
+ position: u64,
+ read_future: Option<Pin<Box<dyn Future<Output = Result<(Vec<u8>, usize), Error>> + Send>>>,
+}
+
+impl<I, R> AsyncSeek for SeekableCachedChunkReader<I, R>
+where
+ I: IndexFile + Send + Sync + 'static,
+ R: AsyncReadChunk + Send + Sync + 'static,
+{
+ fn start_seek(self: Pin<&mut Self>, pos: SeekFrom) -> tokio::io::Result<()> {
+ let this = Pin::get_mut(self);
+ let seek_to_pos = match pos {
+ SeekFrom::Start(offset) => offset as i64,
+ SeekFrom::End(offset) => this.index_bytes as i64 + offset,
+ SeekFrom::Current(offset) => this.position as i64 + offset,
+ };
+ if seek_to_pos < 0 {
+ return Err(io_format_err!("cannot seek to negative values"));
+ } else if seek_to_pos > this.index_bytes as i64 {
+ this.position = this.index_bytes;
+ } else {
+ this.position = seek_to_pos as u64;
+ }
+ Ok(())
+ }
+
+ fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<u64>> {
+ Poll::Ready(Ok(self.position))
+ }
+}
+
+impl<I, R> AsyncRead for SeekableCachedChunkReader<I, R>
+where
+ I: IndexFile + Send + Sync + 'static,
+ R: AsyncReadChunk + Send + Sync + 'static,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut ReadBuf,
+ ) -> Poll<tokio::io::Result<()>> {
+ let this = Pin::get_mut(self);
+
+ let offset = this.position;
+ let wanted = buf.capacity();
+ let reader = Arc::clone(&this.reader);
+
+ let fut = this.read_future.get_or_insert_with(|| {
+ Box::pin(async move {
+ let mut read_buf = vec![0u8; wanted];
+ let read = reader.read_at(&mut read_buf[..wanted], offset).await?;
+ Ok((read_buf, read))
+ })
+ });
+
+ let ret = match ready!(fut.as_mut().poll(cx)) {
+ Ok((read_buf, read)) => {
+ buf.put_slice(&read_buf[..read]);
+ this.position += read as u64;
+ Ok(())
+ }
+ Err(err) => Err(io_err_other(err)),
+ };
+
+ // future completed, drop
+ this.read_future = None;
+
+ Poll::Ready(ret)
+ }
+}
--
2.30.2
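The position handling in `start_seek` can be looked at in isolation. The free function below is a hypothetical extraction for illustration only; it follows the same i64 arithmetic and clamping as the patch, but uses a plain `std::io::Error` instead of `io_format_err!`.

```rust
use std::io::{self, SeekFrom};

/// Compute the new position for a seek on a stream of `len` bytes,
/// given the current `position`. Hypothetical helper, not part of the patch.
pub fn resolve_seek(pos: SeekFrom, position: u64, len: u64) -> io::Result<u64> {
    let target = match pos {
        SeekFrom::Start(offset) => offset as i64,
        SeekFrom::End(offset) => len as i64 + offset,
        SeekFrom::Current(offset) => position as i64 + offset,
    };
    if target < 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "cannot seek to negative values",
        ));
    }
    // seeking past the end is clamped to the end instead of being an error
    Ok((target as u64).min(len))
}
```

For a 100-byte stream, `SeekFrom::End(-10)` resolves to 90, `SeekFrom::Start(500)` is clamped to 100, and `SeekFrom::Current(-5)` from position 3 is rejected. Because the error is decided up front, `poll_complete` only has to report the stored position.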
* [pve-devel] [PATCH v2 proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (3 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
` (4 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
admin/datastore reads linearly only, so no need for cache (capacity of 1
basically means no cache except for the currently active chunk).
mount can do random access too, so cache last 8 chunks for possibly a
mild performance improvement.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/api2/admin/datastore.rs | 4 ++--
src/bin/proxmox_backup_client/mount.rs | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index e0dfeecc..7b7f3102 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -1182,7 +1182,7 @@ pub fn download_file_decoded(
manifest.verify_file(&file_name, &csum, size)?;
let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
- let reader = AsyncIndexReader::new(index, chunk_reader);
+ let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
Body::wrap_stream(AsyncReaderStream::new(reader)
.map_err(move |err| {
eprintln!("error during streaming of '{:?}' - {}", path, err);
@@ -1197,7 +1197,7 @@ pub fn download_file_decoded(
manifest.verify_file(&file_name, &csum, size)?;
let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
- let reader = AsyncIndexReader::new(index, chunk_reader);
+ let reader = CachedChunkReader::new(chunk_reader, index, 1).seekable();
Body::wrap_stream(AsyncReaderStream::with_buffer_size(reader, 4*1024*1024)
.map_err(move |err| {
eprintln!("error during streaming of '{:?}' - {}", path, err);
diff --git a/src/bin/proxmox_backup_client/mount.rs b/src/bin/proxmox_backup_client/mount.rs
index f3498e35..d0f04f89 100644
--- a/src/bin/proxmox_backup_client/mount.rs
+++ b/src/bin/proxmox_backup_client/mount.rs
@@ -25,7 +25,7 @@ use proxmox_backup::backup::{
BackupDir,
BackupGroup,
BufferedDynamicReader,
- AsyncIndexReader,
+ CachedChunkReader,
};
use proxmox_backup::client::*;
@@ -281,7 +281,7 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
let index = client.download_fixed_index(&manifest, &server_archive_name).await?;
let size = index.index_bytes();
let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new());
- let reader = AsyncIndexReader::new(index, chunk_reader);
+ let reader = CachedChunkReader::new(chunk_reader, index, 8).seekable();
let name = &format!("{}:{}/{}", repo.to_string(), path, archive_name);
let name_escaped = tools::systemd::escape_unit(name, false);
--
2.30.2
* [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (4 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
` (3 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
superseded by CachedChunkReader, with less code and more speed
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/backup.rs | 3 -
src/backup/async_index_reader.rs | 215 -------------------------------
2 files changed, 218 deletions(-)
delete mode 100644 src/backup/async_index_reader.rs
diff --git a/src/backup.rs b/src/backup.rs
index 5e1147b4..7bf29a5a 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -257,8 +257,5 @@ pub use verify::*;
mod catalog_shell;
pub use catalog_shell::*;
-mod async_index_reader;
-pub use async_index_reader::*;
-
mod cached_chunk_reader;
pub use cached_chunk_reader::*;
diff --git a/src/backup/async_index_reader.rs b/src/backup/async_index_reader.rs
deleted file mode 100644
index 20a37e7e..00000000
--- a/src/backup/async_index_reader.rs
+++ /dev/null
@@ -1,215 +0,0 @@
-use std::future::Future;
-use std::task::{Poll, Context};
-use std::pin::Pin;
-use std::io::SeekFrom;
-
-use anyhow::Error;
-use futures::future::FutureExt;
-use futures::ready;
-use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
-
-use proxmox::sys::error::io_err_other;
-use proxmox::io_format_err;
-
-use super::IndexFile;
-use super::read_chunk::AsyncReadChunk;
-use super::index::ChunkReadInfo;
-
-type ReadFuture<S> = dyn Future<Output = Result<(S, Vec<u8>), Error>> + Send + 'static;
-
-// FIXME: This enum may not be required?
-// - Put the `WaitForData` case directly into a `read_future: Option<>`
-// - make the read loop as follows:
-// * if read_buffer is not empty:
-// use it
-// * else if read_future is there:
-// poll it
-// if read: move data to read_buffer
-// * else
-// create read future
-#[allow(clippy::enum_variant_names)]
-enum AsyncIndexReaderState<S> {
- NoData,
- WaitForData(Pin<Box<ReadFuture<S>>>),
- HaveData,
-}
-
-pub struct AsyncIndexReader<S, I: IndexFile> {
- store: Option<S>,
- index: I,
- read_buffer: Vec<u8>,
- current_chunk_offset: u64,
- current_chunk_idx: usize,
- current_chunk_info: Option<ChunkReadInfo>,
- position: u64,
- seek_to_pos: i64,
- state: AsyncIndexReaderState<S>,
-}
-
-// ok because the only public interfaces operates on &mut Self
-unsafe impl<S: Sync, I: IndexFile + Sync> Sync for AsyncIndexReader<S, I> {}
-
-impl<S: AsyncReadChunk, I: IndexFile> AsyncIndexReader<S, I> {
- pub fn new(index: I, store: S) -> Self {
- Self {
- store: Some(store),
- index,
- read_buffer: Vec::with_capacity(1024 * 1024),
- current_chunk_offset: 0,
- current_chunk_idx: 0,
- current_chunk_info: None,
- position: 0,
- seek_to_pos: 0,
- state: AsyncIndexReaderState::NoData,
- }
- }
-}
-
-impl<S, I> AsyncRead for AsyncIndexReader<S, I>
-where
- S: AsyncReadChunk + Unpin + Sync + 'static,
- I: IndexFile + Unpin,
-{
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut ReadBuf,
- ) -> Poll<tokio::io::Result<()>> {
- let this = Pin::get_mut(self);
- loop {
- match &mut this.state {
- AsyncIndexReaderState::NoData => {
- let (idx, offset) = if this.current_chunk_info.is_some() &&
- this.position == this.current_chunk_info.as_ref().unwrap().range.end
- {
- // optimization for sequential chunk read
- let next_idx = this.current_chunk_idx + 1;
- (next_idx, 0)
- } else {
- match this.index.chunk_from_offset(this.position) {
- Some(res) => res,
- None => return Poll::Ready(Ok(()))
- }
- };
-
- if idx >= this.index.index_count() {
- return Poll::Ready(Ok(()));
- }
-
- let info = this
- .index
- .chunk_info(idx)
- .ok_or_else(|| io_format_err!("could not get digest"))?;
-
- this.current_chunk_offset = offset;
- this.current_chunk_idx = idx;
- let old_info = this.current_chunk_info.replace(info.clone());
-
- if let Some(old_info) = old_info {
- if old_info.digest == info.digest {
- // hit, chunk is currently in cache
- this.state = AsyncIndexReaderState::HaveData;
- continue;
- }
- }
-
- // miss, need to download new chunk
- let store = match this.store.take() {
- Some(store) => store,
- None => {
- return Poll::Ready(Err(io_format_err!("could not find store")));
- }
- };
-
- let future = async move {
- store.read_chunk(&info.digest)
- .await
- .map(move |x| (store, x))
- };
-
- this.state = AsyncIndexReaderState::WaitForData(future.boxed());
- }
- AsyncIndexReaderState::WaitForData(ref mut future) => {
- match ready!(future.as_mut().poll(cx)) {
- Ok((store, chunk_data)) => {
- this.read_buffer = chunk_data;
- this.state = AsyncIndexReaderState::HaveData;
- this.store = Some(store);
- }
- Err(err) => {
- return Poll::Ready(Err(io_err_other(err)));
- }
- };
- }
- AsyncIndexReaderState::HaveData => {
- let offset = this.current_chunk_offset as usize;
- let len = this.read_buffer.len();
- let n = if len - offset < buf.remaining() {
- len - offset
- } else {
- buf.remaining()
- };
-
- buf.put_slice(&this.read_buffer[offset..(offset + n)]);
- this.position += n as u64;
-
- if offset + n == len {
- this.state = AsyncIndexReaderState::NoData;
- } else {
- this.current_chunk_offset += n as u64;
- this.state = AsyncIndexReaderState::HaveData;
- }
-
- return Poll::Ready(Ok(()));
- }
- }
- }
- }
-}
-
-impl<S, I> AsyncSeek for AsyncIndexReader<S, I>
-where
- S: AsyncReadChunk + Unpin + Sync + 'static,
- I: IndexFile + Unpin,
-{
- fn start_seek(
- self: Pin<&mut Self>,
- pos: SeekFrom,
- ) -> tokio::io::Result<()> {
- let this = Pin::get_mut(self);
- this.seek_to_pos = match pos {
- SeekFrom::Start(offset) => {
- offset as i64
- },
- SeekFrom::End(offset) => {
- this.index.index_bytes() as i64 + offset
- },
- SeekFrom::Current(offset) => {
- this.position as i64 + offset
- }
- };
- Ok(())
- }
-
- fn poll_complete(
- self: Pin<&mut Self>,
- _cx: &mut Context<'_>,
- ) -> Poll<tokio::io::Result<u64>> {
- let this = Pin::get_mut(self);
-
- let index_bytes = this.index.index_bytes();
- if this.seek_to_pos < 0 {
- return Poll::Ready(Err(io_format_err!("cannot seek to negative values")));
- } else if this.seek_to_pos > index_bytes as i64 {
- this.position = index_bytes;
- } else {
- this.position = this.seek_to_pos as u64;
- }
-
- // even if seeking within one chunk, we need to go to NoData to
- // recalculate the current_chunk_offset (data is cached anyway)
- this.state = AsyncIndexReaderState::NoData;
-
- Poll::Ready(Ok(this.position))
- }
-}
--
2.30.2
* [pve-devel] [PATCH v2 proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (5 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
` (2 subsequent siblings)
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Setting this to 0 is not just useless, but breaks the logic horribly
enough to cause random segfaults - better forbid this, to avoid someone
else having to debug it again ;)
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools/lru_cache.rs | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/tools/lru_cache.rs b/src/tools/lru_cache.rs
index 70289d3f..7c8cf25f 100644
--- a/src/tools/lru_cache.rs
+++ b/src/tools/lru_cache.rs
@@ -106,6 +106,7 @@ unsafe impl<K: Send, V: Send> Send for LruCache<K, V> {}
impl<K: std::cmp::Eq + std::hash::Hash + Copy, V> LruCache<K, V> {
/// Create LRU cache instance which holds up to `capacity` nodes at once.
pub fn new(capacity: usize) -> Self {
+ let capacity = capacity.max(1);
Self {
map: HashMap::with_capacity(capacity),
list: LinkedList::new(),
--
2.30.2
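A lower bound is easy to get backwards with `min`/`max`, so here is a minimal sketch of the clamp the commit message describes ("make minimum capacity 1"). Both function names are hypothetical and exist only to contrast the two calls.

```rust
/// Enforce a lower bound of 1 on a requested capacity.
pub fn at_least_one(capacity: usize) -> usize {
    // `max` picks the larger value: 0 becomes 1, larger values pass through.
    capacity.max(1)
}

/// For contrast: `min` picks the smaller value and so acts as an upper bound.
pub fn at_most_one(capacity: usize) -> usize {
    capacity.min(1)
}
```

`at_least_one(0)` is 1 and `at_least_one(64)` stays 64, which is what a minimum capacity means. `at_most_one(0)` is still 0 and `at_most_one(64)` collapses to 1, so `min(1)` would neither forbid a capacity of 0 nor preserve larger capacities.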
* [pve-devel] [PATCH v2 proxmox-backup-qemu 8/9] add shared_cache module
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (6 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-08 7:58 ` [pve-devel] applied: [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Wolfgang Bumiller
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Provides a shared AsyncLruCache of 256MB (w/ 4MB chunks) that can be
used by multiple readers at the same time. It is dropped once no more
readers exist, so the memory gets freed if all QEMU block/pbs instances
disappear.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
v2:
* adapt to dropping ChunkCache type alias from proxmox-backup
* I re-add it here locally only because otherwise the type of SHARED_CACHE
ends in 7 '>'s
src/lib.rs | 7 ++++++-
src/shared_cache.rs | 37 +++++++++++++++++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
create mode 100644 src/shared_cache.rs
diff --git a/src/lib.rs b/src/lib.rs
index 05d7b58..aa167f7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,6 +25,7 @@ mod restore;
use restore::*;
mod tools;
+mod shared_cache;
pub const PROXMOX_BACKUP_DEFAULT_CHUNK_SIZE: u64 = 1024*1024*4;
@@ -804,7 +805,11 @@ pub extern "C" fn proxmox_restore_connect_async(
pub extern "C" fn proxmox_restore_disconnect(handle: *mut ProxmoxRestoreHandle) {
let restore_task = handle as * mut Arc<RestoreTask>;
- unsafe { Box::from_raw(restore_task) }; //drop(restore_task)
+ let restore_task = unsafe { Box::from_raw(restore_task) };
+ drop(restore_task);
+
+ // after dropping, cache may be unused (if no other handles open)
+ shared_cache::shared_chunk_cache_cleanup();
}
/// Restore an image (sync)
diff --git a/src/shared_cache.rs b/src/shared_cache.rs
new file mode 100644
index 0000000..32ac430
--- /dev/null
+++ b/src/shared_cache.rs
@@ -0,0 +1,37 @@
+use once_cell::sync::OnceCell;
+use proxmox_backup::tools::async_lru_cache::AsyncLruCache;
+use std::sync::{Arc, Mutex};
+
+type ChunkCache = AsyncLruCache<[u8; 32], Arc<Vec<u8>>>;
+
+const SHARED_CACHE_CAPACITY: usize = 64; // 256 MB
+static SHARED_CACHE: OnceCell<Mutex<Option<Arc<ChunkCache>>>> = OnceCell::new();
+
+pub fn get_shared_chunk_cache() -> Arc<ChunkCache> {
+ let mut guard = SHARED_CACHE
+ .get_or_init(|| Mutex::new(None))
+ .lock()
+ .unwrap();
+ match &*guard {
+ Some(cache) => Arc::clone(cache),
+ None => {
+ let cache = Arc::new(AsyncLruCache::new(SHARED_CACHE_CAPACITY));
+ *guard = Some(Arc::clone(&cache));
+ cache
+ }
+ }
+}
+
+pub fn shared_chunk_cache_cleanup() {
+ let mut guard = SHARED_CACHE
+ .get_or_init(|| Mutex::new(None))
+ .lock()
+ .unwrap();
+ if let Some(arc) = guard.as_ref() {
+ let refcount = Arc::strong_count(arc);
+ if refcount == 1 {
+ // no one else is using the cache anymore, drop it
+ let _drop = guard.take();
+ }
+ }
+}
--
2.30.2
^ permalink raw reply [flat|nested] 11+ messages in thread
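For readers skimming the thread, the refcount-based lifecycle of the shared cache in patch 8/9 can be tried in isolation. The following is only a minimal sketch under assumptions, not the code from the series: the `Cache` type is a hypothetical stand-in (a plain `HashMap` behind a `Mutex`) for `AsyncLruCache`, the names `get_shared` and `cleanup` are made up for illustration, and a `const`-initialized `std::sync::Mutex` (Rust 1.63+) replaces the `OnceCell`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real cache type; for illustration only.
type Cache = Mutex<HashMap<[u8; 32], Arc<Vec<u8>>>>;

// The static holds one strong reference while the cache is alive.
static SHARED: Mutex<Option<Arc<Cache>>> = Mutex::new(None);

/// Return the shared cache, creating it on first use.
pub fn get_shared() -> Arc<Cache> {
    let mut guard = SHARED.lock().unwrap();
    match &*guard {
        Some(cache) => Arc::clone(cache),
        None => {
            let cache = Arc::new(Mutex::new(HashMap::new()));
            *guard = Some(Arc::clone(&cache));
            cache
        }
    }
}

/// Drop the cache if only the static itself still references it.
/// Returns true if the cache was dropped by this call.
pub fn cleanup() -> bool {
    let mut guard = SHARED.lock().unwrap();
    // A strong count of 1 means no handle outside the static is left.
    let unused = matches!(guard.as_ref(), Some(arc) if Arc::strong_count(arc) == 1);
    if unused {
        guard.take();
    }
    unused
}
```

Because both functions take the same lock, a concurrent `get_shared` cannot clone the `Arc` between the count check and the `take()`, so the check is not racy with respect to new users.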
* [pve-devel] [PATCH v2 proxmox-backup-qemu 9/9] access: use CachedChunkReader
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (7 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
@ 2021-06-07 15:35 ` Stefan Reiter
2021-06-08 7:58 ` [pve-devel] applied: [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Wolfgang Bumiller
9 siblings, 0 replies; 11+ messages in thread
From: Stefan Reiter @ 2021-06-07 15:35 UTC (permalink / raw)
To: pve-devel, pbs-devel
Use the new CachedChunkReader with the shared_cache implementation to
provide a concurrency-safe async way of accessing data. This provides
two benefits:
* uses a shared LRU cache, which is very helpful for random access, as
occurs during a live-restore
* does away with the global Mutex in read_image_at, providing real
concurrency without lock contention
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/restore.rs | 30 ++++++++----------------------
1 file changed, 8 insertions(+), 22 deletions(-)
diff --git a/src/restore.rs b/src/restore.rs
index 0790d7f..33959d9 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,10 +1,8 @@
use std::sync::{Arc, Mutex};
-use std::io::SeekFrom;
use std::convert::TryInto;
use anyhow::{format_err, bail, Error};
use once_cell::sync::OnceCell;
-use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::runtime::Runtime;
use proxmox_backup::tools::runtime::get_runtime_with_builder;
@@ -14,9 +12,10 @@ use proxmox_backup::client::{HttpClient, HttpClientOptions, BackupReader, Remote
use super::BackupSetup;
use crate::registry::Registry;
use crate::capi_types::DataPointer;
+use crate::shared_cache::get_shared_chunk_cache;
struct ImageAccessInfo {
- reader: Arc<tokio::sync::Mutex<AsyncIndexReader<RemoteChunkReader, FixedIndexReader>>>,
+ reader: Arc<CachedChunkReader<FixedIndexReader, RemoteChunkReader>>,
_archive_name: String,
archive_size: u64,
}
@@ -229,12 +228,13 @@ impl RestoreTask {
most_used,
);
- let reader = AsyncIndexReader::new(index, chunk_reader);
+ let cache = get_shared_chunk_cache();
+ let reader = Arc::new(CachedChunkReader::new_with_cache(chunk_reader, index, cache));
let info = ImageAccessInfo {
archive_size,
_archive_name: archive_name, /// useful to debug
- reader: Arc::new(tokio::sync::Mutex::new(reader)),
+ reader,
};
(*self.image_registry.lock().unwrap()).register(info)
@@ -258,23 +258,9 @@ impl RestoreTask {
bail!("read index {} out of bounds {}", offset, image_size);
}
- let mut reader = reader.lock().await;
-
- let buf: &mut [u8] = unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize)};
- let mut read = 0;
-
- while read < size {
- reader.seek(SeekFrom::Start(offset + read)).await?;
- let bytes = reader.read(&mut buf[read as usize..]).await?;
-
- if bytes == 0 {
- // EOF
- break;
- }
-
- read += bytes as u64;
- }
-
+ let buf: &mut [u8] =
+ unsafe { std::slice::from_raw_parts_mut(data.0 as *mut u8, size as usize) };
+ let read = reader.read_at(buf, offset).await?;
Ok(read.try_into()?)
}
}
--
2.30.2
^ permalink raw reply [flat|nested] 11+ messages in thread
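To illustrate why a positional `read_at` needs no global lock, unlike the seek-then-read loop removed in patch 9/9, here is a rough sketch under assumptions. It is not the `CachedChunkReader` implementation from the series: `FixedChunks` is a hypothetical in-memory stand-in for an index plus chunk source, and real chunks would be fetched asynchronously and cached.

```rust
/// Hypothetical stand-in: chunk `i` of the image is `chunks[i]`,
/// every chunk except possibly the last is `chunk_size` bytes long.
pub struct FixedChunks {
    pub chunk_size: usize,
    pub chunks: Vec<Vec<u8>>,
}

impl FixedChunks {
    /// Positional read. It takes `&self` and keeps no cursor, so
    /// concurrent callers do not have to serialize on a shared position.
    /// Returns the number of bytes copied (short at end of image).
    pub fn read_at(&self, buf: &mut [u8], offset: u64) -> usize {
        let mut read = 0;
        while read < buf.len() {
            let pos = offset as usize + read;
            let idx = pos / self.chunk_size;
            let chunk = match self.chunks.get(idx) {
                Some(chunk) => chunk,
                None => break, // past the last chunk: EOF
            };
            let start = pos % self.chunk_size;
            if start >= chunk.len() {
                break; // past the end of a short final chunk: EOF
            }
            // Copy as much as this chunk and the remaining buffer allow.
            let n = (chunk.len() - start).min(buf.len() - read);
            buf[read..read + n].copy_from_slice(&chunk[start..start + n]);
            read += n;
        }
        read
    }
}
```

With a seekable reader the offset lives inside the reader, which is why the old code had to hold a lock across `seek` and `read`; passing the offset per call removes that shared state.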
* [pve-devel] applied: [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
` (8 preceding siblings ...)
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
@ 2021-06-08 7:58 ` Wolfgang Bumiller
9 siblings, 0 replies; 11+ messages in thread
From: Wolfgang Bumiller @ 2021-06-08 7:58 UTC (permalink / raw)
To: Stefan Reiter; +Cc: pve-devel, pbs-devel
applied series
On Mon, Jun 07, 2021 at 05:35:23PM +0200, Stefan Reiter wrote:
> This series is the third/fourth attempt[0] at substantially improving
> live-restore performance. This time, a fully async- and concurrency-safe LRU
> cache is implemented, and a new CachedChunkReader is used to provide lock-free
> reading from a remote chunk source. The big performance improvements come from
> removing the global Mutex synchronising all read_image_at requests in the QEMU
> library and the shared LRU cache.
>
> The idea of using 'mmap' to alleviate memory pressure was dropped in favor of
> using a shared cache between drives. This provides about the same performance
> improvement in typical live-restore scenarios, but with a fixed memory
> requirement, independent of drives. Adding 'mmap' caching is doable, but the
> Rust code really doesn't look very nice with it, and I think it's not necessary
> in this version...
>
> Anyway, I figured out how to better benchmark the changes too, and can now
> reproduce the performance gains very reliably. Below is a (gnu)plot of running
> 40 live-restores of a Windows 10 VM with a single 32GB disk, first with the
> currently shipping libproxmox-backup-qemu0 (A) and then with the patched one
> from this series (B). Testing was done with QEMU 6.0.
>
>
> v2:
> * address Wolfgang's review comments, mostly style stuff
>
>
> seconds
> 240 +-----------------------------------------------------------------------+
> | + + + + + + A + |
> 230 |-+ A A A +-|
> | A A A AA A A A A |
> 220 |-+ A AA A A +-|
> | A A A A A A AA A A |
> 210 |-+ A A A A A A A-|
> | A |
> 200 |-+ A +-|
> | |
> 190 |-+ +-|
> | |
> 180 |-A +-|
> | |
> 170 |-+ +-|
> | B B B |
> 160 |-B B B B B B B +-|
> | B B B B B |
> 150 |-+ BB B BB B B B B B B B B-|
> | B + B + + B + B B + B + B B + B |
> 140 +-----------------------------------------------------------------------+
> 0 5 10 15 20 25 30 35 40
> iteration
> 1.0.3-1 A
> patched B
>
>
> [0] see: https://lists.proxmox.com/pipermail/pbs-devel/2021-April/002932.html
>
>
> proxmox-backup: Stefan Reiter (7):
> tools/BroadcastFuture: add testcase for better understanding
> tools: add AsyncLruCache as a wrapper around sync LruCache
> backup: add CachedChunkReader utilizing AsyncLruCache
> backup: add AsyncRead/Seek to CachedChunkReader
> replace AsyncIndexReader with SeekableCachedChunkReader
> backup: remove AsyncIndexReader
> tools/lru_cache: make minimum capacity 1
>
> src/api2/admin/datastore.rs | 4 +-
> src/backup.rs | 4 +-
> src/backup/async_index_reader.rs | 215 -------------------------
> src/backup/cached_chunk_reader.rs | 189 ++++++++++++++++++++++
> src/bin/proxmox_backup_client/mount.rs | 4 +-
> src/tools.rs | 1 +
> src/tools/async_lru_cache.rs | 135 ++++++++++++++++
> src/tools/broadcast_future.rs | 11 ++
> src/tools/lru_cache.rs | 1 +
> 9 files changed, 343 insertions(+), 221 deletions(-)
> delete mode 100644 src/backup/async_index_reader.rs
> create mode 100644 src/backup/cached_chunk_reader.rs
> create mode 100644 src/tools/async_lru_cache.rs
>
> proxmox-backup-qemu: Stefan Reiter (2):
> add shared_cache module
> access: use CachedChunkReader
>
> src/lib.rs | 7 ++++++-
> src/restore.rs | 30 ++++++++----------------------
> src/shared_cache.rs | 37 +++++++++++++++++++++++++++++++++++++
> 3 files changed, 51 insertions(+), 23 deletions(-)
> create mode 100644 src/shared_cache.rs
>
> --
> 2.30.2
^ permalink raw reply [flat|nested] 11+ messages in thread