From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id B7561BC253 for ; Thu, 28 Mar 2024 13:38:35 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0B78C9F9E for ; Thu, 28 Mar 2024 13:37:47 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 28 Mar 2024 13:37:39 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id C4DF442904 for ; Thu, 28 Mar 2024 13:37:38 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 28 Mar 2024 13:36:16 +0100 Message-Id: <20240328123707.336951-8-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240328123707.336951-1-c.ebner@proxmox.com> References: <20240328123707.336951-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.030 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v3 pxar 07/58] decoder/accessor: add optional payload input stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup 
Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 28 Mar 2024 12:38:35 -0000 Implement an optional redirection to read the payload for regular files from a different input stream. This allows to decode split stream archives. Signed-off-by: Christian Ebner --- changes since version 2: - pass the payload input on decoder/accessor instantiation in order to avoid possible adding/removing during decoding/accessing. - major refactoring examples/apxar.rs | 2 +- src/accessor/aio.rs | 10 ++-- src/accessor/mod.rs | 61 ++++++++++++++++++++++--- src/accessor/sync.rs | 8 ++-- src/decoder/aio.rs | 14 ++++-- src/decoder/mod.rs | 106 +++++++++++++++++++++++++++++++++++++++---- src/decoder/sync.rs | 15 ++++-- src/lib.rs | 3 ++ 8 files changed, 184 insertions(+), 35 deletions(-) diff --git a/examples/apxar.rs b/examples/apxar.rs index 0c62242..d5eb04e 100644 --- a/examples/apxar.rs +++ b/examples/apxar.rs @@ -9,7 +9,7 @@ async fn main() { .await .expect("failed to open file"); - let mut reader = Decoder::from_tokio(file) + let mut reader = Decoder::from_tokio(file, None) .await .expect("failed to open pxar archive contents"); diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs index 98d7755..0ebb921 100644 --- a/src/accessor/aio.rs +++ b/src/accessor/aio.rs @@ -39,7 +39,7 @@ impl Accessor> { /// by a blocking file. #[inline] pub async fn from_file_and_size(input: T, size: u64) -> io::Result { - Accessor::new(FileReader::new(input), size).await + Accessor::new(FileReader::new(input), size, None).await } } @@ -75,7 +75,7 @@ where input: T, size: u64, ) -> io::Result>> { - Accessor::new(FileRefReader::new(input), size).await + Accessor::new(FileRefReader::new(input), size, None).await } } @@ -85,9 +85,11 @@ impl Accessor { /// /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is /// not allowed to use the `Waker`, as this will cause a `panic!`. 
- pub async fn new(input: T, size: u64) -> io::Result { + /// Optionally take the file payloads from the provided input stream rather than the regular + /// pxar stream. + pub async fn new(input: T, size: u64, payload_input: Option) -> io::Result { Ok(Self { - inner: accessor::AccessorImpl::new(input, size).await?, + inner: accessor::AccessorImpl::new(input, size, payload_input).await?, }) } diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs index 6a2de73..4789595 100644 --- a/src/accessor/mod.rs +++ b/src/accessor/mod.rs @@ -182,10 +182,11 @@ pub(crate) struct AccessorImpl { input: T, size: u64, caches: Arc, + payload_input: Option, } impl AccessorImpl { - pub async fn new(input: T, size: u64) -> io::Result { + pub async fn new(input: T, size: u64, payload_input: Option) -> io::Result { if size < (size_of::() as u64) { io_bail!("too small to contain a pxar archive"); } @@ -194,6 +195,7 @@ impl AccessorImpl { input, size, caches: Arc::new(Caches::default()), + payload_input, }) } @@ -207,6 +209,9 @@ impl AccessorImpl { self.size, "/".into(), Arc::clone(&self.caches), + self.payload_input + .as_ref() + .map(|input| input as &dyn ReadAt), ) .await } @@ -228,7 +233,13 @@ async fn get_decoder( entry_range: Range, path: PathBuf, ) -> io::Result>> { - DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await + DecoderImpl::new_full( + SeqReadAtAdapter::new(input, entry_range.clone()), + path, + true, + None, + ) + .await } // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing! 
@@ -263,6 +274,7 @@ impl AccessorImpl { self.size, "/".into(), Arc::clone(&self.caches), + self.payload_input.clone(), ) .await } @@ -274,6 +286,7 @@ impl AccessorImpl { offset, "/".into(), Arc::clone(&self.caches), + self.payload_input.clone(), ) .await } @@ -293,17 +306,23 @@ impl AccessorImpl { .next() .await .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??; + Ok(FileEntryImpl { input: self.input.clone(), entry, entry_range_info: entry_range_info.clone(), caches: Arc::clone(&self.caches), + payload_input: self.payload_input.clone(), }) } /// Allow opening arbitrary contents from a specific range. pub unsafe fn open_contents_at_range(&self, range: Range) -> FileContentsImpl { - FileContentsImpl::new(self.input.clone(), range) + if let Some(payload_input) = &self.payload_input { + FileContentsImpl::new(payload_input.clone(), range) + } else { + FileContentsImpl::new(self.input.clone(), range) + } } /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will @@ -326,9 +345,12 @@ impl AccessorImpl { let link_offset = entry_file_offset - link_offset; - let (mut decoder, entry_offset) = - get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new()) - .await?; + let (mut decoder, entry_offset) = get_decoder_at_filename( + self.input.clone(), + link_offset..self.size, + PathBuf::new(), + ) + .await?; let entry = decoder .next() @@ -342,6 +364,7 @@ impl AccessorImpl { EntryKind::File { offset: Some(offset), size, + .. 
} => { let meta_size = offset - link_offset; let entry_end = link_offset + meta_size + size; @@ -353,6 +376,7 @@ impl AccessorImpl { entry_range: entry_offset..entry_end, }, caches: Arc::clone(&self.caches), + payload_input: self.payload_input.clone(), }) } _ => io_bail!("hardlink does not point to a regular file"), @@ -369,6 +393,7 @@ pub(crate) struct DirectoryImpl { table: Arc<[GoodbyeItem]>, path: PathBuf, caches: Arc, + payload_input: Option, } impl DirectoryImpl { @@ -378,6 +403,7 @@ impl DirectoryImpl { end_offset: u64, path: PathBuf, caches: Arc, + payload_input: Option, ) -> io::Result> { let tail = Self::read_tail_entry(&input, end_offset).await?; @@ -407,6 +433,7 @@ impl DirectoryImpl { table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone), path, caches, + payload_input, }; // sanity check: @@ -533,6 +560,7 @@ impl DirectoryImpl { entry_range: self.entry_range(), }, caches: Arc::clone(&self.caches), + payload_input: self.payload_input.clone(), }) } @@ -685,6 +713,7 @@ pub(crate) struct FileEntryImpl { entry: Entry, entry_range_info: EntryRangeInfo, caches: Arc, + payload_input: Option, } impl FileEntryImpl { @@ -698,6 +727,7 @@ impl FileEntryImpl { self.entry_range_info.entry_range.end, self.entry.path.clone(), Arc::clone(&self.caches), + self.payload_input.clone(), ) .await } @@ -711,14 +741,30 @@ impl FileEntryImpl { EntryKind::File { size, offset: Some(offset), + payload_offset: None, } => Ok(Some(offset..(offset + size))), + // Payload offset beats regular offset if some + EntryKind::File { + size, + offset: Some(_offset), + payload_offset: Some(payload_offset), + } => { + let start_offset = payload_offset + size_of::() as u64; + Ok(Some(start_offset..start_offset + size)) + } _ => Ok(None), } } pub async fn contents(&self) -> io::Result> { match self.content_range()? 
{ - Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)), + Some(range) => { + if let Some(ref payload_input) = self.payload_input { + Ok(FileContentsImpl::new(payload_input.clone(), range)) + } else { + Ok(FileContentsImpl::new(self.input.clone(), range)) + } + } None => io_bail!("not a file"), } } @@ -808,6 +854,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> { entry, entry_range_info: self.entry_range_info.clone(), caches: Arc::clone(&self.caches), + payload_input: self.dir.payload_input.clone(), }) } diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs index a777152..6150a18 100644 --- a/src/accessor/sync.rs +++ b/src/accessor/sync.rs @@ -31,7 +31,7 @@ impl Accessor> { /// Decode a `pxar` archive from a standard file implementing `FileExt`. #[inline] pub fn from_file_and_size(input: T, size: u64) -> io::Result { - Accessor::new(FileReader::new(input), size) + Accessor::new(FileReader::new(input), size, None) } } @@ -64,7 +64,7 @@ where { /// Open an `Arc` or `Rc` of `File`. pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result>> { - Accessor::new(FileRefReader::new(input), size) + Accessor::new(FileRefReader::new(input), size, None) } } @@ -74,9 +74,9 @@ impl Accessor { /// /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is /// not allowed to use the `Waker`, as this will cause a `panic!`. - pub fn new(input: T, size: u64) -> io::Result { + pub fn new(input: T, size: u64, payload_input: Option) -> io::Result { Ok(Self { - inner: poll_result_once(accessor::AccessorImpl::new(input, size))?, + inner: poll_result_once(accessor::AccessorImpl::new(input, size, payload_input))?, }) } diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs index 4de8c6f..bb032cf 100644 --- a/src/decoder/aio.rs +++ b/src/decoder/aio.rs @@ -20,8 +20,12 @@ pub struct Decoder { impl Decoder> { /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input. 
#[inline] - pub async fn from_tokio(input: T) -> io::Result { - Decoder::new(TokioReader::new(input)).await + pub async fn from_tokio(input: T, payload_input: Option) -> io::Result { + Decoder::new( + TokioReader::new(input), + payload_input.map(|payload_input| TokioReader::new(payload_input)), + ) + .await } } @@ -30,15 +34,15 @@ impl Decoder> { /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input. #[inline] pub async fn open>(path: P) -> io::Result { - Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await + Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?, None).await } } impl Decoder { /// Create an async decoder from an input implementing our internal read interface. - pub async fn new(input: T) -> io::Result { + pub async fn new(input: T, payload_input: Option) -> io::Result { Ok(Self { - inner: decoder::DecoderImpl::new(input).await?, + inner: decoder::DecoderImpl::new(input, payload_input).await?, }) } diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs index f439327..8cc4877 100644 --- a/src/decoder/mod.rs +++ b/src/decoder/mod.rs @@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl { state: State, with_goodbye_tables: bool, + // Payload of regular files might be provided by a different reader + payload_input: Option, + payload_consumed: u64, + /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF. 
eof_after_entry: bool, @@ -167,6 +171,8 @@ enum State { Default, InPayload { offset: u64, + size: u64, + payload_ref: bool, }, /// file entries with no data (fifo, socket) @@ -195,8 +201,8 @@ pub(crate) enum ItemResult { } impl DecoderImpl { - pub async fn new(input: I) -> io::Result { - Self::new_full(input, "/".into(), false).await + pub async fn new(input: I, payload_input: Option) -> io::Result { + Self::new_full(input, "/".into(), false, payload_input).await } pub(crate) fn input(&self) -> &I { @@ -207,6 +213,7 @@ impl DecoderImpl { input: I, path: PathBuf, eof_after_entry: bool, + payload_input: Option, ) -> io::Result { let this = DecoderImpl { input, @@ -219,6 +226,8 @@ impl DecoderImpl { path_lengths: Vec::new(), state: State::Begin, with_goodbye_tables: false, + payload_input, + payload_consumed: 0, eof_after_entry, }; @@ -242,9 +251,18 @@ impl DecoderImpl { // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE: self.read_next_item().await?; } - State::InPayload { offset } => { - // We need to skip the current payload first. - self.skip_entry(offset).await?; + State::InPayload { + offset, + payload_ref, + .. 
+ } => { + if payload_ref { + // Update consumed payload as given by the offset referenced by the content reader + self.payload_consumed += offset; + } else if self.payload_input.is_none() { + // Skip remaining payload of current entry in regular stream + self.skip_entry(offset).await?; + } self.read_next_item().await?; } State::InGoodbyeTable => { @@ -308,11 +326,19 @@ impl DecoderImpl { } pub fn content_reader(&mut self) -> Option> { - if let State::InPayload { offset } = &mut self.state { + if let State::InPayload { + offset, + size, + payload_ref, + } = &mut self.state + { + if *payload_ref && self.payload_input.is_none() { + return None; + } Some(Contents::new( - &mut self.input, + self.payload_input.as_mut().unwrap_or(&mut self.input), offset, - self.current_header.content_size(), + *size, )) } else { None @@ -531,8 +557,60 @@ impl DecoderImpl { self.entry.kind = EntryKind::File { size: self.current_header.content_size(), offset, + payload_offset: None, + }; + self.state = State::InPayload { + offset: 0, + size: self.current_header.content_size(), + payload_ref: false, + }; + return Ok(ItemResult::Entry); + } + format::PXAR_PAYLOAD_REF => { + let offset = seq_read_position(&mut self.input).await.transpose()?; + let payload_ref = self.read_payload_ref().await?; + + if let Some(payload_input) = self.payload_input.as_mut() { + if seq_read_position(payload_input) + .await + .transpose()? + .is_none() + { + // Skip payload padding for injected chunks in sequential decoder + let to_skip = payload_ref.offset - self.payload_consumed; + self.skip_payload(to_skip).await?; + } + } + + if let Some(payload_input) = self.payload_input.as_mut() { + let header: u64 = seq_read_entry(payload_input).await?; + if header != format::PXAR_PAYLOAD { + io_bail!( + "unexpected header in payload input: expected {} , got {header}", + format::PXAR_PAYLOAD, + ); + } + let size: u64 = seq_read_entry(payload_input).await?; + self.payload_consumed += size_of::
<Header>() as u64; + + if size != payload_ref.size + size_of::<Header>
() as u64 { + io_bail!( + "encountered payload size mismatch: got {}, expected {size}", + payload_ref.size + ); + } + } + + self.entry.kind = EntryKind::File { + size: payload_ref.size, + offset, + payload_offset: Some(payload_ref.offset), + }; + self.state = State::InPayload { + offset: 0, + size: payload_ref.size, + payload_ref: true, }; - self.state = State::InPayload { offset: 0 }; return Ok(ItemResult::Entry); } format::PXAR_FILENAME | format::PXAR_GOODBYE => { @@ -567,6 +645,16 @@ impl DecoderImpl { Self::skip(&mut self.input, len).await } + async fn skip_payload(&mut self, length: u64) -> io::Result<()> { + if let Some(payload_input) = self.payload_input.as_mut() { + Self::skip(payload_input, length as usize).await?; + self.payload_consumed += length; + } else { + io_bail!("skip payload called, but got no payload input"); + } + Ok(()) + } + async fn skip(input: &mut I, len: usize) -> io::Result<()> { let mut len = len; let scratch = scratch_buffer(); diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs index 5597a03..caa8bcd 100644 --- a/src/decoder/sync.rs +++ b/src/decoder/sync.rs @@ -25,8 +25,11 @@ pub struct Decoder { impl Decoder> { /// Decode a `pxar` archive from a regular `std::io::Read` input. #[inline] - pub fn from_std(input: T) -> io::Result { - Decoder::new(StandardReader::new(input)) + pub fn from_std(input: T, payload_input: Option) -> io::Result { + Decoder::new( + StandardReader::new(input), + payload_input.map(|payload_input| StandardReader::new(payload_input)), + ) } /// Get a direct reference to the reader contained inside the contained [`StandardReader`]. @@ -38,7 +41,7 @@ impl Decoder> { impl Decoder> { /// Convenience shortcut for `File::open` followed by `Accessor::from_file`. pub fn open>(path: P) -> io::Result { - Self::from_std(std::fs::File::open(path.as_ref())?) 
+ Self::from_std(std::fs::File::open(path.as_ref())?, None) } } @@ -47,9 +50,11 @@ impl Decoder { /// /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is /// not allowed to use the `Waker`, as this will cause a `panic!`. - pub fn new(input: T) -> io::Result { + /// The optional payload input must be used to restore regular file payloads for payload references + /// encountered within the archive. + pub fn new(input: T, payload_input: Option) -> io::Result { Ok(Self { - inner: poll_result_once(decoder::DecoderImpl::new(input))?, + inner: poll_result_once(decoder::DecoderImpl::new(input, payload_input))?, }) } diff --git a/src/lib.rs b/src/lib.rs index 210c4b1..ef81a85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -364,6 +364,9 @@ pub enum EntryKind { /// The file's byte offset inside the archive, if available. offset: Option, + + /// The file's byte offset inside the payload stream, if available. + payload_offset: Option, }, /// Directory entry. When iterating through an archive, the contents follow next. -- 2.39.2