From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 7E97095EE5 for ; Wed, 28 Feb 2024 15:09:50 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 599C9E1E3 for ; Wed, 28 Feb 2024 15:09:20 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 28 Feb 2024 15:09:19 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id B7A8947A08 for ; Wed, 28 Feb 2024 15:02:54 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 28 Feb 2024 15:02:23 +0100 Message-Id: <20240228140226.1251979-34-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com> References: <20240228140226.1251979-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.105 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_2 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_4 0.1 random spam to be learned in bayes SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] 
[RFC proxmox-backup 33/36] client: pxar: add look-ahead caching X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 28 Feb 2024 14:09:50 -0000 Implements the methods to cache entries in a look-ahead cache and to flush the cached entries to the archive, either by re-using and injecting the payload chunks from the previous backup snapshot and storing a reference to the re-used payload, or by re-encoding the chunks. Signed-off-by: Christian Ebner --- pbs-client/src/pxar/create.rs | 284 ++++++++++++++++++++++++++++++++++ 1 file changed, 284 insertions(+) diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index 39864483..fbcccb2e 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -828,6 +828,290 @@ impl Archiver { } } + async fn cache_or_flush_entries( + &mut self, + encoder: &mut Encoder<'_, T>, + accessor: &mut Option>>, + c_file_name: &CStr, + stat: &FileStat, + fd: OwnedFd, + metadata: &Metadata, + ) -> Result<(), Error> { + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref(); + let reusable = if let Some(accessor) = accessor { + self.is_reusable_entry(accessor, file_name, stat, metadata) + .await? 
+ } else { + None + }; + + let file_size = stat.st_size as u64; + if let Some(start_offset) = reusable { + if let Some(ref ref_payload_index) = self.previous_payload_index { + let end_offset = start_offset + file_size; + let (indices, start_padding, _end_padding) = + ref_payload_index.indices(start_offset, end_offset)?; + + //Insert chunks into reused_chunks, getting the correct payload offset + let boundary = encoder.payload_position()?; + let offset = self.reused_chunks.insert(indices, boundary, start_padding); + + if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD { + self.flush_cached_to_archive(encoder, true).await?; + + encoder + .add_payload_ref(metadata, file_name, file_size, offset) + .await?; + + if let Some(ref catalog) = self.catalog { + catalog + .lock() + .unwrap() + .add_file(&c_file_name, file_size, stat.st_mtime)?; + } + } else { + log::debug!("lookahead-cache: {file_name:?}"); + self.caching_enabled = true; + self.cached_payload_size += file_size; + let cache_entry = CacheEntry::RegEntry(CacheEntryData::new( + fd, + c_file_name.into(), + stat.clone(), + metadata.clone(), + offset, + )); + self.cached_entries.push(cache_entry); + } + + return Ok(()); + } + log::debug!("re-encode: {file_name:?} no previous payload index."); + } + + self.flush_cached_to_archive(encoder, false).await?; + self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata) + .await + } + + async fn flush_cached_to_archive( + &mut self, + encoder: &mut Encoder<'_, T>, + reuse_chunks: bool, + ) -> Result<(), Error> { + if reuse_chunks { + self.flush_reused_chunks(encoder)?; + } else { + self.clear_cached_chunks(encoder)?; + } + let entries = std::mem::take(&mut self.cached_entries); + + self.caching_enabled = false; + self.cached_payload_size = 0; + + for entry in entries { + match entry { + CacheEntry::RegEntry(data) => { + self.flush_entry_to_archive(encoder, data, reuse_chunks) + .await? 
+ } + CacheEntry::DirEntry(data) => { + self.flush_directory_to_archive(encoder, data).await? + } + CacheEntry::DirEnd => { + let result = encoder.finish().await?; + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().end_directory()?; + } + result + } + } + } + + Ok(()) + } + + fn flush_reused_chunks( + &mut self, + encoder: &mut Encoder<'_, T>, + ) -> Result<(), Error> { + let mut reused_chunks = std::mem::take(&mut self.reused_chunks); + + let last_chunk = reused_chunks.chunks.pop(); + + // Do not inject directly, but keep around for possible followups + // Needs to be flushed in any case! + let mut injection_boundary = reused_chunks.start_boundary(); + for chunks in reused_chunks.chunks.chunks(128) { + let size = chunks.iter().fold(0u64, |sum, chunk| sum + chunk.size()); + let inject_chunks = InjectChunks { + boundary: injection_boundary.raw(), + chunks: chunks.to_vec(), + size: size as usize, + }; + let mut boundary = self.forced_boundaries.lock().unwrap(); + boundary.push_back(inject_chunks); + injection_boundary = injection_boundary.add(size); + encoder.advance(size)?; + } + + if let Some(chunk) = last_chunk { + let _offset = self.reused_chunks.insert(vec![chunk], injection_boundary, 0); + // Make sure that we flush this chunk even on clear calls + self.reused_chunks.must_flush_first = true; + } + + Ok(()) + } + + fn clear_cached_chunks( + &mut self, + encoder: &mut Encoder<'_, T>, + ) -> Result<(), Error> { + let reused_chunks = std::mem::take(&mut self.reused_chunks); + + if !reused_chunks.must_flush_first { + return Ok(()); + } + + // First chunk was kept back to avoid duplication but needs to be injected + let injection_boundary = reused_chunks.start_boundary(); + if let Some(chunk) = reused_chunks.chunks.first() { + let size = chunk.size(); + let inject_chunks = InjectChunks { + boundary: injection_boundary.raw(), + chunks: vec![chunk.clone()], + size: size as usize, + }; + let mut boundary = self.forced_boundaries.lock().unwrap(); + 
boundary.push_back(inject_chunks); + encoder.advance(size)?; + } else { + bail!("missing first chunk"); + } + + Ok(()) + } + + async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>( + &'a mut self, + encoder: &'a mut Encoder<'b, T>, + entry_data: CacheEntryData, + ) -> Result<(), Error> { + let CacheEntryData { + c_file_name, + metadata, + .. + } = entry_data; + let dir_name = OsStr::from_bytes(c_file_name.to_bytes()); + + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().start_directory(&c_file_name)?; + } + + encoder.create_directory(dir_name, &metadata).await?; + + Ok(()) + } + + async fn flush_entry_to_archive( + &mut self, + encoder: &mut Encoder<'_, T>, + entry_data: CacheEntryData, + reuse_chunks: bool, + ) -> Result<(), Error> { + use pxar::format::mode; + + let CacheEntryData { + fd, + c_file_name, + stat, + metadata, + payload_offset, + } = entry_data; + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref(); + + match metadata.file_type() { + mode::IFREG => { + let link_info = HardLinkInfo { + st_dev: stat.st_dev, + st_ino: stat.st_ino, + }; + + if stat.st_nlink > 1 { + if let Some((path, offset)) = self.hardlinks.get(&link_info) { + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_hardlink(&c_file_name)?; + } + encoder.add_hardlink(file_name, path, *offset).await?; + return Ok(()); + } + } + + let file_size = stat.st_size as u64; + if let Some(ref catalog) = self.catalog { + catalog + .lock() + .unwrap() + .add_file(&c_file_name, file_size, stat.st_mtime)?; + } + + if reuse_chunks { + encoder + .add_payload_ref(&metadata, file_name, file_size, payload_offset) + .await?; + } else { + let offset: LinkOffset = self + .add_regular_file(encoder, fd, file_name, &metadata, file_size) + .await?; + + if stat.st_nlink > 1 { + self.hardlinks + .insert(link_info, (self.path.clone(), offset)); + } + } + } + mode::IFSOCK => { + if let Some(ref catalog) = self.catalog { + 
catalog.lock().unwrap().add_socket(&c_file_name)?; + } + encoder.add_socket(&metadata, file_name).await?; + } + mode::IFIFO => { + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_fifo(&c_file_name)?; + } + encoder.add_fifo(&metadata, file_name).await?; + } + mode::IFLNK => { + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_symlink(&c_file_name)?; + } + self.add_symlink(encoder, fd, file_name, &metadata).await?; + } + mode::IFBLK => { + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_block_device(&c_file_name)?; + } + self.add_device(encoder, file_name, &metadata, &stat) + .await?; + } + mode::IFCHR => { + if let Some(ref catalog) = self.catalog { + catalog.lock().unwrap().add_char_device(&c_file_name)?; + } + self.add_device(encoder, file_name, &metadata, &stat) + .await?; + } + other => bail!( + "encountered unknown file type: 0x{:x} (0o{:o})", + other, + other + ), + } + + Ok(()) + } + async fn add_directory( &mut self, encoder: &mut Encoder<'_, T>, -- 2.39.2