* [pbs-devel] [PATCH proxmox-backup 1/3] tools: add mmap_buffer module
2021-04-28 16:06 [pbs-devel] [PATCH 0/3] Add cache for live-restore Stefan Reiter
@ 2021-04-28 16:06 ` Stefan Reiter
2021-04-28 16:06 ` [pbs-devel] [PATCH proxmox-backup 2/3] RemoteChunkReader: add LRU cached variant Stefan Reiter
2021-04-28 16:06 ` [pbs-devel] [PATCH proxmox-backup-qemu 3/3] access: use bigger cache and LRU chunk reader Stefan Reiter
2 siblings, 0 replies; 4+ messages in thread
From: Stefan Reiter @ 2021-04-28 16:06 UTC (permalink / raw)
To: pbs-devel
This module allocates a large buffer of fixed-size chunks backed by a
memory-mapped file, which allows the kernel to easily write these pages
back to disk under memory pressure.
Designed to be used with an LRU cache or similar.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/tools.rs | 1 +
src/tools/mmap_buffer.rs | 113 +++++++++++++++++++++++++++++++++++++++
2 files changed, 114 insertions(+)
create mode 100644 src/tools/mmap_buffer.rs
diff --git a/src/tools.rs b/src/tools.rs
index 08af55e5..8f22e6c8 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -34,6 +34,7 @@ pub mod json;
pub mod logrotate;
pub mod loopdev;
pub mod lru_cache;
+pub mod mmap_buffer;
pub mod nom;
pub mod runtime;
pub mod serde_filter;
diff --git a/src/tools/mmap_buffer.rs b/src/tools/mmap_buffer.rs
new file mode 100644
index 00000000..764303c2
--- /dev/null
+++ b/src/tools/mmap_buffer.rs
@@ -0,0 +1,113 @@
+//! Provides a buffer allocation API using a memory mapped file.
+use anyhow::{bail, Error};
+use nix::sys::mman;
+use nix::unistd::{ftruncate, unlink};
+use std::sync::{Arc, Mutex};
+
+use proxmox::tools::fs::{make_tmp_file, CreateOptions};
+use proxmox::tools::mmap::Mmap;
+
+/// One slot handed out by `MmapBuffer::allocate`.
+///
+/// Dereferences to the slot's bytes. Dropping the entry marks its slot as free again in the
+/// parent buffer.
+pub struct MmapBufferEntry {
+ /// Bytes of this slot inside the parent's mapping.
+ // NOTE(review): the `'static` lifetime is only justified by `parent` being held alongside;
+ // because this field is `pub`, a caller could move the reference out of the entry and keep
+ // using it after the entry was dropped - consider making it private, to be confirmed.
+ pub buf: &'static mut [u8],
+ /// Index of this slot, used to clear its bit in the parent's bitmap on drop.
+ entry_idx: usize,
+ /// The buffer this slot was allocated from.
+ parent: Arc<MmapBuffer>,
+}
+
+/// Pool of `entry_count` byte slots of `entry_size` bytes each, backed by one memory mapped
+/// temp file.
+pub struct MmapBuffer {
+ /// Size of a single slot in bytes.
+ entry_size: usize,
+ /// Number of slots in the mapping.
+ entry_count: usize,
+ /// One bit per slot, set while the slot is handed out; the mutex serializes allocate/deallocate.
+ bitmap: Mutex<Vec<u64>>,
+ /// Mapping of `entry_size * entry_count` bytes.
+ mmap: Mmap<u8>,
+}
+
+impl MmapBuffer {
+    /// Create a new mmap buffer containing `entry_count` `[u8]` slices of length `entry_size`.
+    ///
+    /// The backing temp file is unlinked immediately after creation. This requires root to
+    /// allocate the temp file securely.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the total size overflows `usize` or does not fit the file length type, or if
+    /// creating, unlinking, resizing or mapping the temp file fails.
+    pub fn new(entry_size: usize, entry_count: usize) -> Result<Arc<Self>, Error> {
+        // a silently wrapped product would create a mapping smaller than the bitmap claims
+        let bytes = match entry_size.checked_mul(entry_count) {
+            Some(bytes) => bytes,
+            None => bail!(
+                "mmap-buffer: {} entries of {} bytes exceed the addressable size",
+                entry_count,
+                entry_size
+            ),
+        };
+        // ftruncate takes a signed length: refuse sizes that do not fit instead of truncating
+        let file_len = <i64 as std::convert::TryFrom<usize>>::try_from(bytes)?;
+
+        let (fd, path) = make_tmp_file(
+            "/var/cache/proxmox-backup/mmap-buffer",
+            CreateOptions::default(),
+        )?;
+        unlink(&path)?;
+        ftruncate(fd.0, file_len)?;
+        // NOTE(review): `map_fd` is unsafe; presumably it needs a valid fd and a length covered
+        // by the file, both of which are established just above - confirm against its docs.
+        let mmap = unsafe {
+            Mmap::map_fd(
+                fd.0,
+                0,
+                bytes,
+                mman::ProtFlags::PROT_READ | mman::ProtFlags::PROT_WRITE,
+                mman::MapFlags::MAP_SHARED,
+            )?
+        };
+        Ok(Arc::new(Self {
+            entry_size,
+            entry_count,
+            // one bit per entry, rounded up to whole 64 bit words
+            bitmap: Mutex::new(vec![0; (entry_count + 63) / 64]),
+            mmap,
+        }))
+    }
+
+    /// Splits the linear entry index `lin` into the index of its bitmap word and the mask of
+    /// its bit within that word.
+    #[inline]
+    fn bitmap_idx(lin: usize) -> (usize, u64) {
+        (lin / 64, 1u64 << (lin % 64))
+    }
+
+    /// Returns one of the slices available in the buffer. The slice will be returned to this
+    /// instance on drop. If `entry_count` slices are already handed out, this function will fail.
+    pub fn allocate(self: &Arc<Self>) -> Result<MmapBufferEntry, Error> {
+        let mut bitmap = self.bitmap.lock().unwrap();
+        for i in 0..self.entry_count {
+            let (idx, mask) = Self::bitmap_idx(i);
+            if (bitmap[idx] & mask) != 0 {
+                // slot is currently handed out
+                continue;
+            }
+            bitmap[idx] |= mask;
+
+            let offset = self.entry_size * i;
+            let end = offset + self.entry_size;
+            // SAFETY:
+            // * mut: the bit set above (under the bitmap lock) prevents this region from being
+            //   handed out again until the entry is dropped
+            // * 'static: the MmapBufferEntry contains a clone of the Arc pointing to ourselves
+            // NOTE(review): the pointer is derived from a shared borrow of `self.mmap` and then
+            // cast to `*mut`; a mutable raw pointer obtained from the mapping itself would be
+            // cleaner, if `Mmap` offers one - to be confirmed.
+            let data: &'static mut [u8] = unsafe {
+                std::slice::from_raw_parts_mut(
+                    self.mmap[offset..end].as_ptr() as *mut u8,
+                    self.entry_size,
+                )
+            };
+            return Ok(MmapBufferEntry {
+                buf: data,
+                entry_idx: i,
+                parent: Arc::clone(self),
+            });
+        }
+
+        bail!("mmap-buffer: cannot allocate more entries in buffer");
+    }
+
+    /// Marks the entry with index `offset` as free again. Despite its name, `offset` is the
+    /// entry index (not a byte offset); it is passed from `MmapBufferEntry`'s `Drop` impl.
+    fn deallocate(self: &Arc<Self>, offset: usize) {
+        let (idx, mask) = Self::bitmap_idx(offset);
+        let mut bitmap = self.bitmap.lock().unwrap();
+        bitmap[idx] &= !mask;
+    }
+}
+
+/// Hands the slot back to the parent buffer when the entry goes out of scope.
+impl Drop for MmapBufferEntry {
+ fn drop(&mut self) {
+ // clears this entry's bit in the parent's bitmap so `allocate` can reuse the slot
+ self.parent.deallocate(self.entry_idx);
+ }
+}
+
+/// Read access to the slot's bytes.
+impl std::ops::Deref for MmapBufferEntry {
+ type Target = [u8];
+ fn deref(&self) -> &Self::Target {
+ self.buf
+ }
+}
+
+/// Write access to the slot's bytes.
+impl std::ops::DerefMut for MmapBufferEntry {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ self.buf
+ }
+}
--
2.20.1
^ permalink raw reply [flat|nested] 4+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/3] RemoteChunkReader: add LRU cached variant
2021-04-28 16:06 [pbs-devel] [PATCH 0/3] Add cache for live-restore Stefan Reiter
2021-04-28 16:06 ` [pbs-devel] [PATCH proxmox-backup 1/3] tools: add mmap_buffer module Stefan Reiter
@ 2021-04-28 16:06 ` Stefan Reiter
2021-04-28 16:06 ` [pbs-devel] [PATCH proxmox-backup-qemu 3/3] access: use bigger cache and LRU chunk reader Stefan Reiter
2 siblings, 0 replies; 4+ messages in thread
From: Stefan Reiter @ 2021-04-28 16:06 UTC (permalink / raw)
To: pbs-devel
Retain the old constructor for compatibility; most use cases don't need
an LRU cache anyway. The new variant uses the MmapBuffer backend.
For now convert the 'map' API to use the new variant, as the same set
of chunks might be accessed multiple times in a random pattern there.
Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
---
src/bin/proxmox_backup_client/mount.rs | 12 ++-
src/client/remote_chunk_reader.rs | 110 +++++++++++++++++++++----
2 files changed, 104 insertions(+), 18 deletions(-)
diff --git a/src/bin/proxmox_backup_client/mount.rs b/src/bin/proxmox_backup_client/mount.rs
index f3498e35..6494c900 100644
--- a/src/bin/proxmox_backup_client/mount.rs
+++ b/src/bin/proxmox_backup_client/mount.rs
@@ -280,7 +280,17 @@ async fn mount_do(param: Value, pipe: Option<Fd>) -> Result<Value, Error> {
} else if server_archive_name.ends_with(".fidx") {
let index = client.download_fixed_index(&manifest, &server_archive_name).await?;
let size = index.index_bytes();
- let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, file_info.chunk_crypt_mode(), HashMap::new());
+ let chunk_reader = RemoteChunkReader::new_lru_cached(
+ client.clone(),
+ crypt_config,
+ file_info.chunk_crypt_mode(),
+ HashMap::new(),
+ 16,
+ index
+ .chunk_info(0)
+ .map(|info| info.size() as usize)
+ .unwrap_or(4 * 1024 * 1024),
+ )?;
let reader = AsyncIndexReader::new(index, chunk_reader);
let name = &format!("{}:{}/{}", repo.to_string(), path, archive_name);
diff --git a/src/client/remote_chunk_reader.rs b/src/client/remote_chunk_reader.rs
index 06f693a2..35e279bf 100644
--- a/src/client/remote_chunk_reader.rs
+++ b/src/client/remote_chunk_reader.rs
@@ -1,5 +1,6 @@
use std::future::Future;
use std::collections::HashMap;
+use std::convert::TryInto;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@@ -8,6 +9,14 @@ use anyhow::{bail, Error};
use super::BackupReader;
use crate::backup::{AsyncReadChunk, CryptConfig, CryptMode, DataBlob, ReadChunk};
use crate::tools::runtime::block_on;
+use crate::tools::lru_cache::LruCache;
+use crate::tools::mmap_buffer::{MmapBuffer, MmapBufferEntry};
+
+/// Cache state of a `RemoteChunkReader`; the reader keeps it behind a single mutex.
+struct Cache {
+ /// Digests whose chunks are always cached once they were read.
+ // NOTE(review): the `usize` value is not read by the code visible here - confirm its meaning.
+ cache_hint: HashMap<[u8; 32], usize>,
+ /// Chunk data of the hinted digests.
+ hinted: HashMap<[u8; 32], Vec<u8>>,
+ /// LRU of non-hinted chunks plus the mmap buffer providing their storage; `None` for readers
+ /// created via `new`.
+ lru: Option<(LruCache<[u8; 32], MmapBufferEntry>, Arc<MmapBuffer>)>,
+}
/// Read chunks from remote host using ``BackupReader``
#[derive(Clone)]
@@ -15,8 +24,7 @@ pub struct RemoteChunkReader {
client: Arc<BackupReader>,
crypt_config: Option<Arc<CryptConfig>>,
crypt_mode: CryptMode,
- cache_hint: Arc<HashMap<[u8; 32], usize>>,
- cache: Arc<Mutex<HashMap<[u8; 32], Vec<u8>>>>,
+ cache: Arc<Mutex<Cache>>,
}
impl RemoteChunkReader {
@@ -33,11 +41,42 @@ impl RemoteChunkReader {
client,
crypt_config,
crypt_mode,
- cache_hint: Arc::new(cache_hint),
- cache: Arc::new(Mutex::new(HashMap::new())),
+ cache: Arc::new(Mutex::new(Cache {
+ hinted: HashMap::with_capacity(cache_hint.len()),
+ cache_hint,
+ lru: None,
+ })),
}
}
+    /// Create a new instance with an additional LRU cache.
+    ///
+    /// Chunks listed in ``cache_hint`` are cached and kept in RAM. On top of that, the last
+    /// ``cache_lru`` accessed chunks are kept in a buffer backed by a mmap file, with
+    /// ``max_chunk_size`` bytes of chunk data reserved per cached chunk.
+    ///
+    /// Fails if the mmap buffer cannot be set up.
+    pub fn new_lru_cached(
+        client: Arc<BackupReader>,
+        crypt_config: Option<Arc<CryptConfig>>,
+        crypt_mode: CryptMode,
+        cache_hint: HashMap<[u8; 32], usize>,
+        cache_lru: usize,
+        max_chunk_size: usize,
+    ) -> Result<Self, Error> {
+        let reader = Self::new(client, crypt_config, crypt_mode, cache_hint);
+
+        let lru = LruCache::new(cache_lru);
+        // every slot stores the chunk size as a prefix in front of the chunk data
+        let slot_size = max_chunk_size + std::mem::size_of::<usize>();
+        // the LruCache may hold one more item than its capacity during insert
+        let buffer = MmapBuffer::new(slot_size, cache_lru + 1)?;
+
+        reader.cache.lock().unwrap().lru = Some((lru, buffer));
+        Ok(reader)
+    }
+
/// Downloads raw chunk. This only verifies the (untrusted) CRC32, use
/// DataBlob::verify_unencrypted or DataBlob::decode before storing/processing further.
pub async fn read_raw_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
@@ -64,6 +103,51 @@ impl RemoteChunkReader {
},
}
}
+
+    /// Looks up the cached data for `digest` and returns a copy of it, if present.
+    ///
+    /// Hinted chunks are checked first, then the LRU cache (if this reader has one).
+    fn cache_get(&self, digest: &[u8; 32]) -> Option<Vec<u8>> {
+        let mut guard = self.cache.lock().unwrap();
+
+        if let Some(data) = guard.hinted.get(digest) {
+            return Some(data.to_vec());
+        }
+
+        // no LRU configured or digest not in it: nothing cached
+        let (lru, _buffer) = guard.lru.as_mut()?;
+        let slot = lru.get_mut(*digest)?;
+
+        // a slot holds the chunk length in native byte order, followed by the chunk data
+        let prefix = std::mem::size_of::<usize>();
+        let len = usize::from_ne_bytes(slot[..prefix].try_into().unwrap());
+        Some(slot[prefix..(len + prefix)].to_vec())
+    }
+
+    /// Stores the chunk data `raw_data` for `digest` in the matching cache.
+    ///
+    /// Hinted digests always go into the `hinted` map. Everything else goes into the LRU cache,
+    /// if this reader has one. Caching is best effort: if no buffer slot can be allocated, or
+    /// the chunk does not fit into a slot, the chunk is simply not cached.
+    fn cache_insert(&self, digest: &[u8; 32], raw_data: &[u8]) {
+        let cache = &mut *self.cache.lock().unwrap();
+
+        // if hinted, always cache given digest
+        if cache.cache_hint.contains_key(digest) {
+            cache.hinted.insert(*digest, raw_data.to_vec());
+            return;
+        }
+
+        // otherwise put in LRU
+        if let Some((ref mut lru, ref mut mmap)) = cache.lru {
+            let mut alloc = match mmap.allocate() {
+                Ok(alloc) => alloc,
+                Err(err) => {
+                    // *really* shouldn't happen, log to stderr/journal if it does - we can
+                    // continue reading data, it just won't be cached
+                    eprintln!("RemoteChunkReader: error on LRU alloc: {}", err);
+                    return;
+                }
+            };
+
+            let s = std::mem::size_of::<usize>();
+            let end = s + raw_data.len();
+            if end > alloc.len() {
+                // The chunk is bigger than the slot size chosen at construction. Slicing past
+                // the slot would panic while the cache mutex is held, poisoning it for every
+                // later access - so skip caching; dropping 'alloc' frees the slot again.
+                return;
+            }
+
+            // prefix with chunk size
+            alloc[..s].copy_from_slice(&raw_data.len().to_ne_bytes());
+            alloc[s..end].copy_from_slice(raw_data);
+            lru.insert(*digest, alloc);
+        }
+    }
}
impl ReadChunk for RemoteChunkReader {
@@ -72,18 +156,14 @@ impl ReadChunk for RemoteChunkReader {
}
fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error> {
- if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
- return Ok(raw_data.to_vec());
+ if let Some(raw_data) = self.cache_get(digest) {
+ return Ok(raw_data);
}
let chunk = ReadChunk::read_raw_chunk(self, digest)?;
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
-
- let use_cache = self.cache_hint.contains_key(digest);
- if use_cache {
- (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
- }
+ self.cache_insert(digest, &raw_data);
Ok(raw_data)
}
@@ -102,18 +182,14 @@ impl AsyncReadChunk for RemoteChunkReader {
digest: &'a [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Error>> + Send + 'a>> {
Box::pin(async move {
- if let Some(raw_data) = (*self.cache.lock().unwrap()).get(digest) {
+ if let Some(raw_data) = self.cache_get(digest) {
return Ok(raw_data.to_vec());
}
let chunk = Self::read_raw_chunk(self, digest).await?;
let raw_data = chunk.decode(self.crypt_config.as_ref().map(Arc::as_ref), Some(digest))?;
-
- let use_cache = self.cache_hint.contains_key(digest);
- if use_cache {
- (*self.cache.lock().unwrap()).insert(*digest, raw_data.to_vec());
- }
+ self.cache_insert(digest, &raw_data);
Ok(raw_data)
})
--
2.20.1
^ permalink raw reply [flat|nested] 4+ messages in thread