From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 0A7C8BC257 for ; Thu, 28 Mar 2024 13:38:36 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id F10A5A029 for ; Thu, 28 Mar 2024 13:37:47 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 28 Mar 2024 13:37:39 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 0A4DF4291C for ; Thu, 28 Mar 2024 13:37:39 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 28 Mar 2024 13:36:15 +0100 Message-Id: <20240328123707.336951-7-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240328123707.336951-1-c.ebner@proxmox.com> References: <20240328123707.336951-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.021 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v3 pxar 06/58] encoder: move to stack based state tracking X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 
2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 28 Mar 2024 12:38:36 -0000 This is in preparation for the proxmox-backup-client look-ahead caching, where passing around different encoder instances with internal references is not feasible. Instead of creating a new encoder instance for each directory level and keeping references to the parent state, use an internal stack. This is a breaking change in the pxar library API. Signed-off-by: Christian Ebner --- changes since version 2: - consume encoder with new `close` method to finalize - new output_state helper, usable when a mutable borrow of both output and state is required - double-checked state/state_mut usage - major refactoring examples/pxarcmd.rs | 7 +- src/encoder/aio.rs | 26 ++-- src/encoder/mod.rs | 285 +++++++++++++++++++++++-------------------- src/encoder/sync.rs | 16 ++- tests/simple/main.rs | 3 + 5 files changed, 187 insertions(+), 150 deletions(-) diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs index e0c779d..0294eba 100644 --- a/examples/pxarcmd.rs +++ b/examples/pxarcmd.rs @@ -106,6 +106,7 @@ fn cmd_create(mut args: std::env::ArgsOs) -> Result<(), Error> { let mut encoder = Encoder::create(file, &meta)?; add_directory(&mut encoder, dir, &dir_path, &mut HashMap::new())?; encoder.finish()?; + encoder.close()?; Ok(()) } @@ -138,14 +139,14 @@ fn add_directory<'a, T: SeqWrite + 'a>( let meta = Metadata::from(&file_meta); if file_type.is_dir() { - let mut dir = encoder.create_directory(file_name, &meta)?; + encoder.create_directory(file_name, &meta)?; add_directory( - &mut dir, + encoder, std::fs::read_dir(file_path)?, root_path, &mut *hardlinks, )?; - dir.finish()?; + encoder.finish()?; } else if file_type.is_symlink() { todo!("symlink handling"); } else if file_type.is_file() { diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs index 31a1a2f..635e550 100644 --- a/src/encoder/aio.rs 
+++ b/src/encoder/aio.rs @@ -109,20 +109,23 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { &mut self, file_name: P, metadata: &Metadata, - ) -> io::Result> { - Ok(Encoder { - inner: self - .inner - .create_directory(file_name.as_ref(), metadata) - .await?, - }) + ) -> io::Result<()> { + self.inner + .create_directory(file_name.as_ref(), metadata) + .await } - /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`. - pub async fn finish(self) -> io::Result<()> { + /// Finish this directory. This is mandatory, encodes the end for the current directory. + pub async fn finish(&mut self) -> io::Result<()> { self.inner.finish().await } + /// Close the encoder instance. This is mandatory, encodes the end for the optional payload + /// output stream, if some is given + pub async fn close(self) -> io::Result<()> { + self.inner.close().await + } + /// Add a symbolic link to the archive. pub async fn add_symlink, PT: AsRef>( &mut self, @@ -307,11 +310,12 @@ mod test { .await .unwrap(); { - let mut dir = encoder + encoder .create_directory("baba", &Metadata::dir_builder(0o700).build()) .await .unwrap(); - dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024) + encoder + .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024) .await .unwrap(); } diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs index bff6acf..31bb0fa 100644 --- a/src/encoder/mod.rs +++ b/src/encoder/mod.rs @@ -227,6 +227,16 @@ struct EncoderState { } impl EncoderState { + #[inline] + fn position(&self) -> u64 { + self.write_position + } + + #[inline] + fn payload_position(&self) -> u64 { + self.payload_write_position + } + fn merge_error(&mut self, error: Option) { // one error is enough: if self.encode_error.is_none() { @@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> { Borrowed(&'a mut T), } -impl<'a, T> EncoderOutput<'a, T> { - #[inline] - fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T> - where - 'a: 's, - { - 
EncoderOutput::Borrowed(self.as_mut()) - } -} - impl<'a, T> std::convert::AsMut for EncoderOutput<'a, T> { fn as_mut(&mut self) -> &mut T { match self { @@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> { pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { output: EncoderOutput<'a, T>, payload_output: EncoderOutput<'a, Option>, - state: EncoderState, - parent: Option<&'a mut EncoderState>, + /// EncoderState stack storing the state for each directory level + state: Vec, finished: bool, /// Since only the "current" entry can be actively writing files, we share the file copy @@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { file_copy_buffer: Arc>>, } -impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> { - fn drop(&mut self) { - if let Some(ref mut parent) = self.parent { - // propagate errors: - parent.merge_error(self.state.encode_error); - if !self.finished { - parent.add_error(EncodeError::IncompleteDirectory); - } - } else if !self.finished { - // FIXME: how do we deal with this? 
- // eprintln!("Encoder dropped without finishing!"); - } - } -} - impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { pub async fn new( output: EncoderOutput<'a, T>, @@ -318,8 +303,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { let mut this = Self { output, payload_output: EncoderOutput::Owned(None), - state: EncoderState::default(), - parent: None, + state: vec![EncoderState::default()], finished: false, file_copy_buffer: Arc::new(Mutex::new(unsafe { crate::util::vec_new_uninitialized(1024 * 1024) @@ -327,7 +311,8 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { }; this.encode_metadata(metadata).await?; - this.state.files_offset = this.position(); + let state = this.state_mut()?; + state.files_offset = state.position(); if let Some(payload_output) = payload_output { this.payload_output = EncoderOutput::Owned(Some(payload_output)); @@ -337,13 +322,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } fn check(&self) -> io::Result<()> { - match self.state.encode_error { + if self.finished { + io_bail!("unexpected encoder finished state"); + } + let state = self.state()?; + match state.encode_error { Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"), Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"), None => Ok(()), } } + fn state(&self) -> io::Result<&EncoderState> { + self.state + .last() + .ok_or_else(|| io_format_err!("encoder state stack underflow")) + } + + fn state_mut(&mut self) -> io::Result<&mut EncoderState> { + self.state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow")) + } + + fn output_state(&mut self) -> io::Result<(&mut T, &mut EncoderState)> { + Ok(( + self.output.as_mut(), + self.state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?, + )) + } + pub async fn create_file<'b>( &'b mut self, metadata: &Metadata, @@ -368,17 +378,22 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { { self.check()?; - let file_offset = self.position(); + let 
file_offset = self.state()?.position(); self.start_file_do(Some(metadata), file_name).await?; if let Some(payload_output) = self.payload_output.as_mut() { + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + // Position prior to the payload header - let payload_position = self.state.payload_write_position; + let payload_position = state.payload_position(); // Separate payloads in payload archive PXAR_PAYLOAD markers let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size); header.check_header_size()?; - seq_write_struct(payload_output, header, &mut self.state.payload_write_position).await?; + seq_write_struct(payload_output, header, &mut state.payload_write_position).await?; let payload_ref = PayloadRef { offset: payload_position, @@ -390,16 +405,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { self.output.as_mut(), format::PXAR_PAYLOAD_REF, &payload_ref.data(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } else { let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size); header.check_header_size()?; - seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?; + let (output, state) = self.output_state()?; + seq_write_struct(output, header, &mut state.write_position).await?; } - let payload_data_offset = self.position(); + let state = self + .state + .last_mut() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + let payload_data_offset = state.position(); let meta_size = payload_data_offset - file_offset; @@ -412,7 +432,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { size: file_size + meta_size, }, remaining_size: file_size, - parent: &mut self.state, + parent: state, }) } @@ -493,7 +513,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { target: &Path, target_offset: LinkOffset, ) -> io::Result<()> { - let current_offset = self.position(); + let current_offset = 
self.state()?.position(); if current_offset <= target_offset.0 { io_bail!("invalid hardlink offset, can only point to prior files"); } @@ -567,24 +587,20 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { ) -> io::Result { self.check()?; - let file_offset = self.position(); + let file_offset = self.state()?.position(); let file_name = file_name.as_os_str().as_bytes(); self.start_file_do(metadata, file_name).await?; + + let (output, state) = self.output_state()?; if let Some((htype, entry_data)) = entry_htype_data { - seq_write_pxar_entry( - self.output.as_mut(), - htype, - entry_data, - &mut self.state.write_position, - ) - .await?; + seq_write_pxar_entry(output, htype, entry_data, &mut state.write_position).await?; } - let end_offset = self.position(); + let end_offset = state.position(); - self.state.items.push(GoodbyeItem { + state.items.push(GoodbyeItem { hash: format::hash_filename(file_name), offset: file_offset, size: end_offset - file_offset, @@ -593,21 +609,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { Ok(LinkOffset(file_offset)) } - #[inline] - fn position(&mut self) -> u64 { - self.state.write_position - } - - #[inline] - fn payload_position(&mut self) -> u64 { - self.state.payload_write_position - } - pub async fn create_directory( &mut self, file_name: &Path, metadata: &Metadata, - ) -> io::Result> { + ) -> io::Result<()> { self.check()?; if !metadata.is_dir() { @@ -617,37 +623,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { let file_name = file_name.as_os_str().as_bytes(); let file_hash = format::hash_filename(file_name); - let file_offset = self.position(); + let file_offset = self.state()?.position(); self.encode_filename(file_name).await?; - let entry_offset = self.position(); + let entry_offset = self.state()?.position(); self.encode_metadata(metadata).await?; - let files_offset = self.position(); + let state = self.state_mut()?; + let files_offset = state.position(); // the child will write to OUR state now: - let write_position = 
self.position(); - let payload_write_position = self.payload_position(); - - let file_copy_buffer = Arc::clone(&self.file_copy_buffer); - - Ok(EncoderImpl { - // always forward as Borrowed(), to avoid stacking references on nested calls - output: self.output.to_borrowed_mut(), - payload_output: self.payload_output.to_borrowed_mut(), - state: EncoderState { - entry_offset, - files_offset, - file_offset: Some(file_offset), - file_hash, - write_position, - payload_write_position, - ..Default::default() - }, - parent: Some(&mut self.state), - finished: false, - file_copy_buffer, - }) + let write_position = state.position(); + let payload_write_position = state.payload_position(); + + self.state.push(EncoderState { + entry_offset, + files_offset, + file_offset: Some(file_offset), + file_hash, + write_position, + payload_write_position, + ..Default::default() + }); + + Ok(()) } async fn start_file_do( @@ -663,11 +662,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> { + let (output, state) = self.output_state()?; seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ENTRY, metadata.stat.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; @@ -689,72 +689,74 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> { + let (output, state) = self.output_state()?; seq_write_pxar_entry( - self.output.as_mut(), + output, format::PXAR_XATTR, &xattr.data, - &mut self.state.write_position, + &mut state.write_position, ) .await } async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> { + let (output, state) = self.output_state()?; for acl in &acl.users { seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ACL_USER, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } for acl in &acl.groups { 
seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ACL_GROUP, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } if let Some(acl) = &acl.group_obj { seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ACL_GROUP_OBJ, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } if let Some(acl) = &acl.default { seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ACL_DEFAULT, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } for acl in &acl.default_users { seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ACL_DEFAULT_USER, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } for acl in &acl.default_groups { seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_ACL_DEFAULT_GROUP, acl.clone(), - &mut self.state.write_position, + &mut state.write_position, ) .await?; } @@ -763,11 +765,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { } async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> { + let (output, state) = self.output_state()?; seq_write_pxar_entry( - self.output.as_mut(), + output, format::PXAR_FCAPS, &fcaps.data, - &mut self.state.write_position, + &mut state.write_position, ) .await } @@ -776,35 +779,32 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { &mut self, quota_project_id: &format::QuotaProjectId, ) -> io::Result<()> { + let (output, state) = self.output_state()?; seq_write_pxar_struct_entry( - self.output.as_mut(), + output, format::PXAR_QUOTA_PROJID, *quota_project_id, - &mut self.state.write_position, + &mut state.write_position, ) .await } async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> { crate::util::validate_filename(file_name)?; + let (output, state) = self.output_state()?; seq_write_pxar_entry_zero( - self.output.as_mut(), + 
output, format::PXAR_FILENAME, file_name, - &mut self.state.write_position, + &mut state.write_position, ) .await } - pub async fn finish(mut self) -> io::Result<()> { - let tail_bytes = self.finish_goodbye_table().await?; - seq_write_pxar_entry( - self.output.as_mut(), - format::PXAR_GOODBYE, - &tail_bytes, - &mut self.state.write_position, - ) - .await?; + pub async fn close(mut self) -> io::Result<()> { + if !self.state.is_empty() { + io_bail!("unexpected state on encoder close"); + } if let EncoderOutput::Owned(Some(output)) = &mut self.payload_output { flush(output).await?; @@ -814,34 +814,59 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { flush(output).await?; } - // done up here because of the self-borrow and to propagate - let end_offset = self.position(); - let payload_end_offset = self.payload_position(); + self.finished = true; + + Ok(()) + } + + pub async fn finish(&mut self) -> io::Result<()> { + let tail_bytes = self.finish_goodbye_table().await?; + let mut state = self + .state + .pop() + .ok_or_else(|| io_format_err!("encoder state stack underflow"))?; + seq_write_pxar_entry( + self.output.as_mut(), + format::PXAR_GOODBYE, + &tail_bytes, + &mut state.write_position, + ) + .await?; + + let end_offset = state.position(); + let payload_end_offset = state.payload_position(); - if let Some(parent) = &mut self.parent { + if let Some(parent) = self.state.last_mut() { parent.write_position = end_offset; parent.payload_write_position = payload_end_offset; - let file_offset = self - .state + let file_offset = state .file_offset .expect("internal error: parent set but no file_offset?"); parent.items.push(GoodbyeItem { - hash: self.state.file_hash, + hash: state.file_hash, offset: file_offset, size: end_offset - file_offset, }); + // propagate errors + parent.merge_error(state.encode_error); + Ok(()) + } else { + match state.encode_error { + Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"), + Some(EncodeError::IncompleteDirectory) => 
io_bail!("directory not finalized"), + None => Ok(()), + } } - self.finished = true; - Ok(()) } async fn finish_goodbye_table(&mut self) -> io::Result> { - let goodbye_offset = self.position(); + let state = self.state_mut()?; + let goodbye_offset = state.position(); // "take" out the tail (to not leave an array of endian-swapped structs in `self`) - let mut tail = take(&mut self.state.items); + let mut tail = take(&mut state.items); let tail_size = (tail.len() + 1) * size_of::(); let goodbye_size = tail_size as u64 + size_of::() as u64; @@ -866,7 +891,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { bst.push( GoodbyeItem { hash: format::PXAR_GOODBYE_TAIL_MARKER, - offset: goodbye_offset - self.state.entry_offset, + offset: goodbye_offset - state.entry_offset, size: goodbye_size, } .to_le(), @@ -896,8 +921,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> { /// exactly zero. remaining_size: u64, - /// The directory containing this file. This is where we propagate the `IncompleteFile` error - /// to, and where we insert our `GoodbyeItem`. + /// The directory stack with the last item being the directory containing this file. This is + /// where we propagate the `IncompleteFile` error to, and where we insert our `GoodbyeItem`. parent: &'a mut EncoderState, } diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs index 96d056d..d0d62ba 100644 --- a/src/encoder/sync.rs +++ b/src/encoder/sync.rs @@ -106,17 +106,21 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { &mut self, file_name: P, metadata: &Metadata, - ) -> io::Result> { - Ok(Encoder { - inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?, - }) + ) -> io::Result<()> { + poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata)) } - /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`. - pub fn finish(self) -> io::Result<()> { + /// Finish this directory. This is mandatory, encodes the end for the current directory. 
+ pub fn finish(&mut self) -> io::Result<()> { poll_result_once(self.inner.finish()) } + /// Close the encoder instance. This is mandatory, encodes the end for the optional payload + /// output stream, if some is given + pub fn close(self) -> io::Result<()> { + poll_result_once(self.inner.close()) + } + /// Add a symbolic link to the archive. pub fn add_symlink, PT: AsRef>( &mut self, diff --git a/tests/simple/main.rs b/tests/simple/main.rs index d661c7d..e55457f 100644 --- a/tests/simple/main.rs +++ b/tests/simple/main.rs @@ -51,6 +51,9 @@ fn test1() { encoder .finish() .expect("failed to finish encoding the pxar archive"); + encoder + .close() + .expect("failed to close the encoder instance"); assert!(!file.is_empty(), "encoder did not write any data"); -- 2.39.2