Date: Wed, 03 Apr 2024 12:38:17 +0200
From: Fabian Grünbichler
To: Proxmox Backup Server development discussion
Subject: Re: [pbs-devel] [PATCH v3 pxar 07/58] decoder/accessor: add optional payload input stream
Message-Id: <1712138653.sp8y5k19rp.astroid@yuna.none>
In-Reply-To: <20240328123707.336951-8-c.ebner@proxmox.com>
References: <20240328123707.336951-1-c.ebner@proxmox.com> <20240328123707.336951-8-c.ebner@proxmox.com>

On March 28, 2024 1:36 pm, Christian Ebner wrote:
> Implement an optional redirection to read the payload for regular files
> from a different input stream.
>
> This allows to decode split stream archives.
>
> Signed-off-by: Christian Ebner
> ---
> changes since version 2:
> - pass the payload input on decoder/accessor instantiation in order to
>   avoid possible adding/removing during decoding/accessing.
> - major refactoring

style nit: for those fns that take input and payload_input, it might make
sense to order them next to each other? IMHO it makes the call sites more
readable, especially in those cases where it's mostly passed along ;)

for the constructors, I am a bit torn on which variant is nicer.
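just to illustrate the ordering I mean - hypothetical signature, not taken
from the patch, so take it with a grain of salt:

    // input and payload_input grouped together, size last
    pub async fn new(input: T, payload_input: Option<T>, size: u64) -> io::Result<Self> {
        Ok(Self {
            inner: accessor::AccessorImpl::new(input, payload_input, size).await?,
        })
    }

    // call sites that just pass both along then read as
    Accessor::new(FileReader::new(input), payload_input, size).await

but as said, for the constructors themselves I don't feel strongly either
way.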
>
> examples/apxar.rs | 2 +-
> src/accessor/aio.rs | 10 ++--
> src/accessor/mod.rs | 61 ++++++++++++++++++++++---
> src/accessor/sync.rs | 8 ++--
> src/decoder/aio.rs | 14 ++++--
> src/decoder/mod.rs | 106 +++++++++++++++++++++++++++++++++++++++----
> src/decoder/sync.rs | 15 ++++--
> src/lib.rs | 3 ++
> 8 files changed, 184 insertions(+), 35 deletions(-)
>
> diff --git a/examples/apxar.rs b/examples/apxar.rs
> index 0c62242..d5eb04e 100644
> --- a/examples/apxar.rs
> +++ b/examples/apxar.rs
> @@ -9,7 +9,7 @@ async fn main() {
> .await
> .expect("failed to open file");
>
> - let mut reader = Decoder::from_tokio(file)
> + let mut reader = Decoder::from_tokio(file, None)
> .await
> .expect("failed to open pxar archive contents");
>
> diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
> index 98d7755..0ebb921 100644
> --- a/src/accessor/aio.rs
> +++ b/src/accessor/aio.rs
> @@ -39,7 +39,7 @@ impl Accessor> {
> /// by a blocking file.
> #[inline]
> pub async fn from_file_and_size(input: T, size: u64) -> io::Result {
> - Accessor::new(FileReader::new(input), size).await
> + Accessor::new(FileReader::new(input), size, None).await
> }
> }
>
> @@ -75,7 +75,7 @@ where
> input: T,
> size: u64,
> ) -> io::Result>> {
> - Accessor::new(FileRefReader::new(input), size).await
> + Accessor::new(FileRefReader::new(input), size, None).await
> }
> }
>
> @@ -85,9 +85,11 @@ impl Accessor {
> ///
> /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
> /// not allowed to use the `Waker`, as this will cause a `panic!`.
> - pub async fn new(input: T, size: u64) -> io::Result {
> + /// Optionally take the file payloads from the provided input stream rather than the regular
> + /// pxar stream.
> + pub async fn new(input: T, size: u64, payload_input: Option) -> io::Result {
> Ok(Self {
> - inner: accessor::AccessorImpl::new(input, size).await?,
> + inner: accessor::AccessorImpl::new(input, size, payload_input).await?,
> })
> }
>
> diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
> index 6a2de73..4789595 100644
> --- a/src/accessor/mod.rs
> +++ b/src/accessor/mod.rs
> @@ -182,10 +182,11 @@ pub(crate) struct AccessorImpl {
> input: T,
> size: u64,
> caches: Arc,
> + payload_input: Option,
> }
>
> impl AccessorImpl {
> - pub async fn new(input: T, size: u64) -> io::Result {
> + pub async fn new(input: T, size: u64, payload_input: Option) -> io::Result {
> if size < (size_of::() as u64) {
> io_bail!("too small to contain a pxar archive");
> }
> @@ -194,6 +195,7 @@ impl AccessorImpl {
> input,
> size,
> caches: Arc::new(Caches::default()),
> + payload_input,
> })
> }
>
> @@ -207,6 +209,9 @@ impl AccessorImpl {
> self.size,
> "/".into(),
> Arc::clone(&self.caches),
> + self.payload_input
> + .as_ref()
> + .map(|input| input as &dyn ReadAt),
> )
> .await
> }
> @@ -228,7 +233,13 @@ async fn get_decoder(
> entry_range: Range,
> path: PathBuf,
> ) -> io::Result>> {
> - DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
> + DecoderImpl::new_full(
> + SeqReadAtAdapter::new(input, entry_range.clone()),
> + path,
> + true,
> + None,
> + )
> + .await
> }
>
> // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
> @@ -263,6 +274,7 @@ impl AccessorImpl {
> self.size,
> "/".into(),
> Arc::clone(&self.caches),
> + self.payload_input.clone(),
> )
> .await
> }
> @@ -274,6 +286,7 @@ impl AccessorImpl {
> offset,
> "/".into(),
> Arc::clone(&self.caches),
> + self.payload_input.clone(),
> )
> .await
> }
> @@ -293,17 +306,23 @@ impl AccessorImpl {
> .next()
> .await
> .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
> +
> Ok(FileEntryImpl {
> input: self.input.clone(),
> entry,
> entry_range_info: entry_range_info.clone(),
> caches: Arc::clone(&self.caches),
> + payload_input: self.payload_input.clone(),
> })
> }
>
> /// Allow opening arbitrary contents from a specific range.
> pub unsafe fn open_contents_at_range(&self, range: Range) -> FileContentsImpl {
> - FileContentsImpl::new(self.input.clone(), range)
> + if let Some(payload_input) = &self.payload_input {
> + FileContentsImpl::new(payload_input.clone(), range)
> + } else {
> + FileContentsImpl::new(self.input.clone(), range)
> + }
> }
>
> /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
> @@ -326,9 +345,12 @@ impl AccessorImpl {
>
> let link_offset = entry_file_offset - link_offset;
>
> - let (mut decoder, entry_offset) =
> - get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
> - .await?;
> + let (mut decoder, entry_offset) = get_decoder_at_filename(
> + self.input.clone(),
> + link_offset..self.size,
> + PathBuf::new(),
> + )
> + .await?;
>
> let entry = decoder
> .next()
> @@ -342,6 +364,7 @@ impl AccessorImpl {
> EntryKind::File {
> offset: Some(offset),
> size,
> + ..
> } => {
> let meta_size = offset - link_offset;
> let entry_end = link_offset + meta_size + size;
> @@ -353,6 +376,7 @@ impl AccessorImpl {
> entry_range: entry_offset..entry_end,
> },
> caches: Arc::clone(&self.caches),
> + payload_input: self.payload_input.clone(),
> })
> }
> _ => io_bail!("hardlink does not point to a regular file"),
> @@ -369,6 +393,7 @@ pub(crate) struct DirectoryImpl {
> table: Arc<[GoodbyeItem]>,
> path: PathBuf,
> caches: Arc,
> + payload_input: Option,
> }
>
> impl DirectoryImpl {
> @@ -378,6 +403,7 @@ impl DirectoryImpl {
> end_offset: u64,
> path: PathBuf,
> caches: Arc,
> + payload_input: Option,
> ) -> io::Result> {
> let tail = Self::read_tail_entry(&input, end_offset).await?;
>
> @@ -407,6 +433,7 @@ impl DirectoryImpl {
> table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
> path,
> caches,
> + payload_input,
> };
>
> // sanity check:
> @@ -533,6 +560,7 @@ impl DirectoryImpl {
> entry_range: self.entry_range(),
> },
> caches: Arc::clone(&self.caches),
> + payload_input: self.payload_input.clone(),
> })
> }
>
> @@ -685,6 +713,7 @@ pub(crate) struct FileEntryImpl {
> entry: Entry,
> entry_range_info: EntryRangeInfo,
> caches: Arc,
> + payload_input: Option,
> }
>
> impl FileEntryImpl {
> @@ -698,6 +727,7 @@ impl FileEntryImpl {
> self.entry_range_info.entry_range.end,
> self.entry.path.clone(),
> Arc::clone(&self.caches),
> + self.payload_input.clone(),
> )
> .await
> }
> @@ -711,14 +741,30 @@ impl FileEntryImpl {
> EntryKind::File {
> size,
> offset: Some(offset),
> + payload_offset: None,
> } => Ok(Some(offset..(offset + size))),
> + // Payload offset beats regular offset if some
> + EntryKind::File {
> + size,
> + offset: Some(_offset),
> + payload_offset: Some(payload_offset),
> + } => {
> + let start_offset = payload_offset + size_of::() as u64;
> + Ok(Some(start_offset..start_offset + size))
> + }
> _ => Ok(None),
> }
> }
>
> pub async fn contents(&self) -> io::Result> {
> match self.content_range()? {
> - Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
> + Some(range) => {
> + if let Some(ref payload_input) = self.payload_input {
> + Ok(FileContentsImpl::new(payload_input.clone(), range))
> + } else {
> + Ok(FileContentsImpl::new(self.input.clone(), range))
> + }
> + }
> None => io_bail!("not a file"),

nit: would be easier to parse if it were

    let range = ..;
    if let Some(ref payload_input) = .. {
        ..
    } else {
        ..
    }

and it would also mesh better with `open_contents_at_range` above.

> }
> }
> @@ -808,6 +854,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
> entry,
> entry_range_info: self.entry_range_info.clone(),
> caches: Arc::clone(&self.caches),
> + payload_input: self.dir.payload_input.clone(),
> })
> }
>
> diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs
> index a777152..6150a18 100644
> --- a/src/accessor/sync.rs
> +++ b/src/accessor/sync.rs
> @@ -31,7 +31,7 @@ impl Accessor> {
> /// Decode a `pxar` archive from a standard file implementing `FileExt`.
> #[inline]
> pub fn from_file_and_size(input: T, size: u64) -> io::Result {
> - Accessor::new(FileReader::new(input), size)
> + Accessor::new(FileReader::new(input), size, None)
> }
> }
>
> @@ -64,7 +64,7 @@ where
> {
> /// Open an `Arc` or `Rc` of `File`.
> pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result>> {
> - Accessor::new(FileRefReader::new(input), size)
> + Accessor::new(FileRefReader::new(input), size, None)
> }
> }
>
> @@ -74,9 +74,9 @@ impl Accessor {
> ///
> /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
> /// not allowed to use the `Waker`, as this will cause a `panic!`.
> - pub fn new(input: T, size: u64) -> io::Result {
> + pub fn new(input: T, size: u64, payload_input: Option) -> io::Result {
> Ok(Self {
> - inner: poll_result_once(accessor::AccessorImpl::new(input, size))?,
> + inner: poll_result_once(accessor::AccessorImpl::new(input, size, payload_input))?,
> })
> }
>
> diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
> index 4de8c6f..bb032cf 100644
> --- a/src/decoder/aio.rs
> +++ b/src/decoder/aio.rs
> @@ -20,8 +20,12 @@ pub struct Decoder {
> impl Decoder> {
> /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
> #[inline]
> - pub async fn from_tokio(input: T) -> io::Result {
> - Decoder::new(TokioReader::new(input)).await
> + pub async fn from_tokio(input: T, payload_input: Option) -> io::Result {
> + Decoder::new(
> + TokioReader::new(input),
> + payload_input.map(|payload_input| TokioReader::new(payload_input)),
> + )
> + .await
> }
> }
>
> @@ -30,15 +34,15 @@ impl Decoder> {
> /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
> #[inline]
> pub async fn open>(path: P) -> io::Result {
> - Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await
> + Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?, None).await
> }
> }
>
> impl Decoder {
> /// Create an async decoder from an input implementing our internal read interface
> - pub async fn new(input: T) -> io::Result {
> + pub async fn new(input: T, payload_input: Option) -> io::Result {
> Ok(Self {
> - inner: decoder::DecoderImpl::new(input).await?,
> + inner: decoder::DecoderImpl::new(input, payload_input).await?,
> })
> }
>
> diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
> index f439327..8cc4877 100644
> --- a/src/decoder/mod.rs
> +++ b/src/decoder/mod.rs
> @@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl {
> state: State,
> with_goodbye_tables: bool,
>
> + // Payload of regular files might be provided by a different reader
> + payload_input: Option,
> + payload_consumed: u64,
> +
> /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
> /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
> eof_after_entry: bool,
> @@ -167,6 +171,8 @@ enum State {
> Default,
> InPayload {
> offset: u64,
> + size: u64,
> + payload_ref: bool,
> },
>
> /// file entries with no data (fifo, socket)
> @@ -195,8 +201,8 @@ pub(crate) enum ItemResult {
> }
>
> impl DecoderImpl {
> - pub async fn new(input: I) -> io::Result {
> - Self::new_full(input, "/".into(), false).await
> + pub async fn new(input: I, payload_input: Option) -> io::Result {
> + Self::new_full(input, "/".into(), false, payload_input).await
> }
>
> pub(crate) fn input(&self) -> &I {
> @@ -207,6 +213,7 @@ impl DecoderImpl {
> input: I,
> path: PathBuf,
> eof_after_entry: bool,
> + payload_input: Option,
> ) -> io::Result {
> let this = DecoderImpl {
> input,
> @@ -219,6 +226,8 @@ impl DecoderImpl {
> path_lengths: Vec::new(),
> state: State::Begin,
> with_goodbye_tables: false,
> + payload_input,
> + payload_consumed: 0,
> eof_after_entry,
> };
>
> @@ -242,9 +251,18 @@ impl DecoderImpl {
> // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
> self.read_next_item().await?;
> }
> - State::InPayload { offset } => {
> - // We need to skip the current payload first.
> - self.skip_entry(offset).await?;
> + State::InPayload {
> + offset,
> + payload_ref,
> + ..
> + } => {
> + if payload_ref {
> + // Update consumed payload as given by the offset referenced by the content reader
> + self.payload_consumed += offset;
> + } else if self.payload_input.is_none() {
> + // Skip remaining payload of current entry in regular stream
> + self.skip_entry(offset).await?;
> + }

I am a bit confused by this here - shouldn't all payloads be encoded via
refs now if we have a split archive? and vice versa? why the second
condition? and what if I pass a bogus payload input for an archive that
doesn't contain any references?

> self.read_next_item().await?;
> }
> State::InGoodbyeTable => {
> @@ -308,11 +326,19 @@ impl DecoderImpl {
> }
>
> pub fn content_reader(&mut self) -> Option> {
> - if let State::InPayload { offset } = &mut self.state {
> + if let State::InPayload {
> + offset,
> + size,
> + payload_ref,
> + } = &mut self.state
> + {
> + if *payload_ref && self.payload_input.is_none() {
> + return None;
> + }
> Some(Contents::new(
> - &mut self.input,
> + self.payload_input.as_mut().unwrap_or(&mut self.input),
> offset,
> - self.current_header.content_size(),
> + *size,

similar here..
e.g., something like this:

    let input = if *payload_ref {
        if let Some(payload_input) = self.payload_input.as_mut() {
            payload_input
        } else {
            return None;
        }
    } else {
        &mut self.input
    };

    Some(Contents::new(input, offset, *size))

although technically we do have an invariant there that we could check -
we shouldn't encounter a non-ref payload when we have a payload_input..

> ))
> } else {
> None
> @@ -531,8 +557,60 @@ impl DecoderImpl {
> self.entry.kind = EntryKind::File {
> size: self.current_header.content_size(),
> offset,
> + payload_offset: None,
> + };
> + self.state = State::InPayload {
> + offset: 0,
> + size: self.current_header.content_size(),
> + payload_ref: false,
> + };
> + return Ok(ItemResult::Entry);
> + }
> + format::PXAR_PAYLOAD_REF => {
> + let offset = seq_read_position(&mut self.input).await.transpose()?;
> + let payload_ref = self.read_payload_ref().await?;
> +
> + if let Some(payload_input) = self.payload_input.as_mut() {

this condition (continued below)

> + if seq_read_position(payload_input)
> + .await
> + .transpose()?
> + .is_none()
> + {
> + // Skip payload padding for injected chunks in sequential decoder
> + let to_skip = payload_ref.offset - self.payload_consumed;

should we add a check here for the invariant that offsets should only ever
be increasing? (and avoid an underflow for corrupt/invalid archives ;))

> + self.skip_payload(to_skip).await?;
> + }
> + }
> +
> + if let Some(payload_input) = self.payload_input.as_mut() {

and this condition here are the same?

> + let header: u64 = seq_read_entry(payload_input).await?;

why not read a Header here?

> + if header != format::PXAR_PAYLOAD {
> + io_bail!(
> + "unexpected header in payload input: expected {} , got {header}",
> + format::PXAR_PAYLOAD,
> + );
> + }
> + let size: u64 = seq_read_entry(payload_input).await?;
> + self.payload_consumed += size_of::() as u64;
>
> + if size != payload_ref.size + size_of::() as u64 {
> + io_bail!(
> + "encountered payload size mismatch: got {}, expected {size}",
> + payload_ref.size
> + );
> + }

then these could use the size helpers of Header ;) (rough sketch of what I
mean at the very end of this mail)

> + }
> +
> + self.entry.kind = EntryKind::File {
> + size: payload_ref.size,
> + offset,
> + payload_offset: Some(payload_ref.offset),
> + };
> + self.state = State::InPayload {
> + offset: 0,
> + size: payload_ref.size,
> + payload_ref: true,
> };
> - self.state = State::InPayload { offset: 0 };
> return Ok(ItemResult::Entry);
> }
> format::PXAR_FILENAME | format::PXAR_GOODBYE => {
> [..]
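to illustrate the two points above (the monotonicity check and reading a
full Header instead of two bare u64 values), roughly something like the
following - just a sketch, untested, and assuming `Header` can be read via
`seq_read_entry` and exposes `htype`/`content_size()` like the regular
stream side already uses them:

    format::PXAR_PAYLOAD_REF => {
        let offset = seq_read_position(&mut self.input).await.transpose()?;
        let payload_ref = self.read_payload_ref().await?;

        if let Some(payload_input) = self.payload_input.as_mut() {
            if seq_read_position(payload_input).await.transpose()?.is_none() {
                // payload refs must only ever point forward - catches
                // corrupt/invalid archives and avoids the underflow below
                if payload_ref.offset < self.payload_consumed {
                    io_bail!("payload reference offset is not monotonically increasing");
                }

                // skip payload padding for injected chunks in the sequential decoder
                let to_skip = payload_ref.offset - self.payload_consumed;
                self.skip_payload(to_skip).await?;
            }

            // read the whole header instead of two bare u64 values
            let header: Header = seq_read_entry(payload_input).await?;
            self.payload_consumed += size_of::<Header>() as u64;

            if header.htype != format::PXAR_PAYLOAD {
                io_bail!(
                    "unexpected header in payload input: expected {}, got {}",
                    format::PXAR_PAYLOAD,
                    header.htype,
                );
            }

            if header.content_size() != payload_ref.size {
                io_bail!(
                    "encountered payload size mismatch: got {}, expected {}",
                    payload_ref.size,
                    header.content_size(),
                );
            }
        }

        // rest as in the patch (setting entry.kind and the InPayload state)
        ...
    }

(this would also fold the two `if let Some(payload_input)` blocks into one,
which is what I meant above with the duplicated condition.)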