Date: Tue, 12 Mar 2024 15:08:49 +0100
From: Fabian Grünbichler
To: Proxmox Backup Server development discussion
References: <20240305092703.126906-1-c.ebner@proxmox.com> <20240305092703.126906-34-c.ebner@proxmox.com>
In-Reply-To: <20240305092703.126906-34-c.ebner@proxmox.com>
Message-Id: <1710246197.nrcmjqhopa.astroid@yuna.none>
Subject: Re: [pbs-devel] [RFC v2 proxmox-backup 33/36] client: pxar: add look-ahead caching

On March 5, 2024 10:27 am, Christian Ebner wrote:
> Implements the methods to cache entries in a look-ahead cache and
> flush the entries to archive, either by re-using and injecting the
> payload chunks from the previous backup snapshot and storing the
> reference to it, or by re-encoding the chunks.

this is a bit terse for the amount of code below ;)

>
> Signed-off-by: Christian Ebner
> ---
> changes since version 1:
> - fix flushing of final chunk before archive finish
> - fix formatting
> - remove unneeded log output
>
>  pbs-client/src/pxar/create.rs | 293 ++++++++++++++++++++++++++++++++++
>  1 file changed, 293 insertions(+)
>
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 3b221b54..b2ce898f 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -828,6 +828,299 @@ impl Archiver {
>          }
>      }
>
> +    async fn cache_or_flush_entries<T: SeqWrite + Send>(

this is only called if we have a previous payload

> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        accessor: &mut Option<...>,

so this must always be set for the call to make any sense, and we can
drop the Option<> entirely.
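i.e. something like this (just a sketch, not compile-tested - `Accessor`
stands in for whatever concrete type the caller actually holds):

    // sketch: take the accessor directly, since this is only reached
    // when a previous payload index (and thus an accessor) exists
    async fn cache_or_flush_entries<T: SeqWrite + Send>(
        &mut self,
        encoder: &mut Encoder<'_, T>,
        accessor: &mut Accessor,
        c_file_name: &CStr,
        stat: &FileStat,
        fd: OwnedFd,
        metadata: &Metadata,
    ) -> Result<(), Error> {
        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
        let file_size = stat.st_size as u64;

        // the is_reusable_entry() call can then be inlined into the
        // condition instead of going through the `reusable` binding
        if let Some(start_offset) = self
            .is_reusable_entry(accessor, file_name, stat, metadata)
            .await?
        {
            // ... reuse path as below ...
        }
        // ...
    }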
> +        c_file_name: &CStr,
> +        stat: &FileStat,
> +        fd: OwnedFd,
> +        metadata: &Metadata,
> +    ) -> Result<(), Error> {
> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> +        let reusable = if let Some(accessor) = accessor {
> +            self.is_reusable_entry(accessor, file_name, stat, metadata)
> +                .await?

then this part here

> +        } else {
> +            None
> +        };
> +
> +        let file_size = stat.st_size as u64;
> +        if let Some(start_offset) = reusable {

can just be inlined here (as in the sketch above)

> +            if let Some(ref ref_payload_index) = self.previous_payload_index {
> +                let end_offset = start_offset + file_size;
> +                let (indices, start_padding, _end_padding) =
> +                    ref_payload_index.indices(start_offset, end_offset)?;

already noted at the patch introducing `indices`: this is the only call
site, and one of the tuple members is not used..

> +
> +                let boundary = encoder.payload_position()?;
> +                let offset = self.reused_chunks.insert(indices, boundary, start_padding);
> +
> +                if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
> +                    self.flush_cached_to_archive(encoder, true, true).await?;
> +

this vvvvvv

> +                    encoder
> +                        .add_payload_ref(metadata, file_name, file_size, offset)
> +                        .await?;
> +
> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog
> +                            .lock()
> +                            .unwrap()
> +                            .add_file(&c_file_name, file_size, stat.st_mtime)?;
> +                    }

^^^^^^^ also happens in self.flush_cached_to_archive ->
self.flush_entry_to_archive since we pass reuse_chunks, so couldn't we
just flush above and then continue with the else branch below to cache
this new entry, instead of inline-adding the one entry that bypasses the
cache?
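i.e. something like this (untested sketch, assuming `offset` stays valid
across the flush - I didn't check that part):

    // sketch: once the threshold is hit, flush the cache first (which
    // emits the payload refs and catalog entries via
    // flush_entry_to_archive), then cache the current entry like any
    // other instead of encoding it inline
    if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
        self.flush_cached_to_archive(encoder, true, true).await?;
    }

    self.caching_enabled = true;
    self.cached_payload_size += file_size;
    self.cached_entries
        .push(CacheEntry::RegEntry(CacheEntryData::new(
            fd,
            c_file_name.into(),
            stat.clone(),
            metadata.clone(),
            offset,
        )));

    return Ok(());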
> +                } else {
> +                    self.caching_enabled = true;
> +                    self.cached_payload_size += file_size;
> +                    let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> +                        fd,
> +                        c_file_name.into(),
> +                        stat.clone(),
> +                        metadata.clone(),
> +                        offset,
> +                    ));
> +                    self.cached_entries.push(cache_entry);
> +                }
> +
> +                return Ok(());
> +            }
> +        }
> +
> +        self.flush_cached_to_archive(encoder, false, true).await?;
> +        self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
> +            .await
> +    }
> +
> +    async fn flush_cached_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        reuse_chunks: bool,
> +        keep_back_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        if reuse_chunks {
> +            self.flush_reused_chunks(encoder, keep_back_last_chunk)?;
> +        } else {
> +            self.clear_cached_chunks(encoder)?;
> +        }
> +        let entries = std::mem::take(&mut self.cached_entries);
> +
> +        self.caching_enabled = false;
> +        self.cached_payload_size = 0;
> +
> +        for entry in entries {
> +            match entry {
> +                CacheEntry::RegEntry(data) => {
> +                    self.flush_entry_to_archive(encoder, data, reuse_chunks)
> +                        .await?
> +                }
> +                CacheEntry::PxarExcludeCliEntry(entry, old_patterns_count) => {
> +                    self.encode_pxarexclude_cli(encoder, &entry.name, old_patterns_count)
> +                        .await?;
> +                }
> +                CacheEntry::DirEntry(data) => {
> +                    self.flush_directory_to_archive(encoder, data).await?
> +                }
> +                CacheEntry::DirEnd => {
> +                    let result = encoder.finish().await?;

nit: `result` is `()`, there's no point in keeping it

> +                    if let Some(ref catalog) = self.catalog {
> +                        catalog.lock().unwrap().end_directory()?;
> +                    }
> +                    result

nor is there in returning it here

> +                }
> +            }

especially since the whole match result is discarded anyway ;)
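i.e. the arm could just be:

    CacheEntry::DirEnd => {
        // finish() returns (), nothing to keep or return here
        encoder.finish().await?;
        if let Some(ref catalog) = self.catalog {
            catalog.lock().unwrap().end_directory()?;
        }
    }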
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn flush_reused_chunks<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        keep_back_last_chunk: bool,
> +    ) -> Result<(), Error> {
> +        let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
> +
> +        // Do not inject the last reused chunk directly, but keep it as base for further entries
> +        // to reduce chunk duplication. Needs to be flushed even on cache clear!
> +        let last_chunk = if keep_back_last_chunk {
> +            reused_chunks.chunks.pop()
> +        } else {
> +            None
> +        };
> +
> +        let mut injection_boundary = reused_chunks.start_boundary();
> +        for chunks in reused_chunks.chunks.chunks(128) {
> +            let size = chunks.iter().fold(0u64, |sum, chunk| sum + chunk.size());
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: chunks.to_vec(),
> +                size: size as usize,
> +            };
> +            let mut boundary = self.forced_boundaries.lock().unwrap();
> +            boundary.push_back(inject_chunks);
> +            injection_boundary = injection_boundary.add(size);
> +            encoder.advance(size)?;
> +        }
> +
> +        if let Some(chunk) = last_chunk {
> +            let _offset = self
> +                .reused_chunks
> +                .insert(vec![chunk], injection_boundary, 0);
> +            // Make sure that we flush this chunk even on clear calls
> +            self.reused_chunks.must_flush_first = true;
> +        }
> +
> +        Ok(())
> +    }
> +
> +    fn clear_cached_chunks<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +    ) -> Result<(), Error> {
> +        let reused_chunks = std::mem::take(&mut self.reused_chunks);

this might deserve a comment or a more explicit call ;) took me a while
to follow along..

> +
> +        if !reused_chunks.must_flush_first {
> +            return Ok(());
> +        }
> +
> +        // First chunk was kept back to avoid duplication but needs to be injected
> +        let injection_boundary = reused_chunks.start_boundary();
> +        if let Some(chunk) = reused_chunks.chunks.first() {
> +            let size = chunk.size();
> +            let inject_chunks = InjectChunks {
> +                boundary: injection_boundary.raw(),
> +                chunks: vec![chunk.clone()],
> +                size: size as usize,
> +            };
> +            let mut boundary = self.forced_boundaries.lock().unwrap();
> +            boundary.push_back(inject_chunks);
> +            encoder.advance(size)?;
> +        } else {
> +            bail!("missing first chunk");
> +        }
> +
> +        Ok(())
> +    }
> +
> +    async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
> +        &'a mut self,
> +        encoder: &'a mut Encoder<'b, T>,
> +        entry_data: CacheEntryData,
> +    ) -> Result<(), Error> {
> +        let CacheEntryData {
> +            c_file_name,
> +            metadata,
> +            ..
> +        } = entry_data;
> +        let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> +
> +        if let Some(ref catalog) = self.catalog {
> +            catalog.lock().unwrap().start_directory(&c_file_name)?;
> +        }
> +
> +        encoder.create_directory(dir_name, &metadata).await?;
> +
> +        Ok(())
> +    }
> +
> +    async fn flush_entry_to_archive<T: SeqWrite + Send>(
> +        &mut self,
> +        encoder: &mut Encoder<'_, T>,
> +        entry_data: CacheEntryData,
> +        reuse_chunks: bool,
> +    ) -> Result<(), Error> {
> +        use pxar::format::mode;
> +
> +        let CacheEntryData {
> +            fd,
> +            c_file_name,
> +            stat,
> +            metadata,
> +            payload_offset,
> +        } = entry_data;
> +        let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();

starting here this is almost 100% identical to add_entry_to_archive,
could we maybe somehow merge them? (rough idea at the end of this mail)

> +
> +        match metadata.file_type() {
> +            mode::IFREG => {
> +                let link_info = HardLinkInfo {
> +                    st_dev: stat.st_dev,
> +                    st_ino: stat.st_ino,
> +                };
> +
> +                if stat.st_nlink > 1 {
> +                    if let Some((path, offset)) = self.hardlinks.get(&link_info) {
> +                        if let Some(ref catalog) = self.catalog {
> +                            catalog.lock().unwrap().add_hardlink(&c_file_name)?;
> +                        }
> +                        encoder.add_hardlink(file_name, path, *offset).await?;
> +                        return Ok(());
> +                    }
> +                }
> +
> +                let file_size = stat.st_size as u64;
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog
> +                        .lock()
> +                        .unwrap()
> +                        .add_file(&c_file_name, file_size, stat.st_mtime)?;
> +                }
> +
> +                if reuse_chunks {
> +                    encoder
> +                        .add_payload_ref(&metadata, file_name, file_size, payload_offset)
> +                        .await?;
> +                } else {
> +                    let offset: LinkOffset = self
> +                        .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> +                        .await?;
> +
> +                    if stat.st_nlink > 1 {
> +                        self.hardlinks
> +                            .insert(link_info, (self.path.clone(), offset));
> +                    }
> +                }
> +            }
> +            mode::IFSOCK => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_socket(&c_file_name)?;
> +                }
> +                encoder.add_socket(&metadata, file_name).await?;
> +            }
> +            mode::IFIFO => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_fifo(&c_file_name)?;
> +                }
> +                encoder.add_fifo(&metadata, file_name).await?;
> +            }
> +            mode::IFLNK => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_symlink(&c_file_name)?;
> +                }
> +                self.add_symlink(encoder, fd, file_name, &metadata).await?;
> +            }
> +            mode::IFBLK => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_block_device(&c_file_name)?;
> +                }
> +                self.add_device(encoder, file_name, &metadata, &stat)
> +                    .await?;
> +            }
> +            mode::IFCHR => {
> +                if let Some(ref catalog) = self.catalog {
> +                    catalog.lock().unwrap().add_char_device(&c_file_name)?;
> +                }
> +                self.add_device(encoder, file_name, &metadata, &stat)
> +                    .await?;
> +            }
> +            other => bail!(
> +                "encountered unknown file type: 0x{:x} (0o{:o})",
> +                other,
> +                other
> +            ),
> +        }
> +
> +        Ok(())
> +    }
> +
>      async fn add_directory<T: SeqWrite + Send>(
>          &mut self,
>          encoder: &mut Encoder<'_, T>,
> --
> 2.39.2
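to make the merge idea above a bit more concrete (hand-waving sketch,
names and exact parameters made up, not compile-tested): both
add_entry_to_archive and flush_entry_to_archive could delegate to a
single helper that only differs in how the regular-file payload is
emitted, e.g.

    // hypothetical shared helper - `PayloadOffset` stands in for
    // whatever type `payload_offset` actually has
    async fn encode_entry_to_archive<T: SeqWrite + Send>(
        &mut self,
        encoder: &mut Encoder<'_, T>,
        c_file_name: &CStr,
        stat: &FileStat,
        fd: OwnedFd,
        metadata: &Metadata,
        // Some(offset) => emit a payload ref, None => re-encode contents
        payload_offset: Option<PayloadOffset>,
    ) -> Result<(), Error> {
        // the shared match on metadata.file_type() lives here; only the
        // mode::IFREG arm needs to branch on `payload_offset` to decide
        // between add_payload_ref() and add_regular_file()
        // ...
        Ok(())
    }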