Date: Tue, 04 Jun 2024 13:50:36 +0200
From: Fabian Grünbichler
To: Proxmox Backup Server development discussion <pbs-devel@lists.proxmox.com>
References: <20240528094303.309806-1-c.ebner@proxmox.com> <20240528094303.309806-51-c.ebner@proxmox.com>
In-Reply-To: <20240528094303.309806-51-c.ebner@proxmox.com>
Message-Id: <1717499071.pyealxsgg5.astroid@yuna.none>
Subject: Re: [pbs-devel] [PATCH v8 proxmox-backup 50/69] fix #3174: client: pxar: enable caching and meta comparison

On May 28, 2024 11:42 am, Christian Ebner wrote:
> When walking the file system tree, check for each entry if it is
> reusable, meaning that the metadata did not change and the payload
> chunks can be reindexed instead of re-encoding the whole data.
>
> If the metadata matched, the range of the dynamic index entries for
> that file is looked up in the previous payload data index.
> Use the range and the possible padding introduced by partial reuse of
> chunks to decide whether to reuse the dynamic entries and encode the
> file payloads as payload references right away, or to cache the entry
> for now and keep looking ahead.
>
> If, however, a non-reusable (because changed) entry is encountered
> before the padding threshold is reached, the entries in the cache are
> flushed to the archive by re-encoding them, resetting the cached state.
>
> Reusable chunk digests and sizes, as well as reference offsets to the
> start of regular file payloads within the payload stream, are injected
> into the backup stream by sending them to the chunker via a dedicated
> channel, forcing a chunk boundary and inserting the chunks.
>
> If the threshold value for reuse is reached, the chunks are injected
> into the payload stream and the references with the corresponding
> offsets encoded in the metadata stream.
>
> Since multiple files might be contained within a single chunk,
> deduplication of chunks is assured by keeping back the last chunk, so
> that following files may reuse that same chunk without indexing it
> twice. This chunk is also injected into the stream in case the
> following lookups lead to a cache clear and re-encoding.
>
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.
>
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
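
to restate the logic described above in rough pseudo-code (all names here
are made up, see the actual patch below for the real implementation):

    // for each directory entry encountered while walking the tree:
    if metadata_unchanged(&entry, previous_metadata) {
        // look up the file's chunks in the previous payload index and
        // defer the decision by caching the entry
        let range = lookup_payload_range(previous_payload_index, &entry);
        cache.push(entry, range);
    } else {
        // a changed entry ends the look-ahead window: decide based on the
        // padding that reusing the cached chunk ranges would introduce
        if cache.padding_ratio() <= CHUNK_PADDING_THRESHOLD {
            inject_reused_chunks();   // force boundary, reuse chunks, and
            encode_payload_refs();    // reference them in the metadata stream
        } else {
            reencode_cached_entries(); // too much padding, encode payloads again
        }
        encode(entry); // then handle the changed entry itself
    }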
> ---
> changes since version 7:
> - no changes
>
> changes since version 6:
> - use PxarLookaheadCache and its provided methods
> - refactoring removing some unnecessary methods to improve readability
>
>  pbs-client/src/pxar/create.rs | 387 +++++++++++++++++++++++++++++++---
>  1 file changed, 360 insertions(+), 27 deletions(-)
>
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 04c89b453..f044dd1e6 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -21,9 +21,10 @@ use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
>  use proxmox_sys::error::SysError;
>  use pxar::accessor::aio::{Accessor, Directory};
>  use pxar::accessor::ReadAt;
> -use pxar::encoder::{LinkOffset, SeqWrite};
> +use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
>  use pxar::{EntryKind, Metadata, PxarVariant};
>
> +use proxmox_human_byte::HumanByte;
>  use proxmox_io::vec;
>  use proxmox_lang::c_str;
>  use proxmox_sys::fs::{self, acl, xattr};
> @@ -33,10 +34,13 @@ use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::index::IndexFile;
>
>  use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData, PxarLookaheadCache};
>  use crate::pxar::metadata::errno_is_unsupported;
>  use crate::pxar::tools::assert_single_path_component;
>  use crate::pxar::Flags;
>
> +const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
> +
>  /// Pxar options for creating a pxar archive/stream
>  #[derive(Default)]
>  pub struct PxarCreateOptions {
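
for context, a quick back-of-the-envelope example of what this 10% threshold
means in practice (numbers made up):

    // reusing a 4 MiB payload range whose referenced chunks total 4.5 MiB
    // leaves 0.5 MiB of padding:
    let range_len: f64 = 4.0;             // MiB of actually reused payload
    let padding: f64 = 0.5;               // MiB of unused bytes in those chunks
    let total_size = range_len + padding; // == summed size of the chunks
    let ratio = padding / total_size;     // ~0.111 > 0.1 -> re-encode
    // with only 0.4 MiB of padding the ratio would be ~0.091 -> reuse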
> @@ -154,6 +158,7 @@ struct Archiver {
>      skip_e2big_xattr: bool,
>      forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>      previous_payload_index: Option<DynamicIndexReader>,
> +    cache: PxarLookaheadCache,
>  }
>
>  type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -207,6 +212,7 @@ where
>          set.insert(stat.st_dev);
>      }
>
> +    let metadata_mode = options.previous_ref.is_some() && writers.archive.payload().is_some();
>      let mut encoder = Encoder::new(writers.archive, &metadata).await?;
>
>      let mut patterns = options.patterns;
> @@ -245,11 +251,19 @@ where
>          skip_e2big_xattr: options.skip_e2big_xattr,
>          forced_boundaries,
>          previous_payload_index,
> +        cache: PxarLookaheadCache::new(None),
>      };
>
>      archiver
>          .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
>          .await?;
> +
> +    if metadata_mode {
> +        archiver
> +            .flush_cached_reusing_if_below_threshold(&mut encoder, false)
> +            .await?;
> +    }
> +
>      encoder.finish().await?;
>      encoder.close().await?;
> @@ -307,7 +321,10 @@ impl Archiver {
>          for file_entry in file_list {
>              let file_name = file_entry.name.to_bytes();
>
> -            if is_root && file_name == b".pxarexclude-cli" {
> +            if is_root
> +                && file_name == b".pxarexclude-cli"
> +                && previous_metadata_accessor.is_none()
> +            {
>                  self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
>                      .await?;
>                  continue;
>              }
> @@ -610,8 +627,6 @@ impl Archiver {
>          c_file_name: &CStr,
>          stat: &FileStat,
>      ) -> Result<(), Error> {
> -        use pxar::format::mode;
> -
>          let file_mode = stat.st_mode & libc::S_IFMT;
>          let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
>              OFlag::empty()
> @@ -649,6 +664,127 @@ impl Archiver {
>              self.skip_e2big_xattr,
>          )?;
>
> +        if self.previous_payload_index.is_none() {
> +            return self
> +                .add_entry_to_archive(encoder, &mut None, c_file_name, stat, fd, &metadata, None)
> +                .await;
> +        }
> +
> +        // Avoid having too many open file handles in cached entries
> +        if self.cache.is_full() {
> +            log::debug!("Max cache size reached, reuse cached entries");
> +            self.flush_cached_reusing_if_below_threshold(encoder, true)
> +                .await?;
> +        }
> +
> +        if metadata.is_regular_file() {
> +            if stat.st_nlink > 1 {
> +                let link_info = HardLinkInfo {
> +                    st_dev: stat.st_dev,
> +                    st_ino: stat.st_ino,
> +                };
> +                if self.cache.contains_hardlink(&link_info) {
> +                    // This hardlink has been seen by the lookahead cache already, put it on the cache
> +                    // with a dummy offset and continue without lookup and chunk injection.
> +                    // On flushing or re-encoding, the logic there will store the actual hardlink with
> +                    // offset.
> +                    if !self.cache.caching_enabled() {
> +                        // have regular file, get directory path
> +                        let mut path = self.path.clone();
> +                        path.pop();
> +                        self.cache.update_start_path(path);
> +                    }
> +                    self.cache.insert(
> +                        fd,
> +                        c_file_name.into(),
> +                        *stat,
> +                        metadata.clone(),
> +                        PayloadOffset::default(),
> +                    );

see comment on patch introducing the cache helper

> +                    return Ok(());
> +                } else {
> +                    // mark this hardlink as seen by the lookahead cache
> +                    self.cache.insert_hardlink(link_info);
> +                }
> +            }
> +
> +            let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> +            if let Some(payload_range) = self
> +                .is_reusable_entry(previous_metadata, file_name, &metadata)
> +                .await?
> +            {
> +                if !self.cache.try_extend_range(payload_range.clone()) {
> +                    log::debug!("Cache range has hole, new range: {payload_range:?}");
> +                    self.flush_cached_reusing_if_below_threshold(encoder, true)
> +                        .await?;
> +                    // range has to be set after flushing of cached entries, which resets the range
> +                    self.cache.update_range(payload_range.clone());
> +                }
> +
> +                // offset relative to start of current range, does not include possible padding of
> +                // actual chunks, which needs to be added before encoding the payload reference
> +                let offset =
> +                    PayloadOffset::default().add(payload_range.start - self.cache.range().start);
> +                log::debug!("Offset relative to range start: {offset:?}");
> +
> +                if !self.cache.caching_enabled() {
> +                    // have regular file, get directory path
> +                    let mut path = self.path.clone();
> +                    path.pop();
> +                    self.cache.update_start_path(path);
> +                }
> +                self.cache
> +                    .insert(fd, c_file_name.into(), *stat, metadata.clone(), offset);

see comment on patch introducing the cache helper

> +                return Ok(());
> +            }
> +        } else if self.cache.caching_enabled() {
> +            self.cache.insert(
> +                fd.try_clone()?,
> +                c_file_name.into(),
> +                *stat,
> +                metadata.clone(),
> +                PayloadOffset::default(),
> +            );

see comment on patch introducing the cache helper

> +            if metadata.is_dir() {
> +                self.add_directory(
> +                    encoder,
> +                    previous_metadata,
> +                    Dir::from_fd(fd.into_raw_fd())?,
> +                    c_file_name,
> +                    &metadata,
> +                    stat,
> +                )
> +                .await?;
> +            }
> +            return Ok(());
> +        }
> +
> +        self.encode_entries_to_archive(encoder, None).await?;
> +        self.add_entry_to_archive(
> +            encoder,
> +            previous_metadata,
> +            c_file_name,
> +            stat,
> +            fd,
> +            &metadata,
> +            None,
> +        )
> +        .await
> +    }
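
side note (possibly already covered by the comments on the cache helper
patch): the "remember where caching started" dance above appears twice -
might be nice to pull it into a tiny helper, e.g. (untested, name made up):

    // have a regular file entry, caching starts at its parent directory
    fn update_cache_start_path(&mut self) {
        if !self.cache.caching_enabled() {
            let mut path = self.path.clone();
            path.pop();
            self.cache.update_start_path(path);
        }
    }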
> +
> +    async fn add_entry_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +        payload_offset: Option<PayloadOffset>,
> +    ) -> Result<(), Error> {
> +        use pxar::format::mode;
> +
>          let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
>          match metadata.file_type() {
>              mode::IFREG => {
> @@ -677,9 +813,14 @@ impl Archiver {
>                          .add_file(c_file_name, file_size, stat.st_mtime)?;
>                  }
>
> -                let offset: LinkOffset = self
> -                    .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> -                    .await?;
> +                let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
> +                    encoder
> +                        .add_payload_ref(metadata, file_name, file_size, payload_offset)
> +                        .await?
> +                } else {
> +                    self.add_regular_file(encoder, fd, file_name, metadata, file_size)
> +                        .await?
> +                };
>
>                  if stat.st_nlink > 1 {
>                      self.hardlinks
> @@ -690,50 +831,43 @@ impl Archiver {
>              }
>              mode::IFDIR => {
>                  let dir = Dir::from_fd(fd.into_raw_fd())?;
> -                self.add_directory(
> -                    encoder,
> -                    previous_metadata,
> -                    dir,
> -                    c_file_name,
> -                    &metadata,
> -                    stat,
> -                )
> -                .await
> +                self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
> +                    .await
>              }
>              mode::IFSOCK => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_socket(c_file_name)?;
>                  }
>
> -                Ok(encoder.add_socket(&metadata, file_name).await?)
> +                Ok(encoder.add_socket(metadata, file_name).await?)
>              }
>              mode::IFIFO => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_fifo(c_file_name)?;
>                  }
>
> -                Ok(encoder.add_fifo(&metadata, file_name).await?)
> +                Ok(encoder.add_fifo(metadata, file_name).await?)
>              }
>              mode::IFLNK => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_symlink(c_file_name)?;
>                  }
>
> -                self.add_symlink(encoder, fd, file_name, &metadata).await
> +                self.add_symlink(encoder, fd, file_name, metadata).await
>              }
>              mode::IFBLK => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_block_device(c_file_name)?;
>                  }
>
> -                self.add_device(encoder, file_name, &metadata, stat).await
> +                self.add_device(encoder, file_name, metadata, stat).await
>              }
>              mode::IFCHR => {
>                  if let Some(ref catalog) = self.catalog {
>                      catalog.lock().unwrap().add_char_device(c_file_name)?;
>                  }
>
> -                self.add_device(encoder, file_name, &metadata, stat).await
> +                self.add_device(encoder, file_name, metadata, stat).await
>              }
>              other => bail!(
>                  "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -743,6 +877,199 @@ impl Archiver {
>          }
>      }
>
> +    async fn flush_cached_reusing_if_below_threshold<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        keep_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        if self.cache.range().is_empty() {
> +            // only non regular file entries (e.g. directories) in cache, allows regular encoding
> +            self.encode_entries_to_archive(encoder, None).await?;
> +            return Ok(());
> +        }
> +
> +        // Take ownership of previous last chunk, only update where it must be injected
> +        let mut prev_last_chunk = self.cache.take_last_chunk();

doesn't need to be mut

> +        if let Some(ref ref_payload_index) = self.previous_payload_index {

and should probably be moved in here, since it doesn't make sense without a
previous payload index?
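
i.e., something like this (untested sketch):

    if let Some(ref ref_payload_index) = self.previous_payload_index {
        // only take ownership of the kept-back chunk where it can be used;
        // paired with first()/as_ref() below, the binding stays immutable
        let prev_last_chunk = self.cache.take_last_chunk();
        // ... lookup and threshold handling as below ...
    } else {
        bail!("cannot reuse chunks without previous index reader");
    }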
> +            let (mut indices, start_padding, end_padding) =
> +                lookup_dynamic_entries(ref_payload_index, self.cache.range().clone())?;

if lookup_dynamic_entries would take a &Range ...

> +            let mut padding = start_padding + end_padding;
> +            let range = self.cache.range();

... then this could be moved up, and the clone above could be dropped

> +            let total_size = (range.end - range.start) + padding;
> +
> +            // take into account used bytes of kept back chunk for padding
> +            if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_mut()) {

this should then be first instead of first_mut, and as_ref instead of as_mut

> +                if last.digest() == first.digest() {
> +                    // Update padding used for threshold calculation only
> +                    let used = last.size() - last.padding;
> +                    padding -= used;
> +                }
> +            }
> +
> +            let ratio = padding as f64 / total_size as f64;
> +
> +            // do not reuse chunks if the introduced padding is higher than the threshold,
> +            // opt for re-encoding in that case
> +            if ratio > CHUNK_PADDING_THRESHOLD {
> +                log::debug!(
> +                    "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> +                    HumanByte::from(padding),
> +                    HumanByte::from(total_size),
> +                    indices.len(),
> +                );
> +                self.cache.update_last_chunk(prev_last_chunk);
> +                self.encode_entries_to_archive(encoder, None).await?;
> +            } else {
> +                log::debug!(
> +                    "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> +                    HumanByte::from(padding),
> +                    HumanByte::from(total_size),
> +                    indices.len(),
> +                );
> +
> +                // check for cases where the kept back last chunk is not equal to the first
> +                // chunk because the range end aligned with a chunk boundary, and the chunk
> +                // therefore needs to be injected
> +                if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) {
> +                    if last.digest() != first.digest() {
> +                        // make sure to inject previous last chunk before encoding entries
> +                        self.inject_chunks_at_current_payload_position(encoder, &[last])?;
> +                    } else {
> +                        let used = last.size() - last.padding;
> +                        first.padding -= used;
> +                    }

the else is (basically) the same as above, maybe you originally meant to unify
them? you could save the digest cmp result as well, and then just have an
`if foo { inject }` here?
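
i.e., roughly (untested sketch, logging omitted):

    // compare the kept-back chunk to the first looked-up entry once and
    // remember whether it still needs to be injected separately
    let mut inject_prev = false;
    if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_ref()) {
        if last.digest() == first.digest() {
            // account for the used bytes of the kept-back chunk exactly once
            let used = last.size() - last.padding;
            padding -= used;
            first.padding -= used;
        } else {
            inject_prev = true;
        }
    }

    let ratio = padding as f64 / total_size as f64;
    if ratio > CHUNK_PADDING_THRESHOLD {
        self.cache.update_last_chunk(prev_last_chunk);
        self.encode_entries_to_archive(encoder, None).await?;
    } else {
        if inject_prev {
            if let Some(last) = prev_last_chunk {
                // range end aligned with a chunk boundary, inject the
                // kept-back chunk before encoding the cached entries
                self.inject_chunks_at_current_payload_position(encoder, &[last])?;
            }
        }
        // ... base_offset handling and chunk injection as before ...
    }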
> +                }
> +                let base_offset = Some(encoder.payload_position()?.add(start_padding));
> +                self.encode_entries_to_archive(encoder, base_offset).await?;
> +
> +                if keep_last_chunk {
> +                    self.cache.update_last_chunk(indices.pop());
> +                }
> +
> +                self.inject_chunks_at_current_payload_position(encoder, indices.as_slice())?;
> +            }
> +
> +            Ok(())
> +        } else {
> +            bail!("cannot reuse chunks without previous index reader");
> +        }
> +    }
> +
> +    // Take ownership of cached entries and encode them to the archive
> +    // Encode with reused payload chunks when base offset is some, reencode otherwise
> +    async fn encode_entries_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        base_offset: Option<PayloadOffset>,
> +    ) -> Result<(), Error> {
> +        if let Some(prev) = self.cache.take_last_chunk() {
> +            // make sure to inject previous last chunk before encoding entries
> +            self.inject_chunks_at_current_payload_position(encoder, &[prev])?;
> +        }
> +
> +        let old_path = self.path.clone();
> +        self.path = self.cache.start_path().clone();
> +
> +        // take ownership of cached entries and reset caching state
> +        let entries = self.cache.take_and_reset();

see comment on patch introducing the cache helper

> +        log::debug!(
> +            "Got {} cache entries to encode: reuse is {}",
> +            entries.len(),
> +            base_offset.is_some()
> +        );
> +
> +        for entry in entries {
> +            match entry {
> +                CacheEntry::RegEntry(CacheEntryData {
> +                    fd,
> +                    c_file_name,
> +                    stat,
> +                    metadata,
> +                    payload_offset,
> +                }) => {
> +                    let file_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    self.path.push(file_name);
> +                    self.add_entry_to_archive(
> +                        encoder,
> +                        &mut None,
> +                        &c_file_name,
> +                        &stat,
> +                        fd,
> +                        &metadata,
> +                        base_offset.map(|base_offset| payload_offset.add(base_offset.raw())),
> +                    )
> +                    .await?;
> +                    self.path.pop();
> +                }
> +                CacheEntry::DirEntry(CacheEntryData {
> +                    c_file_name,
> +                    metadata,
> +                    ..
> +                }) => {
> +                    let file_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    self.path.push(file_name);
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().start_directory(&c_file_name)?;
> +                    }
> +                    let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +                    encoder.create_directory(dir_name, &metadata).await?;
> +                }
> +                CacheEntry::DirEnd => {
> +                    encoder.finish().await?;
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
> +                    self.path.pop();
> +                }
> +            }
> +        }
> +
> +        self.path = old_path;
> +
> +        Ok(())
> +    }
> +
> +    fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        reused_chunks: &[ReusableDynamicEntry],

this actually consumes the chunks/reusable entries, so it should probably take
the Vec, and not a slice?

> +    ) -> Result<(), Error> {
> +        let mut injection_boundary = encoder.payload_position()?;
> +
> +        for chunks in reused_chunks.chunks(128) {
> +            let mut chunk_list = Vec::with_capacity(128);

even though we still can't get away without copying here I think(?), this could
be `let chunks = chunks.to_vec();` - it should never be worse than pushing
single elements, but in the best case it's better optimized (but maybe I am
wrong? ;)) then we could drop the mut
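
combining both suggestions, roughly (untested sketch, per-chunk debug logging
left out for brevity):

    fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
        &mut self,
        encoder: &mut Encoder<'_, T>,
        reused_chunks: Vec<ReusableDynamicEntry>,
    ) -> Result<(), Error> {
        let mut injection_boundary = encoder.payload_position()?;

        for chunks in reused_chunks.chunks(128) {
            let mut size = PayloadOffset::default();
            for chunk in chunks {
                size = size.add(chunk.size());
            }

            if let Some(sender) = self.forced_boundaries.as_mut() {
                sender.send(InjectChunks {
                    boundary: injection_boundary.raw(),
                    chunks: chunks.to_vec(), // single clone instead of 128 pushes
                    size: size.raw() as usize,
                })?;
            } else {
                bail!("missing injection queue");
            }

            injection_boundary = injection_boundary.add(size.raw());
            encoder.advance(size)?;
        }

        Ok(())
    }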
> +            let mut size = PayloadOffset::default();
> +
> +            for chunk in chunks.iter() {
> +                log::debug!(
> +                    "Injecting chunk with {} padding (chunk size {})",
> +                    HumanByte::from(chunk.padding),
> +                    HumanByte::from(chunk.size()),
> +                );
> +                size = size.add(chunk.size());
> +                chunk_list.push(chunk.clone());

and drop the push

> +            }
> +
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: chunk_list,

and drop the variable name here. although I wonder whether we could have done
all this calculation/logging earlier already, via lookup_dynamic_entries? might
be an optimization for the future, to avoid iterating twice when using to_vec

> +                size: size.raw() as usize,
> +            };
> +
> +            if let Some(sender) = self.forced_boundaries.as_mut() {
> +                sender.send(inject_chunks)?;
> +            } else {
> +                bail!("missing injection queue");
> +            };
> +
> +            injection_boundary = injection_boundary.add(size.raw());
> +            log::debug!("Advance payload position by: {size:?}");
> +            encoder.advance(size)?;
> +        }
> +
> +        Ok(())
> +    }
> +
>      async fn add_directory(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> @@ -754,10 +1081,12 @@ impl Archiver {
>      ) -> Result<(), Error> {
>          let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
>
> -        if let Some(ref catalog) = self.catalog {
> -            catalog.lock().unwrap().start_directory(c_dir_name)?;
> +        if !self.cache.caching_enabled() {
> +            if let Some(ref catalog) = self.catalog {
> +                catalog.lock().unwrap().start_directory(c_dir_name)?;
> +            }
> +            encoder.create_directory(dir_name, metadata).await?;
>          }
> -        encoder.create_directory(dir_name, metadata).await?;
>
>          let old_fs_magic = self.fs_magic;
>          let old_fs_feature_flags = self.fs_feature_flags;
> @@ -797,9 +1126,13 @@ impl Archiver {
>          self.fs_feature_flags = old_fs_feature_flags;
>          self.current_st_dev = old_st_dev;
>
> -        encoder.finish().await?;
> -        if let Some(ref catalog) = self.catalog {
> -            catalog.lock().unwrap().end_directory()?;
> +        if !self.cache.caching_enabled() {
> +            encoder.finish().await?;
> +            if let Some(ref catalog) = self.catalog {
> +                catalog.lock().unwrap().end_directory()?;
> +            }
> +        } else {
> +            self.cache.insert_dir_end();
>          }
>
>          result
> --
> 2.39.2

_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel