From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 7606495D74 for ; Wed, 28 Feb 2024 15:02:51 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B5989CEF0 for ; Wed, 28 Feb 2024 15:02:50 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 28 Feb 2024 15:02:47 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id E937547A0F for ; Wed, 28 Feb 2024 15:02:46 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 28 Feb 2024 15:01:54 +0100 Message-Id: <20240228140226.1251979-5-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com> References: <20240228140226.1251979-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.049 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [RFC pxar 04/36] decoder: add optional payload input stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: 
Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 28 Feb 2024 14:02:51 -0000 Implement an optional redirection to read the payload for regular files from a different input stream. This allows decoding split stream archives. Signed-off-by: Christian Ebner --- src/accessor/mod.rs | 2 ++ src/decoder/mod.rs | 82 +++++++++++++++++++++++++++++++++++++++++---- src/decoder/sync.rs | 7 ++++ src/lib.rs | 3 ++ 4 files changed, 87 insertions(+), 7 deletions(-) diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs index 6a2de73..ed99c85 100644 --- a/src/accessor/mod.rs +++ b/src/accessor/mod.rs @@ -342,6 +342,7 @@ impl AccessorImpl { EntryKind::File { offset: Some(offset), size, + .. } => { let meta_size = offset - link_offset; let entry_end = link_offset + meta_size + size; @@ -711,6 +712,7 @@ impl FileEntryImpl { EntryKind::File { size, offset: Some(offset), + .. } => Ok(Some(offset..(offset + size))), _ => Ok(None), } diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs index d1fb911..5274e2a 100644 --- a/src/decoder/mod.rs +++ b/src/decoder/mod.rs @@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl { state: State, with_goodbye_tables: bool, + // Payload of regular files might be provided by a different reader + payload_input: Option, + payload_consumed: u64, + /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF. 
eof_after_entry: bool, @@ -167,6 +171,7 @@ enum State { Default, InPayload { offset: u64, + size: u64, }, /// file entries with no data (fifo, socket) @@ -199,6 +204,11 @@ impl DecoderImpl { Self::new_full(input, "/".into(), false).await } + pub fn redirect_payload_input(mut self, payload_input: I) -> Self { + self.payload_input = Some(payload_input); + self + } + pub(crate) fn input(&self) -> &I { &self.input } @@ -219,6 +229,8 @@ impl DecoderImpl { path_lengths: Vec::new(), state: State::Begin, with_goodbye_tables: false, + payload_input: None, + payload_consumed: 0, eof_after_entry, }; @@ -242,9 +254,14 @@ impl DecoderImpl { // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE: self.read_next_item().await?; } - State::InPayload { offset } => { - // We need to skip the current payload first. - self.skip_entry(offset).await?; + State::InPayload { offset, .. } => { + if self.payload_input.is_none() { + // Skip remaining payload of current entry in regular stream + self.skip_entry(offset).await?; + } else { + // Update consumed payload as given by the offset referenced by the content reader + self.payload_consumed += offset; + } self.read_next_item().await?; } State::InGoodbyeTable => { @@ -308,11 +325,11 @@ impl DecoderImpl { } pub fn content_reader(&mut self) -> Option> { - if let State::InPayload { offset } = &mut self.state { + if let State::InPayload { offset, size } = &mut self.state { Some(Contents::new( - &mut self.input, + self.payload_input.as_mut().unwrap_or(&mut self.input), offset, - self.current_header.content_size(), + *size, )) } else { None @@ -531,8 +548,44 @@ impl DecoderImpl { self.entry.kind = EntryKind::File { size: self.current_header.content_size(), offset, + payload_offset: None, + }; + self.state = State::InPayload { + offset: 0, + size: self.current_header.content_size(), }; - self.state = State::InPayload { offset: 0 }; + return Ok(ItemResult::Entry); + } + format::PXAR_PAYLOAD_REF => { + let offset = seq_read_position(&mut 
self.input).await.transpose()?; + let data_size = + usize::try_from(self.current_header.content_size()).map_err(io_err_other)?; + let bytes = seq_read_exact_data(&mut self.input, data_size).await?; + + let payload_offset = u64::from_le_bytes(bytes[0..8].try_into().unwrap()); + let size = u64::from_le_bytes(bytes[8..16].try_into().unwrap()); + + let payload_input_offset = if let Some(payload_input) = self.payload_input.as_mut() + { + seq_read_position(payload_input).await.transpose()? + } else { + None + }; + + // The if statement is actually only used to distinguish between sync and + // async decoder implementations, which might be better handled differently? + //self.payload_consumed = off; + if self.payload_input.is_some() && payload_input_offset.is_none() { + let to_skip = payload_offset - self.payload_consumed; + self.skip_payload(to_skip).await?; + } + + self.entry.kind = EntryKind::File { + size, + offset, + payload_offset: Some(payload_offset), + }; + self.state = State::InPayload { offset: 0, size }; return Ok(ItemResult::Entry); } format::PXAR_FILENAME | format::PXAR_GOODBYE => { @@ -576,6 +629,21 @@ impl DecoderImpl { Ok(()) } + async fn skip_payload(&mut self, length: u64) -> io::Result<()> { + let mut len = length; + let scratch = scratch_buffer(); + while len >= (scratch.len() as u64) { + seq_read_exact(self.payload_input.as_mut().unwrap(), scratch).await?; + len -= scratch.len() as u64; + } + let len = len as usize; + if len > 0 { + seq_read_exact(self.payload_input.as_mut().unwrap(), &mut scratch[..len]).await?; + } + self.payload_consumed += length; + Ok(()) + } + async fn read_entry_as_bytes(&mut self) -> io::Result> { let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?; let data = seq_read_exact_data(&mut self.input, size).await?; diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs index 5597a03..b22b341 100644 --- a/src/decoder/sync.rs +++ b/src/decoder/sync.rs @@ -53,6 +53,13 @@ impl Decoder { }) } + /// 
Take the file payloads from the provided input stream rather than the regular pxar stream + pub fn redirect_payload_input(self, payload_input: T) -> Self { + Self { + inner: self.inner.redirect_payload_input(payload_input), + } + } + /// Internal helper for `Accessor`. In this case we have the low-level state machine, and the /// layer "above" the `Accessor` propagates the actual type (sync vs async). pub(crate) fn from_impl(inner: decoder::DecoderImpl) -> Self { diff --git a/src/lib.rs b/src/lib.rs index 210c4b1..ef81a85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -364,6 +364,9 @@ pub enum EntryKind { /// The file's byte offset inside the archive, if available. offset: Option, + + /// The file's byte offset inside the payload stream, if available. + payload_offset: Option, }, /// Directory entry. When iterating through an archive, the contents follow next. -- 2.39.2