From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching
Date: Wed, 28 Feb 2024 15:02:23 +0100
Message-ID: <20240228140226.1251979-34-c.ebner@proxmox.com>
In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com>
Implements the methods to cache entries in a look-ahead cache and to
flush them to the archive, either by reusing and injecting the payload
chunks from the previous backup snapshot and storing references to
them, or by re-encoding the chunks.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
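Not part of the patch itself: below is a rough, self-contained sketch of the
decision flow that cache_or_flush_entries() implements, for reviewers who want
the idea without the archiver plumbing. All names except
CACHED_PAYLOAD_THRESHOLD are made-up stand-ins and the threshold value is only
an assumption; the real code operates on the encoder, catalog and reused-chunk
tracking shown in the diff.

    // Illustrative stand-ins only, not the actual pbs-client types.
    const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024; // assumed value

    struct LookaheadCache {
        cached_payload_size: u64,
        cached_entries: Vec<String>,
    }

    impl LookaheadCache {
        fn cache_or_flush(&mut self, name: &str, size: u64, reusable_offset: Option<u64>) -> String {
            match reusable_offset {
                // Entry cannot be reused: flush the cache (re-encoding its
                // contents) and re-encode this entry as well.
                None => {
                    self.flush(false);
                    format!("re-encode {name}")
                }
                // Reusable, but the cache would grow past the threshold: flush
                // the cached entries as payload references and reference this
                // entry's payload directly.
                Some(offset) if self.cached_payload_size + size >= CACHED_PAYLOAD_THRESHOLD => {
                    self.flush(true);
                    format!("payload ref for {name} at offset {offset}")
                }
                // Reusable and still below the threshold: keep caching, the
                // reuse/re-encode decision is deferred to a later entry.
                Some(_) => {
                    self.cached_payload_size += size;
                    self.cached_entries.push(name.to_string());
                    format!("cached {name}")
                }
            }
        }

        fn flush(&mut self, reuse_chunks: bool) {
            // The real archiver either injects the reused chunks and emits
            // payload references, or re-encodes the cached entries.
            println!("flushing {} cached entries (reuse_chunks = {reuse_chunks})",
                self.cached_entries.len());
            self.cached_entries.clear();
            self.cached_payload_size = 0;
        }
    }

    fn main() {
        let mut cache = LookaheadCache { cached_payload_size: 0, cached_entries: Vec::new() };
        println!("{}", cache.cache_or_flush("a.img", 1 << 30, Some(0)));
        println!("{}", cache.cache_or_flush("b.img", 3 << 30, Some(1 << 30)));
        println!("{}", cache.cache_or_flush("c.txt", 4096, None));
    }

In the actual archiver the same three branches map to flush_cached_to_archive(.., false)
followed by add_entry_to_archive(), flush_cached_to_archive(.., true) followed by
add_payload_ref(), and pushing a CacheEntry::RegEntry onto the cache, respectively.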
pbs-client/src/pxar/create.rs | 284 ++++++++++++++++++++++++++++++++++
1 file changed, 284 insertions(+)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 39864483..fbcccb2e 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -828,6 +828,290 @@ impl Archiver {
}
}
+ async fn cache_or_flush_entries<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+ c_file_name: &CStr,
+ stat: &FileStat,
+ fd: OwnedFd,
+ metadata: &Metadata,
+ ) -> Result<(), Error> {
+ let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+ let reusable = if let Some(accessor) = accessor {
+ self.is_reusable_entry(accessor, file_name, stat, metadata)
+ .await?
+ } else {
+ None
+ };
+
+ let file_size = stat.st_size as u64;
+ if let Some(start_offset) = reusable {
+ if let Some(ref ref_payload_index) = self.previous_payload_index {
+ let end_offset = start_offset + file_size;
+ let (indices, start_padding, _end_padding) =
+ ref_payload_index.indices(start_offset, end_offset)?;
+
+ // Insert chunks into reused_chunks, getting the correct payload offset
+ let boundary = encoder.payload_position()?;
+ let offset = self.reused_chunks.insert(indices, boundary, start_padding);
+
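+ // Flush the cached entries as payload references and reference this entry's
+ // payload directly once the cached payload would exceed the threshold;
+ // otherwise cache this entry as well.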
+ if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
+ self.flush_cached_to_archive(encoder, true).await?;
+
+ encoder
+ .add_payload_ref(metadata, file_name, file_size, offset)
+ .await?;
+
+ if let Some(ref catalog) = self.catalog {
+ catalog
+ .lock()
+ .unwrap()
+ .add_file(&c_file_name, file_size, stat.st_mtime)?;
+ }
+ } else {
+ log::debug!("lookahead-cache: {file_name:?}");
+ self.caching_enabled = true;
+ self.cached_payload_size += file_size;
+ let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ stat.clone(),
+ metadata.clone(),
+ offset,
+ ));
+ self.cached_entries.push(cache_entry);
+ }
+
+ return Ok(());
+ }
+ log::debug!("re-encode: {file_name:?} no previous payload index.");
+ }
+
+ self.flush_cached_to_archive(encoder, false).await?;
+ self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await
+ }
+
+ async fn flush_cached_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ reuse_chunks: bool,
+ ) -> Result<(), Error> {
+ if reuse_chunks {
+ self.flush_reused_chunks(encoder)?;
+ } else {
+ self.clear_cached_chunks(encoder)?;
+ }
+ let entries = std::mem::take(&mut self.cached_entries);
+
+ self.caching_enabled = false;
+ self.cached_payload_size = 0;
+
+ for entry in entries {
+ match entry {
+ CacheEntry::RegEntry(data) => {
+ self.flush_entry_to_archive(encoder, data, reuse_chunks)
+ .await?
+ }
+ CacheEntry::DirEntry(data) => {
+ self.flush_directory_to_archive(encoder, data).await?
+ }
+ CacheEntry::DirEnd => {
+ let result = encoder.finish().await?;
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().end_directory()?;
+ }
+ result
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ fn flush_reused_chunks<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ ) -> Result<(), Error> {
+ let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+ let last_chunk = reused_chunks.chunks.pop();
+
+ // Do not inject the last chunk directly, but keep it around for possible followup entries.
+ // It needs to be flushed in any case!
+ let mut injection_boundary = reused_chunks.start_boundary();
+ for chunks in reused_chunks.chunks.chunks(128) {
+ let size = chunks.iter().fold(0u64, |sum, chunk| sum + chunk.size());
+ let inject_chunks = InjectChunks {
+ boundary: injection_boundary.raw(),
+ chunks: chunks.to_vec(),
+ size: size as usize,
+ };
+ let mut boundary = self.forced_boundaries.lock().unwrap();
+ boundary.push_back(inject_chunks);
+ injection_boundary = injection_boundary.add(size);
+ encoder.advance(size)?;
+ }
+
+ if let Some(chunk) = last_chunk {
+ let _offset = self.reused_chunks.insert(vec![chunk], injection_boundary, 0);
+ // Make sure that we flush this chunk even on clear calls
+ self.reused_chunks.must_flush_first = true;
+ }
+
+ Ok(())
+ }
+
+ fn clear_cached_chunks<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ ) -> Result<(), Error> {
+ let reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+ if !reused_chunks.must_flush_first {
+ return Ok(());
+ }
+
+ // First chunk was kept back to avoid duplication but needs to be injected
+ let injection_boundary = reused_chunks.start_boundary();
+ if let Some(chunk) = reused_chunks.chunks.first() {
+ let size = chunk.size();
+ let inject_chunks = InjectChunks {
+ boundary: injection_boundary.raw(),
+ chunks: vec![chunk.clone()],
+ size: size as usize,
+ };
+ let mut boundary = self.forced_boundaries.lock().unwrap();
+ boundary.push_back(inject_chunks);
+ encoder.advance(size)?;
+ } else {
+ bail!("missing first chunk");
+ }
+
+ Ok(())
+ }
+
+ async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
+ &'a mut self,
+ encoder: &'a mut Encoder<'b, T>,
+ entry_data: CacheEntryData,
+ ) -> Result<(), Error> {
+ let CacheEntryData {
+ c_file_name,
+ metadata,
+ ..
+ } = entry_data;
+ let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().start_directory(&c_file_name)?;
+ }
+
+ encoder.create_directory(dir_name, &metadata).await?;
+
+ Ok(())
+ }
+
+ async fn flush_entry_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ entry_data: CacheEntryData,
+ reuse_chunks: bool,
+ ) -> Result<(), Error> {
+ use pxar::format::mode;
+
+ let CacheEntryData {
+ fd,
+ c_file_name,
+ stat,
+ metadata,
+ payload_offset,
+ } = entry_data;
+ let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+
+ match metadata.file_type() {
+ mode::IFREG => {
+ let link_info = HardLinkInfo {
+ st_dev: stat.st_dev,
+ st_ino: stat.st_ino,
+ };
+
+ if stat.st_nlink > 1 {
+ if let Some((path, offset)) = self.hardlinks.get(&link_info) {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_hardlink(&c_file_name)?;
+ }
+ encoder.add_hardlink(file_name, path, *offset).await?;
+ return Ok(());
+ }
+ }
+
+ let file_size = stat.st_size as u64;
+ if let Some(ref catalog) = self.catalog {
+ catalog
+ .lock()
+ .unwrap()
+ .add_file(&c_file_name, file_size, stat.st_mtime)?;
+ }
+
+ if reuse_chunks {
+ encoder
+ .add_payload_ref(&metadata, file_name, file_size, payload_offset)
+ .await?;
+ } else {
+ let offset: LinkOffset = self
+ .add_regular_file(encoder, fd, file_name, &metadata, file_size)
+ .await?;
+
+ if stat.st_nlink > 1 {
+ self.hardlinks
+ .insert(link_info, (self.path.clone(), offset));
+ }
+ }
+ }
+ mode::IFSOCK => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_socket(&c_file_name)?;
+ }
+ encoder.add_socket(&metadata, file_name).await?;
+ }
+ mode::IFIFO => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_fifo(&c_file_name)?;
+ }
+ encoder.add_fifo(&metadata, file_name).await?;
+ }
+ mode::IFLNK => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_symlink(&c_file_name)?;
+ }
+ self.add_symlink(encoder, fd, file_name, &metadata).await?;
+ }
+ mode::IFBLK => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_block_device(&c_file_name)?;
+ }
+ self.add_device(encoder, file_name, &metadata, &stat)
+ .await?;
+ }
+ mode::IFCHR => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_char_device(&c_file_name)?;
+ }
+ self.add_device(encoder, file_name, &metadata, &stat)
+ .await?;
+ }
+ other => bail!(
+ "encountered unknown file type: 0x{:x} (0o{:o})",
+ other,
+ other
+ ),
+ }
+
+ Ok(())
+ }
+
async fn add_directory<T: SeqWrite + Send>(
&mut self,
encoder: &mut Encoder<'_, T>,
--
2.39.2