public inbox for pbs-devel@lists.proxmox.com
From: Stefan Reiter <s.reiter@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 2/3] RemoteChunkReader: add LRU cached variant
Date: Wed, 28 Apr 2021 18:06:54 +0200
Message-ID: <20210428160655.29941-3-s.reiter@proxmox.com>
In-Reply-To: <20210428160655.29941-1-s.reiter@proxmox.com>

Retain the old constructor for compatibility; most use cases don't need
an LRU cache anyway. The new variant uses the MmapBuffer backend.

For now, convert the 'map' API to use the new variant, as the same set
of chunks might be accessed multiple times in a random pattern there.

Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
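For reference, a minimal usage sketch of the new constructor, mirroring
the mount.rs hunk below ('client', 'crypt_config' and 'file_info' are
assumed to already be in scope there):

    let chunk_reader = RemoteChunkReader::new_lru_cached(
        client.clone(),
        crypt_config,
        file_info.chunk_crypt_mode(),
        HashMap::new(),   // no hinted chunks for this use case
        16,               // keep the 16 most recently used chunks
        4 * 1024 * 1024,  // upper bound on chunk size, sizes the mmap slots
    )?;
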
 src/bin/proxmox_backup_client/mount.rs |  12 ++-
 src/client/remote_chunk_reader.rs      | 110 +++++++++++++++++++++----
 2 files changed, 104 insertions(+), 18 deletions(-)

diff --git a/src/bin/proxmox_backup_client/mount.rs b/src/bin/proxmox_backup_client/mount.rs
index f3498e35..6494c900 100644
--- a/src/bin/proxmox_backup_client/mount.rs
+++ b/src/bin/proxmox_backup_client/mount.rs
@@ -280,7 +280,17 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
     } else if server_archive_name.ends_with(".fidx") {
         let index = client.download_fixed_index(&manifest, &server_archive_name).await?;
         let size = index.index_bytes();
-        let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new());
+        let chunk_reader = RemoteChunkReader::new_lru_cached(
+            client.clone(),
+            crypt_config,
+            file_info.chunk_crypt_mode(),
+            HashMap::new(),
+            16,
+            index
+                .chunk_info(0)
+                .map(|info| info.size() as usize)
+                .unwrap_or(4 * 1024 * 1024),
+        )?;
         let reader = AsyncIndexReader::new(index, chunk_reader);
 
         let name = &format!("{}:{}/{}", repo.to_string(), path, archive_name);
diff --git a/src/client/remote_chunk_reader.rs b/src/client/remote_chunk_reader.rs
index 06f693a2..35e279bf 100644
--- a/src/client/remote_chunk_reader.rs
+++ b/src/client/remote_chunk_reader.rs
@@ -1,5 +1,6 @@
 use std::future::Future;
 use std::collections::HashMap;
+use std::convert::TryInto;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 
@@ -8,6 +9,14 @@ use anyhow::{bail, Error};
 use super::BackupReader;
 use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
 use crate::tools::runtime::block_on;
+use crate::tools::lru_cache::LruCache;
+use crate::tools::mmap_buffer::{MmapBuffer, MmapBufferEntry};
+
+struct Cache {
+    cache_hint: HashMap<[u8; 32], usize>,
+    hinted: HashMap<[u8; 32], Vec<u8>>,
+    lru: Option<(LruCache<[u8; 32], MmapBufferEntry>, Arc<MmapBuffer>)>,
+}
 
 /// Read chunks from remote host using ``BackupReader``
 #[derive(Clone)]
@@ -15,8 +24,7 @@ pub struct RemoteChunkReader {
     client: Arc<BackupReader>,
     crypt_config: Option<Arc<CryptConfig>>,
     crypt_mode: CryptMode,
-    cache_hint: Arc<HashMap<[u8; 32], usize>>,
-    cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
+    cache: Arc<Mutex<Cache>>,
 }
 
 impl RemoteChunkReader {
@@ -33,11 +41,42 @@ impl RemoteChunkReader {
             client,
             crypt_config,
             crypt_mode,
-            cache_hint: Arc::new(cache_hint),
-            cache: Arc::new(Mutex::new(HashMap::new())),
+            cache: Arc::new(Mutex::new(Cache {
+                hinted: HashMap::with_capacity(cache_hint.len()),
+                cache_hint,
+                lru: None,
+            })),
         }
     }
 
+    /// Create a new instance.
+    ///
+    /// Chunks listed in ``cache_hint`` are cached and kept in RAM, as well as the last
+    /// 'cache_lru' accessed chunks (the latter via a mmap file).
+    pub fn new_lru_cached(
+        client: Arc<BackupReader>,
+        crypt_config: Option<Arc<CryptConfig>>,
+        crypt_mode: CryptMode,
+        cache_hint: HashMap<[u8; 32], usize>,
+        cache_lru: usize,
+        max_chunk_size: usize,
+    ) -> Result<Self, Error> {
+        let new = Self::new(client, crypt_config, crypt_mode, cache_hint);
+        {
+            let mut cache = new.cache.lock().unwrap();
+            cache.lru = Some((
+                LruCache::new(cache_lru),
+                MmapBuffer::new(
+                    // account for chunk size prefix
+                    max_chunk_size + std::mem::size_of::<usize>(),
+                    // the LruCache may hold one more item than its capacity during insert
+                    cache_lru + 1,
+                )?,
+            ));
+        }
+        Ok(new)
+    }
+
     /// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
     /// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
     pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
@@ -64,6 +103,51 @@ impl RemoteChunkReader {
             },
         }
     }
+
+    fn cache_get(&self, digest: &[u8; 32]) -> Option<Vec<u8>> {
+        let cache = &mut *self.cache.lock().unwrap();
+        if let Some(data) = cache.hinted.get(digest) {
+            return Some(data.to_vec());
+        }
+
+        cache
+            .lru
+            .as_mut()
+            .map(|lru| lru.0.get_mut(*digest).map(|alloc| {
+                let s = std::mem::size_of::<usize>();
+                let len = usize::from_ne_bytes(alloc[..s].try_into().unwrap());
+                alloc[s..(len + s)].to_vec()
+            }))
+            .flatten()
+    }
+
+    fn cache_insert(&self, digest: &[u8; 32], raw_data: &[u8]) {
+        let cache = &mut *self.cache.lock().unwrap();
+
+        // if hinted, always cache given digest
+        if cache.cache_hint.contains_key(digest) {
+            cache.hinted.insert(*digest, raw_data.to_vec());
+            return;
+        }
+
+        // otherwise put in LRU
+        if let Some((ref mut lru, ref mut mmap)) = cache.lru {
+            let mut alloc = match mmap.allocate() {
+                Ok(alloc) => alloc,
+                Err(err) => {
+                    // *really* shouldn't happen, log to stderr/journal if it does - we can
+                    // continue reading data, it just won't be cached
+                    eprintln!("RemoteChunkReader: error on LRU alloc: {}", err);
+                    return;
+                }
+            };
+            let s = std::mem::size_of::<usize>();
+            // prefix with chunk size
+            alloc[0..s].copy_from_slice(&raw_data.len().to_ne_bytes()[..]);
+            alloc[s..(s + raw_data.len())].copy_from_slice(raw_data);
+            lru.insert(*digest, alloc);
+        }
+    }
 }
 
 impl ReadChunk for RemoteChunkReader {
@@ -72,18 +156,14 @@ impl ReadChunk for RemoteChunkReader {
     }
 
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
-        if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
-            return Ok(raw_data.to_vec());
+        if let Some(raw_data) = self.cache_get(digest) {
+            return Ok(raw_data);
         }
 
         let chunk = ReadChunk::read_raw_chunk(self, digest)?;
 
         let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
-
-        let use_cache = self.cache_hint.contains_key(digest);
-        if use_cache {
-            (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
-        }
+        self.cache_insert(digest, &raw_data);
 
         Ok(raw_data)
     }
@@ -102,18 +182,14 @@ impl AsyncReadChunk for RemoteChunkReader {
         digest: &'a [u8; 32],
     ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
         Box::pin(async move {
-            if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
+            if let Some(raw_data) = self.cache_get(digest) {
                 return Ok(raw_data.to_vec());
             }
 
             let chunk = Self::read_raw_chunk(self, digest).await?;
 
             let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
-
-            let use_cache = self.cache_hint.contains_key(digest);
-            if use_cache {
-                (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
-            }
+            self.cache_insert(digest, &raw_data);
 
             Ok(raw_data)
         })
-- 
2.20.1


Thread overview: 7+ messages
2021-04-28 16:06 [pbs-devel] [PATCH 0/3] Add cache for live-restore Stefan Reiter
2021-04-28 16:06 ` [pbs-devel] [PATCH proxmox-backup 1/3] tools: add mmap_buffer module Stefan Reiter
2021-04-28 16:06 ` Stefan Reiter [this message]
2021-04-28 16:06 ` [pbs-devel] [PATCH proxmox-backup-qemu 3/3] access: use bigger cache and LRU chunk reader Stefan Reiter
2021-04-28 16:26 [pbs-devel] [PATCH proxmox-backup 2/3] RemoteChunkReader: add LRU cached variant Dietmar Maurer
2021-04-29  8:06 ` Stefan Reiter
2021-04-29  8:12 Dietmar Maurer
