From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <c.ebner@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id E23DFBC1E0
 for <pbs-devel@lists.proxmox.com>; Thu, 28 Mar 2024 13:38:16 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id CD397A10D
 for <pbs-devel@lists.proxmox.com>; Thu, 28 Mar 2024 13:37:53 +0100 (CET)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [94.136.29.106])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS
 for <pbs-devel@lists.proxmox.com>; Thu, 28 Mar 2024 13:37:50 +0100 (CET)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 8602142947
 for <pbs-devel@lists.proxmox.com>; Thu, 28 Mar 2024 13:37:50 +0100 (CET)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Thu, 28 Mar 2024 13:36:56 +0100
Message-Id: <20240328123707.336951-48-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240328123707.336951-1-c.ebner@proxmox.com>
References: <20240328123707.336951-1-c.ebner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.029 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add
 look-ahead caching
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Thu, 28 Mar 2024 12:38:16 -0000

Implements the methods to cache entries in a look-ahead cache and
flush them to the archive, either by re-using and injecting the
payload chunks from the previous backup snapshot and storing a
reference to them, or by re-encoding the chunks.
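
For orientation, a rough sketch of the cache entry shape this builds
on (the actual definitions live in the look_ahead_cache module; fields
abbreviated, sketch only):

    // sketch only, see the look_ahead_cache module for the real types
    enum CacheEntry {
        RegEntry(CacheEntryData), // regular file, possibly re-usable
        DirEntry(CacheEntryData), // directory start, encoded when flushing
        DirEnd,                   // directory boundary, closed when flushing
    }

    struct CacheEntryData {
        fd: OwnedFd,
        c_file_name: CString,
        stat: FileStat,
        metadata: Metadata,
        payload_offset: PayloadOffset, // reference into the payload stream
    }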

When walking the file system tree, check for each entry whether it is
re-usable, meaning that the metadata did not change and the payload
chunks can therefore be re-indexed instead of re-encoding the whole
data.
Since the amount of payload data might be small compared to the actual
chunk size, the decision whether to re-use or re-encode is postponed
as long as the re-usable payload stays below a threshold value and the
chunks are contiguous.
In that case, put the entry's file handle and metadata on the cache,
enable caching mode and continue with the next entry.
Re-usable chunk digests and sizes, as well as reference offsets to the
start of the regular files' payloads within the payload stream, are
kept in memory, to be injected for re-usable file entries.
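
The postponement boils down to the following match, condensed from
cache_or_flush_entries() in the diff below (not a literal copy):

    match self.reused_chunks.suggested() {
        // enough contiguous re-usable payload accumulated: inject the chunks
        Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
        // re-use judged not worthwhile: flush by re-encoding the cached entries
        Suggested::Reencode => self.flush_cached_to_archive(encoder, false, true).await?,
        // still below the threshold: keep caching and check the next entry
        Suggested::CheckNext => {}
    }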

Once the threshold value for re-use is reached, the chunks are
injected into the payload stream and the references with the
corresponding offsets are encoded in the metadata stream.
If, however, a non-reusable (because changed) entry is encountered
before the threshold is reached, the entries on the cache are flushed
to the archive by re-encoding them, and the memorized chunks and
payload reference offsets are discarded.
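
Both outcomes go through the same flush helper, only with different
flags (condensed from flush_cached_to_archive() below):

    // reuse_chunks == false drops the memorized chunks and offsets first
    if !reuse_chunks {
        self.clear_cached_chunks(encoder)?;
    }
    // cached entries are then either re-encoded, or encoded with their
    // memorized payload_offset reference when re-using
    for entry in std::mem::take(&mut self.cached_entries) {
        /* add_entry_to_archive(..., reuse_chunks, Some(payload_offset)) */
    }
    if reuse_chunks {
        self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
    }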

Since multiple files might be contained within a single chunk, chunk
deduplication is ensured even when the reuse threshold is reached by
keeping back the last chunk in the memorized list, so that following
files may re-use that chunk as well.
That chunk is nevertheless injected into the stream, also in case the
following lookups lead to a cache clear and re-encoding.
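
Condensed from flush_reused_chunks() and clear_cached_chunks() below,
the keep-back mechanism looks roughly like this:

    // do not inject the last re-used chunk directly, keep it as base for
    // further entries to reduce chunk duplication
    let last_chunk = if keep_back_last_chunk {
        reused_chunks.chunks.pop()
    } else {
        None
    };
    if let Some((padding, chunk)) = last_chunk {
        // make sure this chunk is flushed even if the cache gets cleared later
        self.reused_chunks.must_flush_first = true;
        self.reused_chunks.insert(vec![chunk], injection_boundary, padding, 0);
    }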

Directory boundaries are cached as well, and written as part of the
encoding when flushing.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- completely reworked
- strongly reduced duplicate code

 pbs-client/src/pxar/create.rs | 259 ++++++++++++++++++++++++++++++++++
 1 file changed, 259 insertions(+)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index c64084a74..07fa17ec4 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use std::ffi::{CStr, CString, OsStr};
 use std::fmt;
 use std::io::{self, Read};
+use std::mem::size_of;
 use std::ops::Range;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
@@ -23,6 +24,7 @@ use pxar::accessor::aio::{Accessor, Directory};
 use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
 use pxar::{EntryKind, Metadata};
 
+use proxmox_human_byte::HumanByte;
 use proxmox_io::vec;
 use proxmox_lang::c_str;
 use proxmox_sys::fs::{self, acl, xattr};
@@ -32,6 +34,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
 
 use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -274,6 +277,12 @@ struct Archiver {
     reused_chunks: ReusedChunks,
     previous_payload_index: Option<DynamicIndexReader>,
     forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
+    cached_entries: Vec<CacheEntry>,
+    caching_enabled: bool,
+    total_injected_size: u64,
+    total_injected_count: u64,
+    partial_chunks_count: u64,
+    total_reused_payload_size: u64,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -377,6 +386,12 @@ where
         reused_chunks: ReusedChunks::new(),
         previous_payload_index,
         forced_boundaries,
+        cached_entries: Vec::new(),
+        caching_enabled: false,
+        total_injected_size: 0,
+        total_injected_count: 0,
+        partial_chunks_count: 0,
+        total_reused_payload_size: 0,
     };
 
     archiver
@@ -879,6 +894,250 @@ impl Archiver {
         }
     }
 
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let reusable = if let Some(accessor) = previous_metadata_accessor {
+            self.is_reusable_entry(accessor, file_name, stat, metadata)
+                .await?
+        } else {
+            None
+        };
+
+        let file_size = stat.st_size as u64;
+        if let Some(start_offset) = reusable {
+            if let Some(ref ref_payload_index) = self.previous_payload_index {
+                let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
+                let end_offset = start_offset + payload_size;
+                let (indices, start_padding, end_padding) =
+                    lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;
+
+                let boundary = encoder.payload_position()?;
+                let offset =
+                    self.reused_chunks
+                        .insert(indices, boundary, start_padding, end_padding);
+
+                self.caching_enabled = true;
+                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    offset,
+                ));
+                self.cached_entries.push(cache_entry);
+
+                match self.reused_chunks.suggested() {
+                    Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
+                    Suggested::Reencode => {
+                        self.flush_cached_to_archive(encoder, false, true).await?
+                    }
+                    Suggested::CheckNext => {}
+                }
+
+                return Ok(());
+            }
+        }
+
+        self.flush_cached_to_archive(encoder, false, true).await?;
+        self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
+            .await
+    }
+
+    async fn flush_cached_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        reuse_chunks: bool,
+        keep_back_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let entries = std::mem::take(&mut self.cached_entries);
+
+        if !reuse_chunks {
+            self.clear_cached_chunks(encoder)?;
+        }
+
+        for entry in entries {
+            match entry {
+                CacheEntry::RegEntry(CacheEntryData {
+                    fd,
+                    c_file_name,
+                    stat,
+                    metadata,
+                    payload_offset,
+                }) => {
+                    self.add_entry_to_archive(
+                        encoder,
+                        &mut None,
+                        &c_file_name,
+                        &stat,
+                        fd,
+                        &metadata,
+                        reuse_chunks,
+                        Some(payload_offset),
+                    )
+                    .await?
+                }
+                CacheEntry::DirEntry(CacheEntryData {
+                    c_file_name,
+                    metadata,
+                    ..
+                }) => {
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().start_directory(&c_file_name)?;
+                    }
+                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+                    encoder.create_directory(dir_name, &metadata).await?;
+                }
+                CacheEntry::DirEnd => {
+                    encoder.finish().await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                }
+            }
+        }
+
+        self.caching_enabled = false;
+
+        if reuse_chunks {
+            self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
+        }
+
+        Ok(())
+    }
+
+    fn flush_reused_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        keep_back_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+        // Do not inject the last reused chunk directly, but keep it as base for further entries
+        // to reduce chunk duplication. Needs to be flushed even on cache clear!
+        let last_chunk = if keep_back_last_chunk {
+            reused_chunks.chunks.pop()
+        } else {
+            None
+        };
+
+        let mut injection_boundary = reused_chunks.start_boundary();
+        let payload_writer_position = encoder.payload_position()?.raw();
+
+        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
+            bail!(
+                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
+                injection_boundary.raw(),
+            );
+        }
+
+        for chunks in reused_chunks.chunks.chunks(128) {
+            let mut chunk_list = Vec::with_capacity(128);
+            let mut size = PayloadOffset::default();
+            for (padding, chunk) in chunks.iter() {
+                log::debug!(
+                    "Injecting chunk with {} padding (chunk size {})",
+                    HumanByte::from(*padding),
+                    HumanByte::from(chunk.size()),
+                );
+                self.total_injected_size += chunk.size();
+                self.total_injected_count += 1;
+                if *padding > 0 {
+                    self.partial_chunks_count += 1;
+                }
+                size = size.add(chunk.size());
+                chunk_list.push(chunk.clone());
+            }
+
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: chunk_list,
+                size: size.raw() as usize,
+            };
+
+            if let Some(boundary) = self.forced_boundaries.as_mut() {
+                let mut boundary = boundary.lock().unwrap();
+                boundary.push_back(inject_chunks);
+            } else {
+                bail!("missing injection queue");
+            };
+
+            injection_boundary = injection_boundary.add(size.raw());
+            encoder.advance(size)?;
+        }
+
+        if let Some((padding, chunk)) = last_chunk {
+            // Make sure that we flush this chunk even on clear calls
+            self.reused_chunks.must_flush_first = true;
+            let _offset = self
+                .reused_chunks
+                .insert(vec![chunk], injection_boundary, padding, 0);
+        }
+
+        Ok(())
+    }
+
+    fn clear_cached_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+        if !reused_chunks.must_flush_first {
+            return Ok(());
+        }
+
+        // First chunk was kept back to avoid duplication but needs to be injected
+        let injection_boundary = reused_chunks.start_boundary();
+        let payload_writer_position = encoder.payload_position()?.raw();
+
+        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
+            bail!(
+                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
+                injection_boundary.raw()
+            );
+        }
+
+        if let Some((padding, chunk)) = reused_chunks.chunks.first() {
+            let size = PayloadOffset::default().add(chunk.size());
+            log::debug!(
+                "Injecting chunk with {} padding (chunk size {})",
+                HumanByte::from(*padding),
+                HumanByte::from(chunk.size()),
+            );
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: vec![chunk.clone()],
+                size: size.raw() as usize,
+            };
+
+            self.total_injected_size += size.raw();
+            self.total_injected_count += 1;
+            if *padding > 0 {
+                self.partial_chunks_count += 1;
+            }
+
+            if let Some(boundary) = self.forced_boundaries.as_mut() {
+                let mut boundary = boundary.lock().unwrap();
+                boundary.push_back(inject_chunks);
+            } else {
+                bail!("missing injection queue");
+            };
+            encoder.advance(size)?;
+        } else {
+            bail!("missing first chunk");
+        }
+
+        Ok(())
+    }
+
     async fn add_directory<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
-- 
2.39.2