all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache
Date: Mon,  7 Jun 2021 17:35:26 +0200	[thread overview]
Message-ID: <20210607153532.2522267-4-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210607153532.2522267-1-s.reiter@proxmox.com>

Provides a fast arbitrary read implementation with full async and
concurrency support.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

This is technically all that's needed for proxmox-backup-qemu to build and
function as intended, but I decided to also use this IMHO cleaner implementation
to replace the AsyncIndexReader with the following patches.

v2:
* drop ChunkCache type alias, not necessary and looked weird, since it couldn't
  be constructed directly
* add comment to other unwrap in read_at

 src/backup.rs                     |  3 ++
 src/backup/cached_chunk_reader.rs | 89 +++++++++++++++++++++++++++++++
 2 files changed, 92 insertions(+)
 create mode 100644 src/backup/cached_chunk_reader.rs

diff --git a/src/backup.rs b/src/backup.rs
index ae937be0..5e1147b4 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -259,3 +259,6 @@ pub use catalog_shell::*;
 
 mod async_index_reader;
 pub use async_index_reader::*;
+
+mod cached_chunk_reader;
+pub use cached_chunk_reader::*;
diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
new file mode 100644
index 00000000..ff476e37
--- /dev/null
+++ b/src/backup/cached_chunk_reader.rs
@@ -0,0 +1,89 @@
+//! An async and concurrency safe data reader backed by a local LRU cache.
+
+use anyhow::Error;
+
+use std::future::Future;
+use std::sync::Arc;
+
+use crate::backup::{AsyncReadChunk, IndexFile};
+use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+
+struct AsyncChunkCacher<T> {
+    reader: Arc<T>,
+}
+
+impl<T: AsyncReadChunk + Send + Sync + 'static> AsyncCacher<[u8; 32], Arc<Vec<u8>>>
+    for AsyncChunkCacher<T>
+{
+    fn fetch(
+        &self,
+        key: [u8; 32],
+    ) -> Box<dyn Future<Output = Result<Option<Arc<Vec<u8>>>, Error>> + Send> {
+        let reader = Arc::clone(&self.reader);
+        Box::new(async move {
+            AsyncReadChunk::read_chunk(reader.as_ref(), &key)
+                .await
+                .map(|x| Some(Arc::new(x)))
+        })
+    }
+}
+
+/// Allows arbitrary data reads from an Index via an AsyncReadChunk implementation, using an LRU
+/// cache internally to cache chunks and provide support for multiple concurrent reads (potentially
+/// to the same chunk).
+pub struct CachedChunkReader<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> {
+    cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
+    cacher: AsyncChunkCacher<R>,
+    index: I,
+}
+
+impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<I, R> {
+    /// Create a new reader with a local LRU cache containing 'capacity' chunks.
+    pub fn new(reader: R, index: I, capacity: usize) -> Self {
+        let cache = Arc::new(AsyncLruCache::new(capacity));
+        Self::new_with_cache(reader, index, cache)
+    }
+
+    /// Create a new reader with a custom LRU cache. Use this to share a cache between multiple
+    /// readers.
+    pub fn new_with_cache(
+        reader: R,
+        index: I,
+        cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
+    ) -> Self {
+        Self {
+            cache,
+            cacher: AsyncChunkCacher {
+                reader: Arc::new(reader),
+            },
+            index,
+        }
+    }
+
+    /// Read data at a given byte offset into a variable size buffer. Returns the amount of bytes
+    /// read, which will always be the size of the buffer except when reaching EOF.
+    pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
+        let size = buf.len();
+        let mut read: usize = 0;
+        while read < size {
+            let cur_offset = offset + read as u64;
+            if let Some(chunk) = self.index.chunk_from_offset(cur_offset) {
+                // chunk indices retrieved from chunk_from_offset always resolve to Some(_)
+                let info = self.index.chunk_info(chunk.0).unwrap();
+
+                // will never be None, see AsyncChunkCacher
+                let data = self.cache.access(info.digest, &self.cacher).await?.unwrap();
+
+                let want_bytes = ((info.range.end - cur_offset) as usize).min(size - read);
+                let slice = &mut buf[read..(read + want_bytes)];
+                let intra_chunk = chunk.1 as usize;
+                slice.copy_from_slice(&data[intra_chunk..(intra_chunk + want_bytes)]);
+                read += want_bytes;
+            } else {
+                // EOF
+                break;
+            }
+        }
+        Ok(read)
+    }
+}
-- 
2.30.2





WARNING: multiple messages have this Message-ID
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache
Date: Mon,  7 Jun 2021 17:35:26 +0200	[thread overview]
Message-ID: <20210607153532.2522267-4-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210607153532.2522267-1-s.reiter@proxmox.com>

Provides a fast arbitrary read implementation with full async and
concurrency support.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---

This is technically all that's needed for proxmox-backup-qemu to build and
function as intended, but I decided to also use this IMHO cleaner implementation
to replace the AsyncIndexReader with the following patches.

v2:
* drop ChunkCache type alias, not necessary and looked weird, since it couldn't
  be constructed directly
