From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 27D5FB8B71 for ; Mon, 11 Mar 2024 14:22:05 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0ABF99554 for ; Mon, 11 Mar 2024 14:21:35 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Mon, 11 Mar 2024 14:21:33 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 3DB04488EE for ; Mon, 11 Mar 2024 14:21:33 +0100 (CET) Date: Mon, 11 Mar 2024 14:21:25 +0100 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Proxmox Backup Server development discussion References: <20240305092703.126906-1-c.ebner@proxmox.com> <20240305092703.126906-3-c.ebner@proxmox.com> In-Reply-To: <20240305092703.126906-3-c.ebner@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.16.0 (https://github.com/astroidmail/astroid) Message-Id: <1710152828.1ncysi2oe3.astroid@yuna.none> Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-SPAM-LEVEL: Spam detection results: 0 AWL 0.016 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: 
sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: Re: [pbs-devel] [RFC v2 pxar 02/36] encoder: add optional output writer for file payloads X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 11 Mar 2024 13:22:05 -0000 On March 5, 2024 10:26 am, Christian Ebner wrote: > During regular pxar archive encoding, the payload of regular files is > written as part of the archive. >=20 > This patch introduces functionality to attach an optional, dedicated > writer instance to redirect the payload to a different output. > The intention for this change is to allow to separate data and metadata > streams in order to allow the reuse of payload data by referencing the > payload writer byte offset, without having to re-encode it. >=20 > Whenever the payload of regular files is redirected to a dedicated > output writer, encode a payload reference header followed by the > required data to locate the data, instead of adding the regular payload > header followed by the encoded payload to the archive. >=20 > This is in preparation for reusing payload chunks for unchanged files > of backups created via the proxmox-backup-client. 
>=20 > Signed-off-by: Christian Ebner > --- > changes since version 1: > - no changes >=20 > src/encoder/aio.rs | 7 +++++ > src/encoder/mod.rs | 75 +++++++++++++++++++++++++++++++++++++++------ > src/encoder/sync.rs | 7 +++++ > 3 files changed, 79 insertions(+), 10 deletions(-) >=20 > diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs > index ad25fea..82b9ab2 100644 > --- a/src/encoder/aio.rs > +++ b/src/encoder/aio.rs > @@ -52,6 +52,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { > }) > } > =20 > + /// Attach a dedicated writer to redirect the payloads of regular fi= les to a separate output > + pub fn attach_payload_output(self, payload_output: T) -> Self { > + Self { > + inner: self.inner.attach_payload_output(payload_output), see below > + } > + } > + > /// Create a new regular file in the archive. This returns a `File` = object to which the > /// contents have to be written out *completely*. Failing to do so w= ill put the encoder into an > /// error state. > diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs > index 0d342ec..e4ea69b 100644 > --- a/src/encoder/mod.rs > +++ b/src/encoder/mod.rs > @@ -221,6 +221,9 @@ struct EncoderState { > =20 > /// We need to keep track how much we have written to get offsets. > write_position: u64, > + > + /// Track the bytes written to the payload writer > + payload_write_position: u64, > } > =20 > impl EncoderState { > @@ -278,6 +281,7 @@ impl<'a, T> std::convert::From<&'a mut T> for Encoder= Output<'a, T> { > /// synchronous or `async` I/O objects in as output. 
> pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> { > output: EncoderOutput<'a, T>, > + payload_output: EncoderOutput<'a, Option<T>>, > state: EncoderState, > parent: Option<&'a mut EncoderState>, > finished: bool, > @@ -312,6 +316,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > } > let mut this =3D Self { > output, > + payload_output: EncoderOutput::Owned(None), > state: EncoderState::default(), > parent: None, > finished: false, > @@ -326,6 +331,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > Ok(this) > } > =20 > + pub fn attach_payload_output(mut self, payload_output: T) -> Self { > + self.payload_output =3D EncoderOutput::Owned(Some(payload_output= )); should we prevent/catch this being called multiple times? > + self > + } > + > fn check(&self) -> io::Result<()> { > match self.state.encode_error { > Some(EncodeError::IncompleteFile) =3D> io_bail!("incomplete = file"), > @@ -361,10 +371,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > let file_offset =3D self.position(); > self.start_file_do(Some(metadata), file_name).await?; > =20 > - let header =3D format::Header::with_content_size(format::PXAR_PA= YLOAD, file_size); > - header.check_header_size()?; > - > - seq_write_struct(self.output.as_mut(), header, &mut self.state.w= rite_position).await?; > + if self.payload_output.as_mut().is_some() { > + let mut data =3D self.payload_position().to_le_bytes().to_ve= c(); > + data.append(&mut file_size.to_le_bytes().to_vec()); > + seq_write_pxar_entry( > + self.output.as_mut(), > + format::PXAR_PAYLOAD_REF, > + &data, > + &mut self.state.write_position, > + ) > + .await?; this part here and the read counter-part in the next commit basically hard-code the format of this entry type, maybe that could be handled nicer? e.g., construct a PayloadRef here, and let that implement the conversion to/from data? 
it is a pre-existing pattern here though ;) > + } else { > + let header =3D format::Header::with_content_size(format::PXA= R_PAYLOAD, file_size); > + header.check_header_size()?; > + seq_write_struct(self.output.as_mut(), header, &mut self.sta= te.write_position).await?; > + }; > =20 > let payload_data_offset =3D self.position(); > =20 > @@ -372,6 +393,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > =20 > Ok(FileImpl { > output: self.output.as_mut(), > + payload_output: self.payload_output.as_mut().as_mut(), > goodbye_item: GoodbyeItem { > hash: format::hash_filename(file_name), > offset: file_offset, > @@ -564,6 +586,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > self.state.write_position > } > =20 > + #[inline] > + fn payload_position(&mut self) -> u64 { > + self.state.payload_write_position > + } > + > pub async fn create_directory( > &mut self, > file_name: &Path, > @@ -588,18 +615,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > =20 > // the child will write to OUR state now: > let write_position =3D self.position(); > + let payload_write_position =3D self.payload_position(); > =20 > let file_copy_buffer =3D Arc::clone(&self.file_copy_buffer); > =20 > Ok(EncoderImpl { > // always forward as Borrowed(), to avoid stacking reference= s on nested calls > output: self.output.to_borrowed_mut(), > + payload_output: self.payload_output.to_borrowed_mut(), > state: EncoderState { > entry_offset, > files_offset, > file_offset: Some(file_offset), > file_hash, > write_position, > + payload_write_position, > ..Default::default() > }, > parent: Some(&mut self.state), > @@ -764,15 +794,23 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > ) > .await?; > =20 > + if let EncoderOutput::Owned(output) =3D &mut self.payload_output= { > + if let Some(output) =3D output { > + flush(output).await?; > + } > + } nit: the two if-lets could be combined: if let EncoderOutput::Owned(Some(output)) =3D &mut self.payload_output { ..=20 } > + > if let 
EncoderOutput::Owned(output) =3D &mut self.output { > flush(output).await?; > } > =20 > // done up here because of the self-borrow and to propagate > let end_offset =3D self.position(); > + let payload_end_offset =3D self.payload_position(); > =20 > if let Some(parent) =3D &mut self.parent { > parent.write_position =3D end_offset; > + parent.payload_write_position =3D payload_end_offset; > =20 > let file_offset =3D self > .state > @@ -837,6 +875,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> { > pub(crate) struct FileImpl<'a, S: SeqWrite> { > output: &'a mut S, > =20 > + /// Optional write redirection of file payloads to this sequential s= tream > + payload_output: Option<&'a mut S>, > + > /// This file's `GoodbyeItem`. FIXME: We currently don't touch this,= can we just push it > /// directly instead of on Drop of FileImpl? > goodbye_item: GoodbyeItem, > @@ -916,19 +957,33 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> { > /// for convenience. > pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> { > self.check_remaining(data.len())?; > - let put =3D > - poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).p= oll_seq_write(cx, data) }) > - .await?; > - //let put =3D seq_write(self.output.as_mut().unwrap(), data).awa= it?; > + let put =3D if let Some(mut output) =3D self.payload_output.as_m= ut() { > + let put =3D > + poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).po= ll_seq_write(cx, data) }) > + .await?; > + self.parent.payload_write_position +=3D put as u64; > + put > + } else { > + let put =3D poll_fn(|cx| unsafe { > + Pin::new_unchecked(&mut self.output).poll_seq_write(cx, = data) > + }) > + .await?; > + self.parent.write_position +=3D put as u64; > + put > + }; > + > self.remaining_size -=3D put as u64; > - self.parent.write_position +=3D put as u64; > Ok(put) > } > =20 > /// Completely write file data for the current file entry in a pxar = archive. 
> pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> { > self.check_remaining(data.len())?; > - seq_write_all(self.output, data, &mut self.parent.write_position= ).await?; > + if let Some(ref mut output) =3D self.payload_output { > + seq_write_all(output, data, &mut self.parent.payload_write_p= osition).await?; > + } else { > + seq_write_all(self.output, data, &mut self.parent.write_posi= tion).await?; > + } > self.remaining_size -=3D data.len() as u64; > Ok(()) > } > diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs > index 1ec91b8..28981df 100644 > --- a/src/encoder/sync.rs > +++ b/src/encoder/sync.rs > @@ -56,6 +56,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { > }) > } > =20 > + /// Attach a dedicated writer to redirect the payloads of regular fi= les to a separate output > + pub fn attach_payload_output(self, payload_output: T) -> Self { > + Self { > + inner: self.inner.attach_payload_output(payload_output), same question as above here.. > + } > + } > + > /// Create a new regular file in the archive. This returns a `File` = object to which the > /// contents have to be written out *completely*. Failing to do so w= ill put the encoder into an > /// error state. > --=20 > 2.39.2 >=20 >=20 >=20 > _______________________________________________ > pbs-devel mailing list > pbs-devel@lists.proxmox.com > https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel >=20 >=20 >=20