From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pve-devel] [PATCH proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache
Date: Wed, 2 Jun 2021 16:38:26 +0200 [thread overview]
Message-ID: <20210602143833.4423-3-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com>
Supports concurrent 'access' calls to the same key via a
BroadcastFuture. These are stored in a separate HashMap; the LruCache
underneath is only modified once a valid value has been retrieved.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools.rs | 1 +
src/tools/async_lru_cache.rs | 135 +++++++++++++++++++++++++++++++++++
2 files changed, 136 insertions(+)
create mode 100644 src/tools/async_lru_cache.rs
diff --git a/src/tools.rs b/src/tools.rs
index 65049b1e..59599339 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -42,6 +42,7 @@ pub mod json;
pub mod logrotate;
pub mod loopdev;
pub mod lru_cache;
+pub mod async_lru_cache;
pub mod nom;
pub mod runtime;
pub mod serde_filter;
diff --git a/src/tools/async_lru_cache.rs b/src/tools/async_lru_cache.rs
new file mode 100644
index 00000000..cc385ec9
--- /dev/null
+++ b/src/tools/async_lru_cache.rs
@@ -0,0 +1,135 @@
+//! An 'async'-safe layer on the existing sync LruCache implementation. Supports multiple
+//! concurrent requests to the same key.
+
+use anyhow::Error;
+
+use std::collections::HashMap;
+use std::future::Future;
+use std::sync::{Arc, Mutex};
+
+use super::lru_cache::LruCache;
+use super::BroadcastFuture;
+
+/// Interface for asynchronously getting values on cache misses.
+pub trait AsyncCacher<K, V: Clone>: Sync + Send {
+    /// Fetch a value for `key` on a cache miss.
+    ///
+    /// Works like the non-async `lru_cache::Cacher`, but if the key has already been requested
+    /// and the result is not cached yet, `fetch` is not called again; instead the result of the
+    /// original request is cloned and returned once it completes.
+    ///
+    /// The underlying LRU structure is not modified until the returned future resolves to an
+    /// `Ok(Some(_))` value.
+    fn fetch(&self, key: K) -> Box<dyn Future<Output = Result<Option<V>, Error>> + Send>;
+}
+
+/// Async-safe variant of `tools::lru_cache::LruCache`, filling misses via [`AsyncCacher`].
+/// Clones are cheap handles sharing the same state (the only field is an `Arc`).
+#[derive(Clone)]
+pub struct AsyncLruCache<K, V> {
+    maps: Arc<Mutex<(LruCache<K, V>, HashMap<K, BroadcastFuture<Option<V>>>)>>,
+}
+
+impl<K: std::cmp::Eq + std::hash::Hash + Copy, V: Clone + Send + 'static> AsyncLruCache<K, V> {
+    /// Create a new AsyncLruCache with the given maximum capacity.
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            maps: Arc::new(Mutex::new((LruCache::new(capacity), HashMap::new()))),
+        }
+    }
+
+    /// Access an item via the cache or by calling `cacher.fetch`. `Ok(None)` means the item has
+    /// no representation; `Err(_)` means fetch() failed, whether started by this call or by a
+    /// concurrent one. Only `Ok(Some(_))` is cached; `Ok(None)`/`Err(_)` are re-fetched next time.
+    pub async fn access(&self, key: K, cacher: &dyn AsyncCacher<K, V>) -> Result<Option<V>, Error> {
+        let (owner, result_fut) = {
+            // check if already requested
+            let mut maps = self.maps.lock().unwrap();
+            if let Some(fut) = maps.1.get(&key) {
+                // wait for the already scheduled future to resolve
+                (false, fut.listen())
+            } else {
+                // check if value is cached in LRU
+                if let Some(val) = maps.0.get_mut(key) {
+                    return Ok(Some(val.clone()));
+                }
+                // if neither, start broadcast future and put into map while we still have lock
+                // NOTE(review): only the owner removes this entry (after its await below); if
+                // the owner's future is dropped first, the entry lingers - consider a drop guard
+                let broadcast = BroadcastFuture::new(cacher.fetch(key));
+                let result_fut = broadcast.listen();
+                maps.1.insert(key, broadcast);
+                (true, result_fut)
+            }
+            // drop Mutex before awaiting any future
+        };
+
+        let result = result_fut.await;
+        if owner {
+            // this call initiated the request: cache a successful value and always drop the
+            // pending entry, so failed or empty fetches are not served to later callers forever
+            let mut maps = self.maps.lock().unwrap();
+            if let Ok(Some(ref value)) = result {
+                maps.0.insert(key, value.clone());
+            }
+            maps.1.remove(&key);
+        }
+        result
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    struct TestAsyncCacher {
+        prefix: &'static str,
+    }
+
+    impl AsyncCacher<i32, String> for TestAsyncCacher {
+        fn fetch(
+            &self,
+            key: i32,
+        ) -> Box<dyn Future<Output = Result<Option<String>, Error>> + Send> {
+            let x = self.prefix;
+            Box::new(async move { Ok(Some(format!("{}{}", x, key))) })
+        }
+    }
+
+    #[test]
+    fn test_async_lru_cache() {
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        rt.block_on(async move {
+            let cacher = TestAsyncCacher { prefix: "x" };
+            let cache: AsyncLruCache<i32, String> = AsyncLruCache::new(2);
+
+            // sequential misses, each filled through `cacher`
+            for key in &[10, 20, 30] {
+                let expected = format!("x{}", key);
+                assert_eq!(cache.access(*key, &cacher).await.unwrap(), Some(expected));
+            }
+
+            // concurrent requests for the same key; keep the JoinHandles so that a failed
+            // assertion inside a task fails the test instead of being silently dropped
+            let mut handles = Vec::new();
+            for _ in 0..10 {
+                let c = cache.clone();
+                handles.push(tokio::spawn(async move {
+                    let cacher = TestAsyncCacher { prefix: "y" };
+                    let value = c.access(40, &cacher).await.unwrap();
+                    assert_eq!(value, Some("y40".to_string()));
+                }));
+            }
+
+            // 20 may have been evicted by 40 meanwhile; a refetch via `cacher` still gives "x20"
+            assert_eq!(
+                cache.access(20, &cacher).await.unwrap(),
+                Some("x20".to_string())
+            );
+            // propagate panics from the spawned tasks
+            for handle in handles {
+                handle.await.unwrap();
+            }
+        });
+    }
+}
--
2.30.2
WARNING: multiple messages have this Message-ID
From: Stefan Reiter <s.reiter@proxmox.com>
To: pve-devel@lists.proxmox.com, pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache
Date: Wed, 2 Jun 2021 16:38:26 +0200 [thread overview]
Message-ID: <20210602143833.4423-3-s.reiter@proxmox.com> (raw)
In-Reply-To: <20210602143833.4423-1-s.reiter@proxmox.com>
Supports concurrent 'access' calls to the same key via a
BroadcastFuture. These are stored in a separate HashMap; the LruCache
underneath is only modified once a valid value has been retrieved.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools.rs | 1 +
src/tools/async_lru_cache.rs | 135 +++++++++++++++++++++++++++++++++++
2 files changed, 136 insertions(+)
create mode 100644 src/tools/async_lru_cache.rs
diff --git a/src/tools.rs b/src/tools.rs
index 65049b1e..59599339 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -42,6 +42,7 @@ pub mod json;
pub mod logrotate;
pub mod loopdev;
pub mod lru_cache;
+pub mod async_lru_cache;
pub mod nom;
pub mod runtime;
pub mod serde_filter;
diff --git a/src/tools/async_lru_cache.rs b/src/tools/async_lru_cache.rs
new file mode 100644
index 00000000..cc385ec9
--- /dev/null
+++ b/src/tools/async_lru_cache.rs
@@ -0,0 +1,135 @@
+//! An 'async'-safe layer on the existing sync LruCache implementation. Supports multiple
+//! concurrent requests to the same key.
+
+use anyhow::Error;
+
+use std::collections::HashMap;
+use std::future::Future;
+use std::sync::{Arc, Mutex};
+
+use super::lru_cache::LruCache;
+use super::BroadcastFuture;
+
+/// Interface for asynchronously getting values on cache misses.
+pub trait AsyncCacher<K, V: Clone>: Sync + Send {
+    /// Fetch a value for `key` on a cache miss.
+    ///
+    /// Works like the non-async `lru_cache::Cacher`, but if the key has already been requested
+    /// and the result is not cached yet, `fetch` is not called again; instead the result of the
+    /// original request is cloned and returned once it completes.
+    ///
+    /// The underlying LRU structure is not modified until the returned future resolves to an
+    /// `Ok(Some(_))` value.
+    fn fetch(&self, key: K) -> Box<dyn Future<Output = Result<Option<V>, Error>> + Send>;
+}
+
+/// Async-safe variant of `tools::lru_cache::LruCache`, filling misses via [`AsyncCacher`].
+/// Clones are cheap handles sharing the same state (the only field is an `Arc`).
+#[derive(Clone)]
+pub struct AsyncLruCache<K, V> {
+    maps: Arc<Mutex<(LruCache<K, V>, HashMap<K, BroadcastFuture<Option<V>>>)>>,
+}
+
+impl<K: std::cmp::Eq + std::hash::Hash + Copy, V: Clone + Send + 'static> AsyncLruCache<K, V> {
+    /// Create a new AsyncLruCache with the given maximum capacity.
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            maps: Arc::new(Mutex::new((LruCache::new(capacity), HashMap::new()))),
+        }
+    }
+
+    /// Access an item via the cache or by calling `cacher.fetch`. `Ok(None)` means the item has
+    /// no representation; `Err(_)` means fetch() failed, whether started by this call or by a
+    /// concurrent one. Only `Ok(Some(_))` is cached; `Ok(None)`/`Err(_)` are re-fetched next time.
+    pub async fn access(&self, key: K, cacher: &dyn AsyncCacher<K, V>) -> Result<Option<V>, Error> {
+        let (owner, result_fut) = {
+            // check if already requested
+            let mut maps = self.maps.lock().unwrap();
+            if let Some(fut) = maps.1.get(&key) {
+                // wait for the already scheduled future to resolve
+                (false, fut.listen())
+            } else {
+                // check if value is cached in LRU
+                if let Some(val) = maps.0.get_mut(key) {
+                    return Ok(Some(val.clone()));
+                }
+                // if neither, start broadcast future and put into map while we still have lock
+                // NOTE(review): only the owner removes this entry (after its await below); if
+                // the owner's future is dropped first, the entry lingers - consider a drop guard
+                let broadcast = BroadcastFuture::new(cacher.fetch(key));
+                let result_fut = broadcast.listen();
+                maps.1.insert(key, broadcast);
+                (true, result_fut)
+            }
+            // drop Mutex before awaiting any future
+        };
+
+        let result = result_fut.await;
+        if owner {
+            // this call initiated the request: cache a successful value and always drop the
+            // pending entry, so failed or empty fetches are not served to later callers forever
+            let mut maps = self.maps.lock().unwrap();
+            if let Ok(Some(ref value)) = result {
+                maps.0.insert(key, value.clone());
+            }
+            maps.1.remove(&key);
+        }
+        result
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    struct TestAsyncCacher {
+        prefix: &'static str,
+    }
+
+    impl AsyncCacher<i32, String> for TestAsyncCacher {
+        fn fetch(
+            &self,
+            key: i32,
+        ) -> Box<dyn Future<Output = Result<Option<String>, Error>> + Send> {
+            let x = self.prefix;
+            Box::new(async move { Ok(Some(format!("{}{}", x, key))) })
+        }
+    }
+
+    #[test]
+    fn test_async_lru_cache() {
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        rt.block_on(async move {
+            let cacher = TestAsyncCacher { prefix: "x" };
+            let cache: AsyncLruCache<i32, String> = AsyncLruCache::new(2);
+
+            // sequential misses, each filled through `cacher`
+            for key in &[10, 20, 30] {
+                let expected = format!("x{}", key);
+                assert_eq!(cache.access(*key, &cacher).await.unwrap(), Some(expected));
+            }
+
+            // concurrent requests for the same key; keep the JoinHandles so that a failed
+            // assertion inside a task fails the test instead of being silently dropped
+            let mut handles = Vec::new();
+            for _ in 0..10 {
+                let c = cache.clone();
+                handles.push(tokio::spawn(async move {
+                    let cacher = TestAsyncCacher { prefix: "y" };
+                    let value = c.access(40, &cacher).await.unwrap();
+                    assert_eq!(value, Some("y40".to_string()));
+                }));
+            }
+
+            // 20 may have been evicted by 40 meanwhile; a refetch via `cacher` still gives "x20"
+            assert_eq!(
+                cache.access(20, &cacher).await.unwrap(),
+                Some("x20".to_string())
+            );
+            // propagate panics from the spawned tasks
+            for handle in handles {
+                handle.await.unwrap();
+            }
+        });
+    }
+}
--
2.30.2
next prev parent reply other threads:[~2021-06-02 14:38 UTC|newest]
Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-06-02 14:38 [pve-devel] [PATCH 0/9] Improve live-restore speed and replace AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 1/9] tools/BroadcastFuture: add testcase for better understanding Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` Stefan Reiter [this message]
2021-06-02 14:38 ` [pbs-devel] [PATCH proxmox-backup 2/9] tools: add AsyncLruCache as a wrapper around sync LruCache Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 3/9] backup: add CachedChunkReader utilizing AsyncLruCache Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-04 12:22 ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:22 ` Wolfgang Bumiller
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 4/9] backup: add AsyncRead/Seek to CachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-04 12:30 ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:30 ` Wolfgang Bumiller
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 5/9] replace AsyncIndexReader with SeekableCachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 6/9] backup: remove AsyncIndexReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup 7/9] tools/lru_cache: make minimum capacity 1 Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 8/9] add shared_cache module Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
2021-06-04 12:16 ` [pve-devel] " Wolfgang Bumiller
2021-06-04 12:16 ` Wolfgang Bumiller
2021-06-07 8:03 ` [pve-devel] " Stefan Reiter
2021-06-07 8:03 ` Stefan Reiter
2021-06-02 14:38 ` [pve-devel] [PATCH proxmox-backup-qemu 9/9] access: use CachedChunkReader Stefan Reiter
2021-06-02 14:38 ` [pbs-devel] " Stefan Reiter
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210602143833.4423-3-s.reiter@proxmox.com \
--to=s.reiter@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
--cc=pve-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.