From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <c.ebner@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 7E97095EE5
 for <pbs-devel@lists.proxmox.com>; Wed, 28 Feb 2024 15:09:50 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id 599C9E1E3
 for <pbs-devel@lists.proxmox.com>; Wed, 28 Feb 2024 15:09:20 +0100 (CET)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [94.136.29.106])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS
 for <pbs-devel@lists.proxmox.com>; Wed, 28 Feb 2024 15:09:19 +0100 (CET)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id B7A8947A08
 for <pbs-devel@lists.proxmox.com>; Wed, 28 Feb 2024 15:02:54 +0100 (CET)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 28 Feb 2024 15:02:23 +0100
Message-Id: <20240228140226.1251979-34-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com>
References: <20240228140226.1251979-1-c.ebner@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL -0.105 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 POISEN_SPAM_PILL          0.1 Meta: its spam
 POISEN_SPAM_PILL_2        0.1 random spam to be learned in bayes
 POISEN_SPAM_PILL_4        0.1 random spam to be learned in bayes
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 T_SCC_BODY_TEXT_LINE    -0.01 -
Subject: [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead
 caching
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Wed, 28 Feb 2024 14:09:50 -0000

Implement the methods to cache entries in a look-ahead cache and to
flush the cached entries to the archive, either by re-using and
injecting the payload chunks of the previous backup snapshot and storing
references to them, or by re-encoding the chunks.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-client/src/pxar/create.rs | 284 ++++++++++++++++++++++++++++++++++
 1 file changed, 284 insertions(+)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 39864483..fbcccb2e 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -828,6 +828,290 @@ impl Archiver {
         }
     }
 
