From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC pxar 04/36] decoder: add optional payload input stream
Date: Wed, 28 Feb 2024 15:01:54 +0100 [thread overview]
Message-ID: <20240228140226.1251979-5-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com>
Implement an optional redirection to read the payload for regular files
from a different input stream.
This allows to decode split stream archives.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/accessor/mod.rs | 2 ++
src/decoder/mod.rs | 82 +++++++++++++++++++++++++++++++++++++++++----
src/decoder/sync.rs | 7 ++++
src/lib.rs | 3 ++
4 files changed, 87 insertions(+), 7 deletions(-)
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index 6a2de73..ed99c85 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -342,6 +342,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
EntryKind::File {
offset: Some(offset),
size,
+ ..
} => {
let meta_size = offset - link_offset;
let entry_end = link_offset + meta_size + size;
@@ -711,6 +712,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
EntryKind::File {
size,
offset: Some(offset),
+ ..
} => Ok(Some(offset..(offset + size))),
_ => Ok(None),
}
diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
index d1fb911..5274e2a 100644
--- a/src/decoder/mod.rs
+++ b/src/decoder/mod.rs
@@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<T> {
state: State,
with_goodbye_tables: bool,
+ // Payload of regular files might be provided by a different reader
+ payload_input: Option<T>,
+ payload_consumed: u64,
+
/// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
/// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
eof_after_entry: bool,
@@ -167,6 +171,7 @@ enum State {
Default,
InPayload {
offset: u64,
+ size: u64,
},
/// file entries with no data (fifo, socket)
@@ -199,6 +204,11 @@ impl<I: SeqRead> DecoderImpl<I> {
Self::new_full(input, "/".into(), false).await
}
+ pub fn redirect_payload_input(mut self, payload_input: I) -> Self {
+ self.payload_input = Some(payload_input);
+ self
+ }
+
pub(crate) fn input(&self) -> &I {
&self.input
}
@@ -219,6 +229,8 @@ impl<I: SeqRead> DecoderImpl<I> {
path_lengths: Vec::new(),
state: State::Begin,
with_goodbye_tables: false,
+ payload_input: None,
+ payload_consumed: 0,
eof_after_entry,
};
@@ -242,9 +254,14 @@ impl<I: SeqRead> DecoderImpl<I> {
// hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
self.read_next_item().await?;
}
- State::InPayload { offset } => {
- // We need to skip the current payload first.
- self.skip_entry(offset).await?;
+ State::InPayload { offset, .. } => {
+ if self.payload_input.is_none() {
+ // Skip remaining payload of current entry in regular stream
+ self.skip_entry(offset).await?;
+ } else {
+ // Update consumed payload as given by the offset referenced by the content reader
+ self.payload_consumed += offset;
+ }
self.read_next_item().await?;
}
State::InGoodbyeTable => {
@@ -308,11 +325,11 @@ impl<I: SeqRead> DecoderImpl<I> {
}
pub fn content_reader(&mut self) -> Option<Contents<I>> {
- if let State::InPayload { offset } = &mut self.state {
+ if let State::InPayload { offset, size } = &mut self.state {
Some(Contents::new(
- &mut self.input,
+ self.payload_input.as_mut().unwrap_or(&mut self.input),
offset,
- self.current_header.content_size(),
+ *size,
))
} else {
None
@@ -531,8 +548,44 @@ impl<I: SeqRead> DecoderImpl<I> {
self.entry.kind = EntryKind::File {
size: self.current_header.content_size(),
offset,
+ payload_offset: None,
+ };
+ self.state = State::InPayload {
+ offset: 0,
+ size: self.current_header.content_size(),
};
- self.state = State::InPayload { offset: 0 };
+ return Ok(ItemResult::Entry);
+ }
+ format::PXAR_PAYLOAD_REF => {
+ let offset = seq_read_position(&mut self.input).await.transpose()?;
+ let data_size =
+ usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
+ let bytes = seq_read_exact_data(&mut self.input, data_size).await?;
+
+ let payload_offset = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
+ let size = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
+
+ let payload_input_offset = if let Some(payload_input) = self.payload_input.as_mut()
+ {
+ seq_read_position(payload_input).await.transpose()?
+ } else {
+ None
+ };
+
+ // The if statement is actually only used to distinguish between sync and
+ // async decoder implementations, which might be better handled differently?
+ //self.payload_consumed = off;
+ if self.payload_input.is_some() && payload_input_offset.is_none() {
+ let to_skip = payload_offset - self.payload_consumed;
+ self.skip_payload(to_skip).await?;
+ }
+
+ self.entry.kind = EntryKind::File {
+ size,
+ offset,
+ payload_offset: Some(payload_offset),
+ };
+ self.state = State::InPayload { offset: 0, size };
return Ok(ItemResult::Entry);
}
format::PXAR_FILENAME | format::PXAR_GOODBYE => {
@@ -576,6 +629,21 @@ impl<I: SeqRead> DecoderImpl<I> {
Ok(())
}
+ async fn skip_payload(&mut self, length: u64) -> io::Result<()> {
+ let mut len = length;
+ let scratch = scratch_buffer();
+ while len >= (scratch.len() as u64) {
+ seq_read_exact(self.payload_input.as_mut().unwrap(), scratch).await?;
+ len -= scratch.len() as u64;
+ }
+ let len = len as usize;
+ if len > 0 {
+ seq_read_exact(self.payload_input.as_mut().unwrap(), &mut scratch[..len]).await?;
+ }
+ self.payload_consumed += length;
+ Ok(())
+ }
+
async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
let data = seq_read_exact_data(&mut self.input, size).await?;
diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
index 5597a03..b22b341 100644
--- a/src/decoder/sync.rs
+++ b/src/decoder/sync.rs
@@ -53,6 +53,13 @@ impl<T: SeqRead> Decoder<T> {
})
}
+ /// Take the file payloads from the provided input stream rather than the regular pxar stream
+ pub fn redirect_payload_input(self, payload_input: T) -> Self {
+ Self {
+ inner: self.inner.redirect_payload_input(payload_input),
+ }
+ }
+
/// Internal helper for `Accessor`. In this case we have the low-level state machine, and the
/// layer "above" the `Accessor` propagates the actual type (sync vs async).
pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self {
diff --git a/src/lib.rs b/src/lib.rs
index 210c4b1..ef81a85 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -364,6 +364,9 @@ pub enum EntryKind {
/// The file's byte offset inside the archive, if available.
offset: Option<u64>,
+
+ /// The file's byte offset inside the payload stream, if available.
+ payload_offset: Option<u64>,
},
/// Directory entry. When iterating through an archive, the contents follow next.
--
2.39.2
next prev parent reply other threads:[~2024-02-28 14:02 UTC|newest]
Thread overview: 39+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 01/36] format/examples: Fix typo in PXAR_PAYLOAD description Christian Ebner
2024-02-28 18:09 ` [pbs-devel] applied: " Thomas Lamprecht
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 02/36] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 03/36] encoder: add optional output writer for file payloads Christian Ebner
2024-02-28 14:01 ` Christian Ebner [this message]
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 05/36] accessor: add optional payload input stream Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking Christian Ebner
2024-03-12 10:12 ` Dietmar Maurer
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 07/36] encoder: add payload reference capability Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 08/36] encoder: add payload position capability Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 09/36] encoder: add payload advance capabilty Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC pxar 10/36] encoder/format: finish payload stream with marker Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 11/36] client: pxar: switch to stack based encoder state Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 12/36] client: backup: factor out extension from backup target Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 13/36] client: backup: early check for fixed index type Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 14/36] client: backup: split payload to dedicated stream Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 15/36] client: restore: read payload from dedicated index Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 16/36] tools: cover meta extension for pxar archives Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 17/36] restore: " Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 18/36] client: mount: make split pxar archives mountable Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 19/36] api: datastore: refactor getting local chunk reader Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 20/36] api: datastore: attach optional payload " Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 23/36] www: cover meta extension for pxar archives Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 24/36] index: fetch chunk form index by start/end-offset Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 26/36] client: chunk stream: add chunk injection queues Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 27/36] client: implement prepare reference method Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 29/36] client: pxar: add previous reference to archiver Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 30/36] client: pxar: add method for metadata comparison Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 31/36] specs: add backup detection mode specification Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 32/36] pxar: caching: add look-ahead cache types Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 35/36] test-suite: add detection mode change benchmark Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240228140226.1251979-5-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox