From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 56E2495E7E for ; Wed, 28 Feb 2024 15:03:25 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id A7566D01D for ; Wed, 28 Feb 2024 15:02:54 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 28 Feb 2024 15:02:50 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id CEBD547A08 for ; Wed, 28 Feb 2024 15:02:48 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 28 Feb 2024 15:01:56 +0100 Message-Id: <20240228140226.1251979-7-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com> References: <20240228140226.1251979-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.004 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking X-BeenThere: 
pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 28 Feb 2024 14:03:25 -0000 This is in preparation for the proxmox-backup-client look-ahead caching, where passing around different encoder instances with internal references is not feasible. Instead of creating a new encoder instance for each directory level and keeping references to the parent state, use an internal stack. This is a breaking change in the pxar library API. Signed-off-by: Christian Ebner --- examples/pxarcmd.rs | 6 +- src/encoder/aio.rs | 18 ++-- src/encoder/mod.rs | 246 +++++++++++++++++++++++++------------------- src/encoder/sync.rs | 8 +- 4 files changed, 156 insertions(+), 122 deletions(-) diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs index e0c779d..dcf3c44 100644 --- a/examples/pxarcmd.rs +++ b/examples/pxarcmd.rs @@ -138,14 +138,14 @@ fn add_directory<'a, T: SeqWrite + 'a>( let meta = Metadata::from(&file_meta); if file_type.is_dir() { - let mut dir = encoder.create_directory(file_name, &meta)?; + encoder.create_directory(file_name, &meta)?; add_directory( - &mut dir, + encoder, std::fs::read_dir(file_path)?, root_path, &mut *hardlinks, )?; - dir.finish()?; + encoder.finish()?; } else if file_type.is_symlink() { todo!("symlink handling"); } else if file_type.is_file() { diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs index 82b9ab2..7010b8e 100644 --- a/src/encoder/aio.rs +++ b/src/encoder/aio.rs @@ -105,17 +105,14 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { &mut self, file_name: P, metadata: &Metadata, - ) -> io::Result> { - Ok(Encoder { - inner: self - .inner - .create_directory(file_name.as_ref(), metadata) - .await?, - }) + ) -> io::Result<()> { + self.inner + .create_directory(file_name.as_ref(), metadata) + .await } /// Finish this directory. 
This is mandatory, otherwise the `Drop` handler will `panic!`. - pub async fn finish(self) -> io::Result<()> { + pub async fn finish(&mut self) -> io::Result<()> { self.inner.finish().await } @@ -302,11 +299,12 @@ mod test { .await .unwrap(); { - let mut dir = encoder + encoder .create_directory("baba", &Metadata::dir_builder(0o700).build()) .await .unwrap(); - dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024) + encoder + .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024) .await .unwrap(); } diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs index e4ea69b..962087a 100644 --- a/src/encoder/mod.rs +++ b/src/encoder/mod.rs @@ -227,6 +227,16 @@ struct EncoderState { } impl EncoderState { + #[inline] + fn position(&self) -> u64 { + self.write_position + } + + #[inline] + fn payload_position(&self) -> u64 { + self.payload_write_position + } + fn merge_error(&mut self, error: Option) { // one error is enough: if self.encode_error.is_none() { @@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> { Borrowed(&'a mut T), } -impl<'a, T> EncoderOutput<'a, T> { - #[inline] - fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T> - where - 'a: 's, - { - EncoderOutput::Borrowed(self.as_mut()) - } -} - impl<'a, T> std::convert::AsMut for EncoderOutput<'a, T> { fn as_mut(&mut self) -> &mut T { match self { @@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> { pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { output: EncoderOutput<'a, T>, payload_output: EncoderOutput<'a, Option>, - state: EncoderState, - parent: Option<&'a mut EncoderState>, + /// EncoderState stack storing the state for each directory level + state: Vec, finished: bool, /// Since only the "current" entry can be actively writing files, we share the file copy @@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { file_copy_buffer: Arc>>, } -impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> { - fn drop(&mut 
self) { - if let Some(ref mut parent) = self.parent { - // propagate errors: - parent.merge_error(self.state.encode_error); - if !self.finished { - parent.add_error(EncodeError::IncompleteDirectory); - } - } else if !self.finished { - // FIXME: how do we deal with this? - // eprintln!("Encoder dropped without finishing!"); - } - } -} - impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { pub async fn new( output: EncoderOutput<'a, T>, @@ -317,8 +302,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { let mut this = Self { output, payload_output: EncoderOutput::Owned(None), - state: EncoderState::default(), - parent: None, + state: vec![EncoderState::default()], finished: false, file_copy_buffer: Arc::new(Mutex::new(unsafe { crate::util::vec_new_uninitialized(1024 * 1024) @@ -326,7 +310,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { }; this.encode_metadata(metadata).await?; - this.state.files_offset = this.position(); + let state = this + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + state.files_offset = state.position(); Ok(this) } @@ -337,13 +325,32 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } fn check(&self) -> io::Result<()> { - match self.state.encode_error { + if self.finished { + io_bail!("unexpected encoder finished state"); + } + let state = self + .state + .last() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + match state.encode_error { Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"), Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"), None => Ok(()), } } + fn state(&self) -> io::Result<&EncoderState> { + self.state + .last() + .ok_or_else(|| io_format_err!("encoder state stack underflow")) + } + + fn state_mut(&mut self) -> io::Result<&mut EncoderState> { + self.state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow")) + } + pub async fn create_file<'b>( &'b mut self, metadata: &Metadata, @@ -368,26 +375,38 
@@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { { self.check()?; - let file_offset = self.position(); + let file_offset = self.state()?.position(); self.start_file_do(Some(metadata), file_name).await?; if self.payload_output.as_mut().is_some() { - let mut data = self.payload_position().to_le_bytes().to_vec(); + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + let mut data = state.payload_position().to_le_bytes().to_vec(); data.append(&mut file_size.to_le_bytes().to_vec()); seq_write_pxar_entry( self.output.as_mut(), format::PXAR_PAYLOAD_REF, &data, - &mut self.state.write_position, + &mut state.write_position, ) .await?; } else { let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size); header.check_header_size()?; - seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?; + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + seq_write_struct(self.output.as_mut(), header, &mut state.write_position).await?; }; - let payload_data_offset = self.position(); + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + let payload_data_offset = state.position(); let meta_size = payload_data_offset - file_offset; @@ -400,7 +419,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { size: file_size + meta_size, }, remaining_size: file_size, - parent: &mut self.state, + parent: state, }) } @@ -481,7 +500,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { target: &Path, target_offset: LinkOffset, ) -> io::Result<()> { - let current_offset = self.position(); + let current_offset = self.state()?.position(); if current_offset <= target_offset.0 { io_bail!("invalid hardlink offset, can only point to prior files"); } @@ -555,24 +574,29 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { ) -> io::Result { self.check()?; - let file_offset = self.position(); + 
let file_offset = self.state()?.position(); let file_name = file_name.as_os_str().as_bytes(); self.start_file_do(metadata, file_name).await?; + + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; if let Some((htype, entry_data)) = entry_htype_data { seq_write_pxar_entry( self.output.as_mut(), htype, entry_data, - &mut self.state.write_position, + &mut state.write_position, ) .await?; } - let end_offset = self.position(); + let end_offset = state.position(); - self.state.items.push(GoodbyeItem { + state.items.push(GoodbyeItem { hash: format::hash_filename(file_name), offset: file_offset, size: end_offset - file_offset, @@ -581,21 +605,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { Ok(LinkOffset(file_offset)) } - #[inline] - fn position(&mut self) -> u64 { - self.state.write_position - } - - #[inline] - fn payload_position(&mut self) -> u64 { - self.state.payload_write_position - } - pub async fn create_directory( &mut self, file_name: &Path, metadata: &Metadata, - ) -> io::Result> { + ) -> io::Result<()> { self.check()?; if !metadata.is_dir() { @@ -605,37 +619,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { let file_name = file_name.as_os_str().as_bytes(); let file_hash = format::hash_filename(file_name); - let file_offset = self.position(); + let file_offset = self.state()?.position(); self.encode_filename(file_name).await?; - let entry_offset = self.position(); + let entry_offset = self.state()?.position(); self.encode_metadata(metadata).await?; - let files_offset = self.position(); + let state = self.state_mut()?; + let files_offset = state.position(); // the child will write to OUR state now: - let write_position = self.position(); - let payload_write_position = self.payload_position(); - - let file_copy_buffer = Arc::clone(&self.file_copy_buffer); - - Ok(EncoderImpl { - // always forward as Borrowed(), to avoid stacking references on nested calls - output: self.output.to_borrowed_mut(), - 
payload_output: self.payload_output.to_borrowed_mut(), - state: EncoderState { - entry_offset, - files_offset, - file_offset: Some(file_offset), - file_hash, - write_position, - payload_write_position, - ..Default::default() - }, - parent: Some(&mut self.state), - finished: false, - file_copy_buffer, - }) + let write_position = state.position(); + let payload_write_position = state.payload_position(); + + self.state.push(EncoderState { + entry_offset, + files_offset, + file_offset: Some(file_offset), + file_hash, + write_position, + payload_write_position, + ..Default::default() + }); + + Ok(()) } async fn start_file_do( @@ -651,11 +658,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> { + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; seq_write_pxar_struct_entry( self.output.as_mut(), format::PXAR_ENTRY, metadata.stat.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; @@ -677,22 +688,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> { + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; seq_write_pxar_entry( self.output.as_mut(), format::PXAR_XATTR, &xattr.data, - &mut self.state.write_position, + &mut state.write_position, ) .await } async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> { + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; for acl in &acl.users { seq_write_pxar_struct_entry( self.output.as_mut(), format::PXAR_ACL_USER, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } @@ -702,7 +721,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.output.as_mut(), format::PXAR_ACL_GROUP, acl.clone(), - &mut self.state.write_position, + &mut 
state.write_position, ) .await?; } @@ -712,7 +731,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.output.as_mut(), format::PXAR_ACL_GROUP_OBJ, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } @@ -722,7 +741,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.output.as_mut(), format::PXAR_ACL_DEFAULT, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } @@ -732,7 +751,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.output.as_mut(), format::PXAR_ACL_DEFAULT_USER, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } @@ -742,7 +761,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.output.as_mut(), format::PXAR_ACL_DEFAULT_GROUP, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } @@ -751,11 +770,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> { + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; seq_write_pxar_entry( self.output.as_mut(), format::PXAR_FCAPS, &fcaps.data, - &mut self.state.write_position, + &mut state.write_position, ) .await } @@ -764,33 +787,45 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { &mut self, quota_project_id: &format::QuotaProjectId, ) -> io::Result<()> { + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; seq_write_pxar_struct_entry( self.output.as_mut(), format::PXAR_QUOTA_PROJID, *quota_project_id, - &mut self.state.write_position, + &mut state.write_position, ) .await } async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> { crate::util::validate_filename(file_name)?; + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; seq_write_pxar_entry_zero( self.output.as_mut(), 
format::PXAR_FILENAME, file_name, - &mut self.state.write_position, + &mut state.write_position, ) .await } - pub async fn finish(mut self) -> io::Result<()> { + pub async fn finish(&mut self) -> io::Result<()> { let tail_bytes = self.finish_goodbye_table().await?; + let mut state = self + .state + .pop() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; seq_write_pxar_entry( self.output.as_mut(), format::PXAR_GOODBYE, &tail_bytes, - &mut self.state.write_position, + &mut state.write_position, ) .await?; @@ -804,34 +839,37 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { flush(output).await?; } - // done up here because of the self-borrow and to propagate - let end_offset = self.position(); - let payload_end_offset = self.payload_position(); + let end_offset = state.position(); + let payload_end_offset = state.payload_position(); - if let Some(parent) = &mut self.parent { + if let Some(parent) = self.state.last_mut() { parent.write_position = end_offset; parent.payload_write_position = payload_end_offset; - let file_offset = self - .state + let file_offset = state .file_offset .expect("internal error: parent set but no file_offset?"); parent.items.push(GoodbyeItem { - hash: self.state.file_hash, + hash: state.file_hash, offset: file_offset, size: end_offset - file_offset, }); + // propagate errors + parent.merge_error(state.encode_error); + } else { + self.finished = true; } - self.finished = true; + Ok(()) } async fn finish_goodbye_table(&mut self) -> io::Result> { - let goodbye_offset = self.position(); + let state = self.state_mut()?; + let goodbye_offset = state.position(); // "take" out the tail (to not leave an array of endian-swapped structs in `self`) - let mut tail = take(&mut self.state.items); + let mut tail = take(&mut state.items); let tail_size = (tail.len() + 1) * size_of::(); let goodbye_size = tail_size as u64 + size_of::() as u64; @@ -856,7 +894,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { bst.push( GoodbyeItem { hash: 
format::PXAR_GOODBYE_TAIL_MARKER, - offset: goodbye_offset - self.state.entry_offset, + offset: goodbye_offset - state.entry_offset, size: goodbye_size, } .to_le(), @@ -886,8 +924,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> { /// exactly zero. remaining_size: u64, - /// The directory containing this file. This is where we propagate the `IncompleteFile` error - /// to, and where we insert our `GoodbyeItem`. + /// The directory stack with the last item being the directory containing this file. This is + /// where we propagate the `IncompleteFile` error to, and where we insert our `GoodbyeItem`. parent: &'a mut EncoderState, } diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs index 28981df..de41e25 100644 --- a/src/encoder/sync.rs +++ b/src/encoder/sync.rs @@ -106,14 +106,12 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { &mut self, file_name: P, metadata: &Metadata, - ) -> io::Result> { - Ok(Encoder { - inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?, - }) + ) -> io::Result<()> { + poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata)) } /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`. - pub fn finish(self) -> io::Result<()> { + pub fn finish(&mut self) -> io::Result<()> { poll_result_once(self.inner.finish()) } -- 2.39.2