From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Thu, 28 Mar 2024 13:36:56 +0100
Message-Id: <20240328123707.336951-48-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240328123707.336951-1-c.ebner@proxmox.com>
References: <20240328123707.336951-1-c.ebner@proxmox.com>
Subject: [pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching

Implements the methods to cache entries in a look-ahead cache and flush
the entries to the archive, either by re-using and injecting the payload
chunks from the previous backup snapshot and storing the references to
them, or by re-encoding the chunks.

When walking the file system tree, check for each entry if it is
re-usable, meaning that the metadata did not change and the payload
chunks can be re-indexed instead of re-encoding the whole data. Since
the amount of payload data might be small compared to the actual chunk
size, the decision whether to re-use or re-encode is postponed if the
re-used payload does not yet exceed a threshold value but the chunks
were contiguous. In this case, the entry's file handle and metadata are
put on the cache, caching mode is enabled, and the walk continues with
the next entry.

Re-usable chunk digests and sizes, as well as reference offsets to the
start of regular file payloads within the payload stream, are stored in
memory, to be injected for re-usable file entries.

If the threshold value for re-use is reached, the chunks are injected
into the payload stream and the references with the corresponding
offsets are encoded in the metadata stream.
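As a rough illustration of this decision logic, consider the following
stand-alone sketch. The Suggested variants match the ones used in the
patch below, but suggest() and its chunk-sized threshold are simplified,
hypothetical stand-ins for what ReusedChunks::suggested() actually does:

    #[derive(Debug, PartialEq)]
    enum Suggested {
        Reuse,     // enough contiguous payload accumulated: inject the chunks
        Reencode,  // too much padding: cheaper to re-encode the cached entries
        CheckNext, // undecided: keep caching and look at the next entry
    }

    fn suggest(reused_payload_size: u64, padding: u64, chunk_size: u64) -> Suggested {
        // Hypothetical threshold: assume re-use pays off once the re-used
        // payload covers at least one full chunk worth of data.
        if reused_payload_size >= chunk_size {
            Suggested::Reuse
        } else if padding > reused_payload_size {
            // More padding than actual payload: re-encoding is the better call.
            Suggested::Reencode
        } else {
            Suggested::CheckNext
        }
    }

    fn main() {
        // Small file in a large chunk: the decision is postponed.
        assert_eq!(suggest(64 << 10, 32 << 10, 4 << 20), Suggested::CheckNext);
        // Accumulated payload exceeds the threshold: re-use and inject.
        assert_eq!(suggest(6 << 20, 128 << 10, 4 << 20), Suggested::Reuse);
    }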
If however a non-reusable (because changed) entry is encountered before
the threshold is reached, the entries on the cache are flushed to the
archive by re-encoding them, and the memorized chunks and payload
reference offsets are discarded.

Since multiple files might be contained within a single chunk, chunk
deduplication is ensured even when the re-use threshold is reached, by
keeping back the last chunk in the memorized list, so following files
may re-use that chunk as well. That chunk is guaranteed to be injected
into the stream even if the following lookups lead to a cache clear and
re-encoding.

Directory boundaries are cached as well, and written as part of the
encoding when flushing.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- completely reworked
- strongly reduced duplicate code

 pbs-client/src/pxar/create.rs | 259 ++++++++++++++++++++++++++++++++++
 1 file changed, 259 insertions(+)

diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index c64084a74..07fa17ec4 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use std::ffi::{CStr, CString, OsStr};
 use std::fmt;
 use std::io::{self, Read};
+use std::mem::size_of;
 use std::ops::Range;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
@@ -23,6 +24,7 @@ use pxar::accessor::aio::{Accessor, Directory};
 use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
 use pxar::{EntryKind, Metadata};
 
+use proxmox_human_byte::HumanByte;
 use proxmox_io::vec;
 use proxmox_lang::c_str;
 use proxmox_sys::fs::{self, acl, xattr};
@@ -32,6 +34,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
 
 use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -274,6 +277,12 @@ struct Archiver {
     reused_chunks: ReusedChunks,
     previous_payload_index: Option<DynamicIndexReader>,
     forced_boundaries: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
+    cached_entries: Vec<CacheEntry>,
+    caching_enabled: bool,
+    total_injected_size: u64,
+    total_injected_count: u64,
+    partial_chunks_count: u64,
+    total_reused_payload_size: u64,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -377,6 +386,12 @@ where
             reused_chunks: ReusedChunks::new(),
             previous_payload_index,
             forced_boundaries,
+            cached_entries: Vec::new(),
+            caching_enabled: false,
+            total_injected_size: 0,
+            total_injected_count: 0,
+            partial_chunks_count: 0,
+            total_reused_payload_size: 0,
         };
 
         archiver
@@ -879,6 +894,250 @@ impl Archiver {
         }
     }
 
+    async fn cache_or_flush_entries<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        previous_metadata_accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+        c_file_name: &CStr,
+        stat: &FileStat,
+        fd: OwnedFd,
+        metadata: &Metadata,
+    ) -> Result<(), Error> {
+        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+        let reusable = if let Some(accessor) = previous_metadata_accessor {
+            self.is_reusable_entry(accessor, file_name, stat, metadata)
+                .await?
+        } else {
+            None
+        };
+
+        let file_size = stat.st_size as u64;
+        if let Some(start_offset) = reusable {
+            if let Some(ref ref_payload_index) = self.previous_payload_index {
+                let payload_size = file_size + size_of::<pxar::format::Header>() as u64;
+                let end_offset = start_offset + payload_size;
+                let (indices, start_padding, end_padding) =
+                    lookup_dynamic_entries(ref_payload_index, start_offset..end_offset)?;
+
+                let boundary = encoder.payload_position()?;
+                let offset =
+                    self.reused_chunks
+                        .insert(indices, boundary, start_padding, end_padding);
+
+                self.caching_enabled = true;
+                let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+                    fd,
+                    c_file_name.into(),
+                    *stat,
+                    metadata.clone(),
+                    offset,
+                ));
+                self.cached_entries.push(cache_entry);
+
+                match self.reused_chunks.suggested() {
+                    Suggested::Reuse => self.flush_cached_to_archive(encoder, true, true).await?,
+                    Suggested::Reencode => {
+                        self.flush_cached_to_archive(encoder, false, true).await?
+                    }
+                    Suggested::CheckNext => {}
+                }
+
+                return Ok(());
+            }
+        }
+
+        self.flush_cached_to_archive(encoder, false, true).await?;
+        self.add_entry(encoder, previous_metadata_accessor, fd.as_raw_fd(), c_file_name, stat)
+            .await
+    }
+
+    async fn flush_cached_to_archive<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        reuse_chunks: bool,
+        keep_back_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let entries = std::mem::take(&mut self.cached_entries);
+
+        if !reuse_chunks {
+            self.clear_cached_chunks(encoder)?;
+        }
+
+        for entry in entries {
+            match entry {
+                CacheEntry::RegEntry(CacheEntryData {
+                    fd,
+                    c_file_name,
+                    stat,
+                    metadata,
+                    payload_offset,
+                }) => {
+                    self.add_entry_to_archive(
+                        encoder,
+                        &mut None,
+                        &c_file_name,
+                        &stat,
+                        fd,
+                        &metadata,
+                        reuse_chunks,
+                        Some(payload_offset),
+                    )
+                    .await?
+                }
+                CacheEntry::DirEntry(CacheEntryData {
+                    c_file_name,
+                    metadata,
+                    ..
+                }) => {
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().start_directory(&c_file_name)?;
+                    }
+                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+                    encoder.create_directory(dir_name, &metadata).await?;
+                }
+                CacheEntry::DirEnd => {
+                    encoder.finish().await?;
+                    if let Some(ref catalog) = self.catalog {
+                        catalog.lock().unwrap().end_directory()?;
+                    }
+                }
+            }
+        }
+
+        self.caching_enabled = false;
+
+        if reuse_chunks {
+            self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
+        }
+
+        Ok(())
+    }
+
+    fn flush_reused_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+        keep_back_last_chunk: bool,
+    ) -> Result<(), Error> {
+        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+        // Do not inject the last reused chunk directly, but keep it as base for further entries
+        // to reduce chunk duplication. Needs to be flushed even on cache clear!
+        let last_chunk = if keep_back_last_chunk {
+            reused_chunks.chunks.pop()
+        } else {
+            None
+        };
+
+        let mut injection_boundary = reused_chunks.start_boundary();
+        let payload_writer_position = encoder.payload_position()?.raw();
+
+        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
+            bail!(
+                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
+                injection_boundary.raw(),
+            );
+        }
+
+        for chunks in reused_chunks.chunks.chunks(128) {
+            let mut chunk_list = Vec::with_capacity(128);
+            let mut size = PayloadOffset::default();
+            for (padding, chunk) in chunks.iter() {
+                log::debug!(
+                    "Injecting chunk with {} padding (chunk size {})",
+                    HumanByte::from(*padding),
+                    HumanByte::from(chunk.size()),
+                );
+                self.total_injected_size += chunk.size();
+                self.total_injected_count += 1;
+                if *padding > 0 {
+                    self.partial_chunks_count += 1;
+                }
+                size = size.add(chunk.size());
+                chunk_list.push(chunk.clone());
+            }
+
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: chunk_list,
+                size: size.raw() as usize,
+            };
+
+            if let Some(boundary) = self.forced_boundaries.as_mut() {
+                let mut boundary = boundary.lock().unwrap();
+                boundary.push_back(inject_chunks);
+            } else {
+                bail!("missing injection queue");
+            };
+
+            injection_boundary = injection_boundary.add(size.raw());
+            encoder.advance(size)?;
+        }
+
+        if let Some((padding, chunk)) = last_chunk {
+            // Make sure that we flush this chunk even on clear calls
+            self.reused_chunks.must_flush_first = true;
+            let _offset = self
+                .reused_chunks
+                .insert(vec![chunk], injection_boundary, padding, 0);
+        }
+
+        Ok(())
+    }
+
+    fn clear_cached_chunks<T: SeqWrite + Send>(
+        &mut self,
+        encoder: &mut Encoder<'_, T>,
+    ) -> Result<(), Error> {
+        let reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+        if !reused_chunks.must_flush_first {
+            return Ok(());
+        }
+
+        // First chunk was kept back to avoid duplication but needs to be injected
+        let injection_boundary = reused_chunks.start_boundary();
+        let payload_writer_position = encoder.payload_position()?.raw();
+
+        if !reused_chunks.chunks.is_empty() && injection_boundary.raw() != payload_writer_position {
+            bail!(
+                "encoder payload writer position out of sync: got {payload_writer_position}, expected {}",
+                injection_boundary.raw()
+            );
+        }
+
+        if let Some((padding, chunk)) = reused_chunks.chunks.first() {
+            let size = PayloadOffset::default().add(chunk.size());
+            log::debug!(
+                "Injecting chunk with {} padding (chunk size {})",
+                HumanByte::from(*padding),
+                HumanByte::from(chunk.size()),
+            );
+            let inject_chunks = InjectChunks {
+                boundary: injection_boundary.raw(),
+                chunks: vec![chunk.clone()],
+                size: size.raw() as usize,
+            };
+
+            self.total_injected_size += size.raw();
+            self.total_injected_count += 1;
+            if *padding > 0 {
+                self.partial_chunks_count += 1;
+            }
+
+            if let Some(boundary) = self.forced_boundaries.as_mut() {
+                let mut boundary = boundary.lock().unwrap();
+                boundary.push_back(inject_chunks);
+            } else {
+                bail!("missing injection queue");
+            };
+            encoder.advance(size)?;
+        } else {
+            bail!("missing first chunk");
+        }
+
+        Ok(())
+    }
+
     async fn add_directory(
         &mut self,
         encoder: &mut Encoder<'_, T>,
-- 
2.39.2
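
Side note on the out-of-sync checks in flush_reused_chunks() and
clear_cached_chunks(): they encode the invariant that each injection
advances the payload writer position by exactly the injected size, so
the next injection boundary must equal the current writer position. A
minimal stand-alone model of that bookkeeping, where Chunk and Injector
are illustrative stand-ins rather than types from this patch:

    struct Chunk {
        size: u64,
    }

    struct Injector {
        boundary: u64, // next expected injection offset in the payload stream
    }

    impl Injector {
        // Inject a batch of reused chunks and advance the boundary, mirroring
        // injection_boundary.add(size) followed by encoder.advance(size).
        fn inject(&mut self, writer_position: u64, chunks: &[Chunk]) -> Result<u64, String> {
            if self.boundary != writer_position {
                return Err(format!(
                    "payload writer position out of sync: got {writer_position}, expected {}",
                    self.boundary
                ));
            }
            let size: u64 = chunks.iter().map(|c| c.size).sum();
            self.boundary += size;
            Ok(self.boundary)
        }
    }

    fn main() {
        let mut injector = Injector { boundary: 0 };
        let first = [Chunk { size: 4096 }, Chunk { size: 8192 }];
        // First batch starts at position 0; afterwards the writer sits at 12288.
        let pos = injector.inject(0, &first).unwrap();
        assert_eq!(pos, 12288);
        // A second batch must start exactly where the writer now is.
        assert!(injector.inject(12288, &[Chunk { size: 1024 }]).is_ok());
        // Injecting at a stale position trips the same out-of-sync check.
        assert!(injector.inject(0, &[Chunk { size: 512 }]).is_err());
    }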