+    /// Cache a reusable entry in the look-ahead cache, or flush the cache and encode the entry.
+    ///
+    /// An entry found reusable by `is_reusable_entry` is cached; once the cached payload size
+    /// would reach `CACHED_PAYLOAD_THRESHOLD` the cache is flushed and a payload reference added.
+    /// Any other entry flushes the cache by re-encoding and goes to `add_entry_to_archive`.
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let reusable = if let Some(accessor) = accessor {
+            self.is_reusable_entry(accessor, file_name, stat, metadata)
+                .await?
+        } else {
+            None
+        };
+
+        let file_size = stat.st_size as u64;
+        if let Some(start_offset) = reusable {
+            if let Some(ref ref_payload_index) = self.previous_payload_index {
+                let (indices, start_padding, _end_padding) =
+                    ref_payload_index.indices(start_offset, start_offset + file_size)?;
+
+                // Insert chunks into reused_chunks, getting the correct payload offset
+                let boundary = encoder.payload_position()?;
+                let offset = self.reused_chunks.insert(indices, boundary, start_padding);
+
+                if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
+                    self.flush_cached_to_archive(encoder, true).await?;
+                    encoder
+                        .add_payload_ref(metadata, file_name, file_size, offset)
+                        .await?;
+                    if let Some(ref catalog) = self.catalog {
+                        let mut catalog = catalog.lock().unwrap();
+                        catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
+                    }
+                } else {
+                    log::debug!("lookahead-cache: {file_name:?}");
+                    self.caching_enabled = true;
+                    self.cached_payload_size += file_size;
+                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                        fd,
+                        c_file_name.into(),
+                        stat.clone(),
+                        metadata.clone(),
+                        offset,
+                    ));
+                    self.cached_entries.push(cache_entry);
+                }
+
+                return Ok(());
+            }
+            log::debug!("re-encode: {file_name:?} no previous payload index.");
+        }
+
+        self.flush_cached_to_archive(encoder, false).await?;
+        self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, metadata)
+            .await
+    }
+
+    /// Write all entries held in the look-ahead cache to the archive and reset the cache state.
+    ///
+    /// With `reuse_chunks` the cached chunks are queued for injection and regular files encoded
+    /// as payload references; without it the cached chunks are cleared and files re-encoded.
+    async fn flush_cached_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        reuse_chunks: bool,
+    ) -> Result<(), Error> {
+        if reuse_chunks {
+            self.flush_reused_chunks(encoder)?;
+        } else {
+            self.clear_cached_chunks(encoder)?;
+        }
+        self.caching_enabled = false;
+        self.cached_payload_size = 0;
+
+        for entry in std::mem::take(&mut self.cached_entries) {
+            match entry {
+                CacheEntry::RegEntry(data) => {
+                    self.flush_entry_to_archive(encoder, data, reuse_chunks)
+                        .await?
+                }
+                CacheEntry::DirEntry(data) => {
+                    self.flush_directory_to_archive(encoder, data).await?
+                }
+                CacheEntry::DirEnd => {
+                    encoder.finish().await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Queue all cached reusable chunks but the last for injection, in batches of up to 128.
+    ///
+    /// The last chunk is re-inserted into `reused_chunks` with `must_flush_first` set, so that a
+    /// later flush or `clear_cached_chunks` still injects it.
+    fn flush_reused_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
+        // Keep the last chunk back for possible follow-up entries instead of injecting it now.
+        let last_chunk = reused_chunks.chunks.pop();
+
+        let mut injection_boundary = reused_chunks.start_boundary();
+        for batch in reused_chunks.chunks.chunks(128) {
+            let size: u64 = batch.iter().map(|chunk| chunk.size()).sum();
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: batch.to_vec(),
+                size: size as usize,
+            };
+            let mut queue = self.forced_boundaries.lock().unwrap();
+            queue.push_back(inject_chunks);
+            injection_boundary = injection_boundary.add(size);
+            encoder.advance(size)?;
+        }
+
+        if let Some(chunk) = last_chunk {
+            let _offset = self.reused_chunks.insert(vec![chunk], injection_boundary, 0);
+            self.reused_chunks.must_flush_first = true;
+        }
+        Ok(())
+    }
+
+    /// Drop the cached reusable chunks, injecting only a chunk kept back by a previous flush.
+    ///
+    /// Nothing is injected unless `must_flush_first` is set; fails if it is set without a chunk.
+    fn clear_cached_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let reused_chunks = std::mem::take(&mut self.reused_chunks);
+        if !reused_chunks.must_flush_first {
+            return Ok(());
+        }
+
+        let injection_boundary = reused_chunks.start_boundary();
+        let chunk = match reused_chunks.chunks.first() {
+            Some(chunk) => chunk,
+            None => bail!("missing first chunk"),
+        };
+        let size = chunk.size();
+        let inject_chunks = InjectChunks {
+            boundary: injection_boundary.raw(),
+            chunks: vec![chunk.clone()],
+            size: size as usize,
+        };
+        let mut queue = self.forced_boundaries.lock().unwrap();
+        queue.push_back(inject_chunks);
+        encoder.advance(size)?;
+        Ok(())
+    }
+
+    /// Start a cached directory in the catalog (if one is set) and in the encoder.
+    ///
+    /// Only the file name and metadata of `entry_data` are used. The directory is not finished
+    /// here; that happens when the matching `CacheEntry::DirEnd` is flushed by
+    /// `flush_cached_to_archive`.
+    async fn flush_directory_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        entry_data: CacheEntryData,
+    ) -> Result<(), Error> {
+        let c_file_name = &entry_data.c_file_name;
+        let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+
+        if let Some(ref catalog) = self.catalog {
+            catalog.lock().unwrap().start_directory(c_file_name)?;
+        }
+        encoder.create_directory(dir_name, &entry_data.metadata).await?;
+
+        Ok(())
+    }
+
+    /// Write a cached non-directory entry to the archive and record it in the catalog.
+    ///
+    /// Regular files are stored as payload reference to `payload_offset` if `reuse_chunks` is
+    /// set and re-encoded from `fd` otherwise, unless already known as hardlink target.
+    async fn flush_entry_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        entry_data: CacheEntryData,
+        reuse_chunks: bool,
+    ) -> Result<(), Error> {
+        use pxar::format::mode;
+
+        let CacheEntryData {
+            fd,
+            c_file_name,
+            stat,
+            metadata,
+            payload_offset,
+        } = entry_data;
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+
+        match metadata.file_type() {
+            mode::IFREG => {
+                let link_info = HardLinkInfo {
+                    st_dev: stat.st_dev,
+                    st_ino: stat.st_ino,
+                };
+                if stat.st_nlink > 1 {
+                    if let Some((path, offset)) = self.hardlinks.get(&link_info) {
+                        if let Some(ref catalog) = self.catalog {
+                            catalog.lock().unwrap().add_hardlink(&c_file_name)?;
+                        }
+                        encoder.add_hardlink(file_name, path, *offset).await?;
+                        return Ok(());
+                    }
+                }
+
+                let file_size = stat.st_size as u64;
+                if let Some(ref catalog) = self.catalog {
+                    let mut catalog = catalog.lock().unwrap();
+                    catalog.add_file(&c_file_name, file_size, stat.st_mtime)?;
+                }
+                if reuse_chunks {
+                    // NOTE(review): unlike the re-encode branch, this does not register the file
+                    // in `self.hardlinks`; confirm this is intended for files with st_nlink > 1.
+                    encoder
+                        .add_payload_ref(&metadata, file_name, file_size, payload_offset)
+                        .await?;
+                } else {
+                    let offset: LinkOffset = self
+                        .add_regular_file(encoder, fd, file_name, &metadata, file_size)
+                        .await?;
+                    if stat.st_nlink > 1 {
+                        self.hardlinks
+                            .insert(link_info, (self.path.clone(), offset));
+                    }
+                }
+            }
+            mode::IFSOCK => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_socket(&c_file_name)?;
+                }
+                encoder.add_socket(&metadata, file_name).await?;
+            }
+            mode::IFIFO => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_fifo(&c_file_name)?;
+                }
+                encoder.add_fifo(&metadata, file_name).await?;
+            }
+            mode::IFLNK => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_symlink(&c_file_name)?;
+                }
+                self.add_symlink(encoder, fd, file_name, &metadata).await?;
+            }
+            mode::IFBLK => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_block_device(&c_file_name)?;
+                }
+                self.add_device(encoder, file_name, &metadata, &stat)
+                    .await?;
+            }
+            mode::IFCHR => {
+                if let Some(ref catalog) = self.catalog {
+                    catalog.lock().unwrap().add_char_device(&c_file_name)?;
+                }
+                self.add_device(encoder, file_name, &metadata, &stat)
+                    .await?;
+            }
+            file_type => bail!(
+                "encountered unknown file type: 0x{:x} (0o{:o})",
+                file_type,
+                file_type
+            ),
+        }
+        Ok(())
+    }
+
     async fn add_directory<T: SeqWrite + Send>(
         &mut self,
         encoder: &mut Encoder<'_, T>,
-- 
2.39.2