* add comment to other unwrap in read_at

 src/backup.rs                     |  3 ++
 src/backup/cached_chunk_reader.rs | 89 +++++++++++++++++++++++++++++++
 2 files changed, 92 insertions(+)
 create mode 100644 src/backup/cached_chunk_reader.rs

diff --git a/src/backup.rs b/src/backup.rs
index ae937be0..5e1147b4 100644
--- a/src/backup.rs
+++ b/src/backup.rs
@@ -259,3 +259,6 @@ pub use catalog_shell::*;
 
 mod async_index_reader;
 pub use async_index_reader::*;
+
+mod cached_chunk_reader;
+pub use cached_chunk_reader::*;
diff --git a/src/backup/cached_chunk_reader.rs b/src/backup/cached_chunk_reader.rs
new file mode 100644
index 00000000..ff476e37
--- /dev/null
+++ b/src/backup/cached_chunk_reader.rs
@@ -0,0 +1,89 @@
+//! An async and concurrency safe data reader backed by a local LRU cache.
+
+use anyhow::Error;
+
+use std::future::Future;
+use std::sync::Arc;
+
+use crate::backup::{AsyncReadChunk, IndexFile};
+use crate::tools::async_lru_cache::{AsyncCacher, AsyncLruCache};
+
+struct AsyncChunkCacher<T> {
+    reader: Arc<T>,
+}
+
+impl<T: AsyncReadChunk + Send + Sync + 'static> AsyncCacher<[u8; 32], Arc<Vec<u8>>>
+    for AsyncChunkCacher<T>
+{
+    fn fetch(
+        &self,
+        key: [u8; 32],
+    ) -> Box<dyn Future<Output = Result<Option<Arc<Vec<u8>>>, Error>> + Send> {
+        let reader = Arc::clone(&self.reader);
+        Box::new(async move {
+            AsyncReadChunk::read_chunk(reader.as_ref(), &key)
+                .await
+                .map(|x| Some(Arc::new(x)))
+        })
+    }
+}
+
+/// Allows arbitrary data reads from an Index via an AsyncReadChunk implementation, using an LRU
+/// cache internally to cache chunks and provide support for multiple concurrent reads (potentially
+/// to the same chunk).
+pub struct CachedChunkReader<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> {
+    cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
+    cacher: AsyncChunkCacher<R>,
+    index: I,
+}
+
+impl<I: IndexFile, R: AsyncReadChunk + Send + Sync + 'static> CachedChunkReader<I, R> {
+    /// Create a new reader with a local LRU cache containing 'capacity' chunks.
+    pub fn new(reader: R, index: I, capacity: usize) -> Self {
+        let cache = Arc::new(AsyncLruCache::new(capacity));
+        Self::new_with_cache(reader, index, cache)
+    }
+
+    /// Create a new reader with a custom LRU cache. Use this to share a cache between multiple
+    /// readers.
+    pub fn new_with_cache(
+        reader: R,
+        index: I,
+        cache: Arc<AsyncLruCache<[u8; 32], Arc<Vec<u8>>>>,
+    ) -> Self {
+        Self {
+            cache,
+            cacher: AsyncChunkCacher {
+                reader: Arc::new(reader),
+            },
+            index,
+        }
+    }
+
+    /// Read data at a given byte offset into a variable size buffer. Returns the amount of bytes
+    /// read, which will always be the size of the buffer except when reaching EOF.
+    pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
+        let size = buf.len();
+        let mut read: usize = 0;
+        while read < size {
+            let cur_offset = offset + read as u64;
+            if let Some(chunk) = self.index.chunk_from_offset(cur_offset) {
+                // chunk indices retrieved from chunk_from_offset always resolve to Some(_)
+                let info = self.index.chunk_info(chunk.0).unwrap();
+
+                // will never be None, see AsyncChunkCacher
+                let data = self.cache.access(info.digest, &self.cacher).await?.unwrap();
+
+                let want_bytes = ((info.range.end - cur_offset) as usize).min(size - read);
+                let slice = &mut buf[read..(read + want_bytes)];
+                let intra_chunk = chunk.1 as usize;
+                slice.copy_from_slice(&data[intra_chunk..(intra_chunk + want_bytes)]);
+                read += want_bytes;
+            } else {
+                // EOF
+                break;
+            }
+        }
+        Ok(read)
+    }
+}
-- 
2.30.2





  parent reply	other threads:[~2021-06-07 15:35 UTC|newest]

Thread overview: 22+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-07 15:35 [pve-devel] [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-07 15:35 ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` Stefan Reiter [this message]
2021-06-07 15:35   ` [pbs-devel] [PATCH v2 proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-07 15:35 ` [pve-devel] [PATCH v2 proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-07 15:35   ` [pbs-devel] " Stefan Reiter
2021-06-08  7:58 ` [pve-devel] applied: [PATCH v2 0/9] Improve live-restore speed and replace AsyncIndexReader Wolfgang Bumiller
2021-06-08  7:58   ` [pbs-devel] " Wolfgang Bumiller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210607153532.2522267-4-s.reiter@proxmox.com \
    --to=s.reiter@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    --cc=pve-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal