From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue, 14 May 2024 12:33:22 +0200
Subject: [pbs-devel] [PATCH v6 pxar 06/14] decoder/accessor: add optional payload input stream
Message-Id: <20240514103421.289431-7-c.ebner@proxmox.com>
In-Reply-To: <20240514103421.289431-1-c.ebner@proxmox.com>
References: <20240514103421.289431-1-c.ebner@proxmox.com>

Implement an optional redirection to read the payload for regular files
from a different input stream. This allows decoding split stream
archives.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 examples/apxar.rs    |  2 +-
 src/accessor/aio.rs  | 10 +++--
 src/accessor/mod.rs  | 78 ++++++++++++++++++++++++++++++------
 src/accessor/sync.rs |  8 ++--
 src/decoder/aio.rs   | 14 ++++---
 src/decoder/mod.rs   | 94 ++++++++++++++++++++++++++++++++++++++------
 src/decoder/sync.rs  | 15 ++++---
 src/lib.rs           |  3 ++
 tests/compat.rs      |  3 +-
 tests/simple/main.rs |  5 ++-
 10 files changed, 188 insertions(+), 44 deletions(-)

diff --git a/examples/apxar.rs b/examples/apxar.rs
index 0c62242..d5eb04e 100644
--- a/examples/apxar.rs
+++ b/examples/apxar.rs
@@ -9,7 +9,7 @@ async fn main() {
         .await
         .expect("failed to open file");
 
-    let mut reader = Decoder::from_tokio(file)
+    let mut reader = Decoder::from_tokio(file, None)
         .await
         .expect("failed to open pxar archive contents");
 
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index 98d7755..06167b4 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -39,7 +39,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
     /// by a blocking file.
     #[inline]
     pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
-        Accessor::new(FileReader::new(input), size).await
+        Accessor::new(FileReader::new(input), size, None).await
     }
 }
 
@@ -75,7 +75,7 @@ where
         input: T,
         size: u64,
     ) -> io::Result<Accessor<FileRefReader<T>>> {
-        Accessor::new(FileRefReader::new(input), size).await
+        Accessor::new(FileRefReader::new(input), size, None).await
     }
 }
 
@@ -85,9 +85,11 @@ impl<T: ReadAt> Accessor<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
    /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub async fn new(input: T, size: u64) -> io::Result<Self> {
+    /// Optionally take the file payloads from the provided input stream rather than the regular
+    /// pxar stream.
+    pub async fn new(input: T, size: u64, payload_input: Option<(T, u64)>) -> io::Result<Self> {
         Ok(Self {
-            inner: accessor::AccessorImpl::new(input, size).await?,
+            inner: accessor::AccessorImpl::new(input, size, payload_input).await?,
         })
     }
 
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index 6a2de73..46afbe3 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -182,10 +182,11 @@ pub(crate) struct AccessorImpl<T> {
     input: T,
     size: u64,
     caches: Arc<Caches>,
+    payload_input: Option<(T, Range<u64>)>,
 }
 
 impl<T: ReadAt> AccessorImpl<T> {
-    pub async fn new(input: T, size: u64) -> io::Result<Self> {
+    pub async fn new(input: T, size: u64, payload_input: Option<(T, u64)>) -> io::Result<Self> {
         if size < (size_of::<GoodbyeItem>() as u64) {
             io_bail!("too small to contain a pxar archive");
         }
@@ -194,6 +195,7 @@ impl<T: ReadAt> AccessorImpl<T> {
             input,
             size,
             caches: Arc::new(Caches::default()),
+            payload_input: payload_input.map(|(input, size)| (input, 0..size)),
         })
     }
 
@@ -207,6 +209,9 @@ impl<T: ReadAt> AccessorImpl<T> {
             self.size,
             "/".into(),
             Arc::clone(&self.caches),
+            self.payload_input
+                .as_ref()
+                .map(|(input, range)| (input as &dyn ReadAt, range.clone())),
         )
         .await
     }
@@ -227,8 +232,15 @@ async fn get_decoder<T: ReadAt>(
     input: T,
     entry_range: Range<u64>,
     path: PathBuf,
+    payload_input: Option<(T, Range<u64>)>,
 ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
-    DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
+    DecoderImpl::new_full(
+        SeqReadAtAdapter::new(input, entry_range.clone()),
+        path,
+        true,
+        payload_input.map(|(input, range)| SeqReadAtAdapter::new(input, range)),
+    )
+    .await
 }
 
 // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
@@ -236,6 +248,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
     input: T,
     entry_range: Range<u64>,
     path: PathBuf,
+    payload_input: Option<(T, Range<u64>)>,
 ) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
     // Read the header, it should be a FILENAME, then skip over it and its length:
     let header: format::Header = read_entry_at(&input, entry_range.start).await?;
@@ -251,7 +264,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
     }
 
     Ok((
-        get_decoder(input, entry_offset..entry_range.end, path).await?,
+        get_decoder(input, entry_offset..entry_range.end, path, payload_input).await?,
         entry_offset,
     ))
 }
 
@@ -263,6 +276,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             self.size,
             "/".into(),
             Arc::clone(&self.caches),
+            self.payload_input.clone(),
         )
         .await
     }
@@ -274,6 +288,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             offset,
             "/".into(),
             Arc::clone(&self.caches),
+            self.payload_input.clone(),
         )
         .await
     }
@@ -287,23 +302,30 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             self.input.clone(),
             entry_range_info.entry_range.clone(),
             PathBuf::new(),
+            self.payload_input.clone(),
         )
         .await?;
         let entry = decoder
             .next()
             .await
             .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
+
         Ok(FileEntryImpl {
             input: self.input.clone(),
             entry,
             entry_range_info: entry_range_info.clone(),
             caches: Arc::clone(&self.caches),
+            payload_input: self.payload_input.clone(),
         })
     }
 
     /// Allow opening arbitrary contents from a specific range.
     pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
-        FileContentsImpl::new(self.input.clone(), range)
+        if let Some((payload_input, _)) = &self.payload_input {
+            FileContentsImpl::new(payload_input.clone(), range)
+        } else {
+            FileContentsImpl::new(self.input.clone(), range)
+        }
     }
 
     /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@@ -326,9 +348,13 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
 
         let link_offset = entry_file_offset - link_offset;
 
-        let (mut decoder, entry_offset) =
-            get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
-                .await?;
+        let (mut decoder, entry_offset) = get_decoder_at_filename(
+            self.input.clone(),
+            link_offset..self.size,
+            PathBuf::new(),
+            self.payload_input.clone(),
+        )
+        .await?;
 
         let entry = decoder
             .next()
@@ -342,6 +368,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             EntryKind::File {
                 offset: Some(offset),
                 size,
+                ..
             } => {
                 let meta_size = offset - link_offset;
                 let entry_end = link_offset + meta_size + size;
@@ -353,6 +380,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
                         entry_range: entry_offset..entry_end,
                     },
                     caches: Arc::clone(&self.caches),
+                    payload_input: self.payload_input.clone(),
                 })
             }
             _ => io_bail!("hardlink does not point to a regular file"),
@@ -369,6 +397,7 @@ pub(crate) struct DirectoryImpl<T> {
     table: Arc<[GoodbyeItem]>,
     path: PathBuf,
     caches: Arc<Caches>,
+    payload_input: Option<(T, Range<u64>)>,
 }
 
 impl<T: Clone + ReadAt> DirectoryImpl<T> {
@@ -378,6 +407,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
         end_offset: u64,
         path: PathBuf,
         caches: Arc<Caches>,
+        payload_input: Option<(T, Range<u64>)>,
     ) -> io::Result<DirectoryImpl<T>> {
         let tail = Self::read_tail_entry(&input, end_offset).await?;
 
@@ -407,6 +437,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
             table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
             path,
             caches,
+            payload_input,
         };
 
         // sanity check:
@@ -502,6 +533,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
                 None => self.path.clone(),
                 Some(file) => self.path.join(file),
             },
+            self.payload_input.clone(),
         )
         .await
     }
@@ -533,6 +565,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
                 entry_range: self.entry_range(),
             },
             caches: Arc::clone(&self.caches),
+            payload_input: self.payload_input.clone(),
         })
     }
 
@@ -575,6 +608,10 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
             cur = next;
         }
 
+        if let Some(cur) = cur.as_mut() {
+            cur.payload_input = self.payload_input.clone();
+        }
+
         Ok(cur)
     }
 
@@ -599,7 +636,9 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
             let cursor = self.get_cursor(index).await?;
             if cursor.file_name == path {
-                return Ok(Some(cursor.decode_entry().await?));
+                let mut entry = cursor.decode_entry().await?;
+                entry.payload_input = self.payload_input.clone();
+                return Ok(Some(entry));
             }
 
             dup += 1;
@@ -685,6 +724,7 @@ pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
     entry: Entry,
     entry_range_info: EntryRangeInfo,
     caches: Arc<Caches>,
+    payload_input: Option<(T, Range<u64>)>,
 }
 
 impl<T: Clone + ReadAt> FileEntryImpl<T> {
@@ -698,6 +738,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             self.entry_range_info.entry_range.end,
             self.entry.path.clone(),
             Arc::clone(&self.caches),
+            self.payload_input.clone(),
         )
         .await
     }
@@ -711,15 +752,29 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             EntryKind::File {
                 size,
                 offset: Some(offset),
+                payload_offset: None,
             } => Ok(Some(offset..(offset + size))),
+            // Payload offset beats regular offset if some
+            EntryKind::File {
+                size,
+                offset: Some(_offset),
+                payload_offset: Some(payload_offset),
+            } => {
+                let start_offset = payload_offset + size_of::<format::Header>() as u64;
+                Ok(Some(start_offset..start_offset + size))
+            }
             _ => Ok(None),
         }
     }
 
     pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
-        match self.content_range()? {
-            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
-            None => io_bail!("not a file"),
+        let range = self
+            .content_range()?
+            .ok_or_else(|| io_format_err!("not a file"))?;
+        if let Some((ref payload_input, _)) = self.payload_input {
+            Ok(FileContentsImpl::new(payload_input.clone(), range))
+        } else {
+            Ok(FileContentsImpl::new(self.input.clone(), range))
         }
     }
 
@@ -808,6 +863,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
             entry,
             entry_range_info: self.entry_range_info.clone(),
             caches: Arc::clone(&self.caches),
+            payload_input: self.dir.payload_input.clone(),
         })
     }
 
diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs
index a777152..cd8dff0 100644
--- a/src/accessor/sync.rs
+++ b/src/accessor/sync.rs
@@ -31,7 +31,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
     /// Decode a `pxar` archive from a standard file implementing `FileExt`.
     #[inline]
     pub fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
-        Accessor::new(FileReader::new(input), size)
+        Accessor::new(FileReader::new(input), size, None)
     }
 }
 
@@ -64,7 +64,7 @@ where
 {
     /// Open an `Arc` or `Rc` of `File`.
     pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result<Accessor<FileRefReader<T>>> {
-        Accessor::new(FileRefReader::new(input), size)
+        Accessor::new(FileRefReader::new(input), size, None)
     }
 }
 
@@ -74,9 +74,9 @@ impl<T: ReadAt> Accessor<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(input: T, size: u64) -> io::Result<Self> {
+    pub fn new(input: T, size: u64, payload_input: Option<(T, u64)>) -> io::Result<Self> {
         Ok(Self {
-            inner: poll_result_once(accessor::AccessorImpl::new(input, size))?,
+            inner: poll_result_once(accessor::AccessorImpl::new(input, size, payload_input))?,
         })
     }
 
diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
index 4de8c6f..bb032cf 100644
--- a/src/decoder/aio.rs
+++ b/src/decoder/aio.rs
@@ -20,8 +20,12 @@ pub struct Decoder<T> {
 impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
     /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
     #[inline]
-    pub async fn from_tokio(input: T) -> io::Result<Self> {
-        Decoder::new(TokioReader::new(input)).await
+    pub async fn from_tokio(input: T, payload_input: Option<T>) -> io::Result<Self> {
+        Decoder::new(
+            TokioReader::new(input),
+            payload_input.map(|payload_input| TokioReader::new(payload_input)),
+        )
+        .await
     }
 }
 
@@ -30,15 +34,15 @@ impl Decoder<TokioReader<tokio::fs::File>> {
     /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
     #[inline]
     pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await
+        Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?, None).await
     }
 }
 
 impl<T: SeqRead> Decoder<T> {
     /// Create an async decoder from an input implementing our internal read interface.
-    pub async fn new(input: T) -> io::Result<Self> {
+    pub async fn new(input: T, payload_input: Option<T>) -> io::Result<Self> {
         Ok(Self {
-            inner: decoder::DecoderImpl::new(input).await?,
+            inner: decoder::DecoderImpl::new(input, payload_input).await?,
         })
     }
 
diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
index d19ffd1..07b6c61 100644
--- a/src/decoder/mod.rs
+++ b/src/decoder/mod.rs
@@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<I> {
     state: State,
     with_goodbye_tables: bool,
 
+    // Payload of regular files might be provided by a different reader
+    payload_input: Option<I>,
+    payload_consumed: u64,
+
     /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
     /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
     eof_after_entry: bool,
@@ -167,6 +171,7 @@ enum State {
     Default,
     InPayload {
         offset: u64,
+        size: u64,
     },
 
     /// file entries with no data (fifo, socket)
@@ -195,8 +200,8 @@ pub(crate) enum ItemResult {
 }
 
 impl<I: SeqRead> DecoderImpl<I> {
-    pub async fn new(input: I) -> io::Result<Self> {
-        Self::new_full(input, "/".into(), false).await
+    pub async fn new(input: I, payload_input: Option<I>) -> io::Result<Self> {
+        Self::new_full(input, "/".into(), false, payload_input).await
     }
 
     pub(crate) fn input(&self) -> &I {
@@ -207,6 +212,7 @@ impl<I: SeqRead> DecoderImpl<I> {
         input: I,
         path: PathBuf,
         eof_after_entry: bool,
+        payload_input: Option<I>,
     ) -> io::Result<Self> {
         let this = DecoderImpl {
             input,
@@ -219,6 +225,8 @@ impl<I: SeqRead> DecoderImpl<I> {
             path_lengths: Vec::new(),
             state: State::Begin,
             with_goodbye_tables: false,
+            payload_input,
+            payload_consumed: 0,
             eof_after_entry,
         };
 
@@ -242,9 +250,14 @@ impl<I: SeqRead> DecoderImpl<I> {
                     // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
                     self.read_next_item().await?;
                 }
-                State::InPayload { offset } => {
-                    // We need to skip the current payload first.
-                    self.skip_entry(offset).await?;
+                State::InPayload { offset, .. } => {
+                    if self.payload_input.is_some() {
+                        // Update consumed payload as given by the offset referenced by the content reader
+                        self.payload_consumed += offset;
+                    } else {
+                        // Skip remaining payload of current entry in regular stream
+                        self.skip_entry(offset).await?;
+                    }
                     self.read_next_item().await?;
                 }
                 State::InGoodbyeTable => {
@@ -300,19 +313,23 @@ impl<I: SeqRead> DecoderImpl<I> {
     }
 
     pub fn content_size(&self) -> Option<u64> {
-        if let State::InPayload { .. } = self.state {
-            Some(self.current_header.content_size())
+        if let State::InPayload { size, .. } = self.state {
+            if self.payload_input.is_some() {
+                Some(size)
+            } else {
+                Some(self.current_header.content_size())
+            }
         } else {
             None
         }
     }
 
     pub fn content_reader(&mut self) -> Option<Contents<I>> {
-        if let State::InPayload { offset } = &mut self.state {
+        if let State::InPayload { offset, size } = &mut self.state {
             Some(Contents::new(
-                &mut self.input,
+                self.payload_input.as_mut().unwrap_or(&mut self.input),
                 offset,
-                self.current_header.content_size(),
+                *size,
             ))
         } else {
             None
@@ -531,8 +548,63 @@ impl<I: SeqRead> DecoderImpl<I> {
                 self.entry.kind = EntryKind::File {
                     size: self.current_header.content_size(),
                     offset,
+                    payload_offset: None,
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: self.current_header.content_size(),
+                };
+                return Ok(ItemResult::Entry);
+            }
+            format::PXAR_PAYLOAD_REF => {
+                let offset = seq_read_position(&mut self.input).await.transpose()?;
+                let payload_ref = self.read_payload_ref().await?;
+
+                if let Some(payload_input) = self.payload_input.as_mut() {
+                    if seq_read_position(payload_input)
+                        .await
+                        .transpose()?
+                        .is_none()
+                    {
+                        if self.payload_consumed > payload_ref.offset {
+                            io_bail!(
+                                "unexpected offset {}, smaller than already consumed payload {}",
+                                payload_ref.offset,
+                                self.payload_consumed,
+                            );
+                        }
+                        let to_skip = payload_ref.offset - self.payload_consumed;
+                        Self::skip(payload_input, to_skip as usize).await?;
+                        self.payload_consumed += to_skip;
+                    }
+                    let header: Header = seq_read_entry(payload_input).await?;
+                    if header.htype != format::PXAR_PAYLOAD {
+                        io_bail!(
+                            "unexpected header in payload input: expected {}, got {header}",
+                            format::PXAR_PAYLOAD,
+                        );
+                    }
+                    self.payload_consumed += size_of::<Header>() as u64;
+
+                    if header.content_size() != payload_ref.size {
+                        io_bail!(
+                            "encountered payload size mismatch: got {}, expected {}",
+                            payload_ref.size,
+                            header.content_size(),
+                        );
+                    }
+                }
+
+                self.entry.kind = EntryKind::File {
+                    size: payload_ref.size,
+                    offset,
+                    payload_offset: Some(payload_ref.offset),
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: payload_ref.size,
                 };
-                self.state = State::InPayload { offset: 0 };
                 return Ok(ItemResult::Entry);
             }
             format::PXAR_FILENAME | format::PXAR_GOODBYE => {
diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
index 5597a03..caa8bcd 100644
--- a/src/decoder/sync.rs
+++ b/src/decoder/sync.rs
@@ -25,8 +25,11 @@ pub struct Decoder<T> {
 impl<T: io::Read> Decoder<StandardReader<T>> {
     /// Decode a `pxar` archive from a regular `std::io::Read` input.
     #[inline]
-    pub fn from_std(input: T) -> io::Result<Self> {
-        Decoder::new(StandardReader::new(input))
+    pub fn from_std(input: T, payload_input: Option<T>) -> io::Result<Self> {
+        Decoder::new(
+            StandardReader::new(input),
+            payload_input.map(|payload_input| StandardReader::new(payload_input)),
+        )
     }
 
     /// Get a direct reference to the reader contained inside the contained [`StandardReader`].
@@ -38,7 +41,7 @@ impl Decoder<StandardReader<std::fs::File>> {
     /// Convenience shortcut for `File::open` followed by `Accessor::from_file`.
     pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        Self::from_std(std::fs::File::open(path.as_ref())?)
+        Self::from_std(std::fs::File::open(path.as_ref())?, None)
     }
 }
 
@@ -47,9 +50,11 @@ impl<T: SeqRead> Decoder<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(input: T) -> io::Result<Self> {
+    /// The optional payload input must be used to restore regular file payloads for payload references
+    /// encountered within the archive.
+    pub fn new(input: T, payload_input: Option<T>) -> io::Result<Self> {
         Ok(Self {
-            inner: poll_result_once(decoder::DecoderImpl::new(input))?,
+            inner: poll_result_once(decoder::DecoderImpl::new(input, payload_input))?,
         })
     }
 
diff --git a/src/lib.rs b/src/lib.rs
index 210c4b1..ef81a85 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -364,6 +364,9 @@ pub enum EntryKind {
 
         /// The file's byte offset inside the archive, if available.
         offset: Option<u64>,
+
+        /// The file's byte offset inside the payload stream, if available.
+        payload_offset: Option<u64>,
     },
 
     /// Directory entry. When iterating through an archive, the contents follow next.
diff --git a/tests/compat.rs b/tests/compat.rs
index 3b43e38..a1514ba 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -94,7 +94,8 @@ fn create_archive() -> io::Result<Vec<u8>> {
 fn test_archive() {
     let archive = create_archive().expect("failed to create test archive");
     let mut input = &archive[..];
-    let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder");
+    let mut decoder =
+        decoder::Decoder::from_std(&mut input, None).expect("failed to create decoder");
 
     let item = decoder
         .next()
diff --git a/tests/simple/main.rs b/tests/simple/main.rs
index e55457f..6ee93d1 100644
--- a/tests/simple/main.rs
+++ b/tests/simple/main.rs
@@ -61,13 +61,14 @@ fn test1() {
     // std::fs::write("myarchive.pxar", &file).expect("failed to write out test archive");
 
     let mut input = &file[..];
-    let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder");
+    let mut decoder =
+        decoder::Decoder::from_std(&mut input, None).expect("failed to create decoder");
     let decoded_fs =
         fs::Entry::decode_from(&mut decoder).expect("failed to decode previously encoded archive");
 
     assert_eq!(test_fs, decoded_fs);
 
-    let accessor = accessor::Accessor::new(&file[..], file.len() as u64)
+    let accessor = accessor::Accessor::new(&file[..], file.len() as u64, None)
         .expect("failed to create random access reader for encoded archive");
 
     check_bunzip2(&accessor);
-- 
2.39.2
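
Usage sketch (not part of the patch): how a caller could feed the new optional
payload input to the sequential sync decoder. The file names and the wrapper
function below are invented for illustration; only the
`Decoder::from_std(input, payload_input)` signature comes from this series.

    use std::fs::File;
    use std::io;

    use pxar::decoder::sync::Decoder;

    // Hypothetical split archive: metadata stream plus separate payload stream.
    fn list_split_archive() -> io::Result<()> {
        let metadata = File::open("archive.mpxar")?;
        let payload = File::open("archive.ppxar")?;

        // `Some(payload)` lets the decoder resolve payload references
        // (PXAR_PAYLOAD_REF) against the payload stream; `None` keeps the
        // previous behavior of reading file payloads from the main stream.
        let decoder = Decoder::from_std(metadata, Some(payload))?;

        for entry in decoder {
            println!("{:?}", entry?.path());
        }
        Ok(())
    }

Passing `None` everywhere, as the touched call sites in the tests do, keeps the
existing single-stream behavior unchanged.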
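
A similar sketch for random access, again illustrative only and assuming the
sync accessor's existing `open_root`/`lookup`/`contents` API: the accessor takes
the payload reader together with its byte size, mirroring the updated
`accessor::Accessor::new(&file[..], file.len() as u64, None)` call in
`tests/simple/main.rs`; the looked-up path is made up.

    use std::io::{self, Read};

    use pxar::accessor::sync::Accessor;

    // `archive` holds the metadata stream, `payload` the split-out file contents.
    fn read_one_file(archive: &[u8], payload: &[u8]) -> io::Result<Vec<u8>> {
        let accessor = Accessor::new(
            archive,
            archive.len() as u64,
            Some((payload, payload.len() as u64)),
        )?;

        let root = accessor.open_root()?;
        let entry = root
            .lookup("some/file.txt")?
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;

        // With a payload input configured, file contents are served from
        // `payload` at the offset recorded by the entry's payload reference.
        let mut contents = entry.contents()?;
        let mut buf = Vec::new();
        contents.read_to_end(&mut buf)?;
        Ok(buf)
    }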