public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: "Fabian Grünbichler" <f.gruenbichler@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>
Subject: Re: [pbs-devel] [RFC v2 pxar 04/36] decoder: add optional payload input stream
Date: Mon, 11 Mar 2024 14:21:34 +0100	[thread overview]
Message-ID: <1710153663.gtljd8y4z5.astroid@yuna.none> (raw)
In-Reply-To: <20240305092703.126906-5-c.ebner@proxmox.com>

On March 5, 2024 10:26 am, Christian Ebner wrote:
> Implement an optional redirection to read the payload for regular files
> from a different input stream.
> 
> This allows to decode split stream archives.
> 
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
> changes since version 1:
> - refactor to use new PayloadRef type and decoder method
> 
>  src/accessor/mod.rs |  2 ++
>  src/decoder/mod.rs  | 78 +++++++++++++++++++++++++++++++++++++++++----
>  src/decoder/sync.rs |  7 ++++
>  src/lib.rs          |  3 ++
>  4 files changed, 83 insertions(+), 7 deletions(-)
> 
> diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
> index 6a2de73..ed99c85 100644
> --- a/src/accessor/mod.rs
> +++ b/src/accessor/mod.rs
> @@ -342,6 +342,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
>              EntryKind::File {
>                  offset: Some(offset),
>                  size,
> +                ..
>              } => {
>                  let meta_size = offset - link_offset;
>                  let entry_end = link_offset + meta_size + size;
> @@ -711,6 +712,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
>              EntryKind::File {
>                  size,
>                  offset: Some(offset),
> +                ..
>              } => Ok(Some(offset..(offset + size))),
>              _ => Ok(None),
>          }
> diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
> index 70c44ce..7b8254d 100644
> --- a/src/decoder/mod.rs
> +++ b/src/decoder/mod.rs
> @@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<T> {
>      state: State,
>      with_goodbye_tables: bool,
>  
> +    // Payload of regular files might be provided by a different reader
> +    payload_input: Option<T>,
> +    payload_consumed: u64,
> +
>      /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
>      /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
>      eof_after_entry: bool,
> @@ -167,6 +171,7 @@ enum State {
>      Default,
>      InPayload {
>          offset: u64,
> +        size: u64,
>      },
>  
>      /// file entries with no data (fifo, socket)
> @@ -199,6 +204,11 @@ impl<I: SeqRead> DecoderImpl<I> {
>          Self::new_full(input, "/".into(), false).await
>      }
>  
> +    pub fn redirect_payload_input(mut self, payload_input: I) -> Self {
> +        self.payload_input = Some(payload_input);

same question as for the encoder - do we want to prevent misuse here and
check/ensure that no payload_input has already been set before?

> +        self
> +    }
> +
>      pub(crate) fn input(&self) -> &I {
>          &self.input
>      }
> @@ -219,6 +229,8 @@ impl<I: SeqRead> DecoderImpl<I> {
>              path_lengths: Vec::new(),
>              state: State::Begin,
>              with_goodbye_tables: false,
> +            payload_input: None,
> +            payload_consumed: 0,
>              eof_after_entry,
>          };
>  
> @@ -242,9 +254,14 @@ impl<I: SeqRead> DecoderImpl<I> {
>                      // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
>                      self.read_next_item().await?;
>                  }
> -                State::InPayload { offset } => {
> -                    // We need to skip the current payload first.
> -                    self.skip_entry(offset).await?;
> +                State::InPayload { offset, .. } => {
> +                    if self.payload_input.is_none() {
> +                        // Skip remaining payload of current entry in regular stream
> +                        self.skip_entry(offset).await?;
> +                    } else {
> +                        // Update consumed payload as given by the offset referenced by the content reader
> +                        self.payload_consumed += offset;
> +                    }
>                      self.read_next_item().await?;
>                  }
>                  State::InGoodbyeTable => {
> @@ -308,11 +325,11 @@ impl<I: SeqRead> DecoderImpl<I> {
>      }
>  
>      pub fn content_reader(&mut self) -> Option<Contents<I>> {
> -        if let State::InPayload { offset } = &mut self.state {
> +        if let State::InPayload { offset, size } = &mut self.state {
>              Some(Contents::new(
> -                &mut self.input,
> +                self.payload_input.as_mut().unwrap_or(&mut self.input),
>                  offset,
> -                self.current_header.content_size(),
> +                *size,
>              ))
>          } else {
>              None
> @@ -531,8 +548,40 @@ impl<I: SeqRead> DecoderImpl<I> {
>                  self.entry.kind = EntryKind::File {
>                      size: self.current_header.content_size(),
>                      offset,
> +                    payload_offset: None,
> +                };
> +                self.state = State::InPayload {
> +                    offset: 0,
> +                    size: self.current_header.content_size(),
> +                };
> +                return Ok(ItemResult::Entry);
> +            }
> +            format::PXAR_PAYLOAD_REF => {
> +                let offset = seq_read_position(&mut self.input).await.transpose()?;
> +                let payload_ref = self.read_payload_ref().await?;
> +
> +                let payload_input_offset = if let Some(payload_input) = self.payload_input.as_mut()
> +                {
> +                    seq_read_position(payload_input).await.transpose()?
> +                } else {
> +                    None
> +                };
> +
> +                // Skip payload padding for injected chunks in sync decoder
> +                if self.payload_input.is_some() && payload_input_offset.is_none() {
> +                    let to_skip = payload_ref.offset - self.payload_consumed;
> +                    self.skip_payload(to_skip).await?;
> +                }

style: these two could be combined into

if let Some(payload_input) = self.payload_input.as_mut() {
    if seq_read_position(payload_input).await.transpose()?.is_none() {
       // Skip payload padding for injected chunks in sync decoder
       let to_skip = payload_ref.offset - self.payload_consumed;
       self.skip_payload(to_skip).await?;
    }
}

> +                self.entry.kind = EntryKind::File {
> +                    size: payload_ref.size,
> +                    offset,
> +                    payload_offset: Some(payload_ref.offset),
> +                };
> +                self.state = State::InPayload {
> +                    offset: 0,
> +                    size: payload_ref.size,
>                  };
> -                self.state = State::InPayload { offset: 0 };
>                  return Ok(ItemResult::Entry);
>              }
>              format::PXAR_FILENAME | format::PXAR_GOODBYE => {
> @@ -576,6 +625,21 @@ impl<I: SeqRead> DecoderImpl<I> {
>          Ok(())
>      }
>  
> +    async fn skip_payload(&mut self, length: u64) -> io::Result<()> {
> +        let mut len = length;
> +        let scratch = scratch_buffer();
> +        while len >= (scratch.len() as u64) {
> +            seq_read_exact(self.payload_input.as_mut().unwrap(), scratch).await?;
> +            len -= scratch.len() as u64;
> +        }
> +        let len = len as usize;
> +        if len > 0 {
> +            seq_read_exact(self.payload_input.as_mut().unwrap(), &mut scratch[..len]).await?;
> +        }
> +        self.payload_consumed += length;
> +        Ok(())
> +    }

nit: this could also share the "skip" part with `skip_entry`, and take the
input and length as parameter?

nit: casting to usize at the start would make the code easier to parse
IMHO

>      async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
>          let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
>          let data = seq_read_exact_data(&mut self.input, size).await?;
> diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
> index 5597a03..b22b341 100644
> --- a/src/decoder/sync.rs
> +++ b/src/decoder/sync.rs
> @@ -53,6 +53,13 @@ impl<T: SeqRead> Decoder<T> {
>          })
>      }
>  
> +    /// Take the file payloads from the provided input stream rather than the regular pxar stream
> +    pub fn redirect_payload_input(self, payload_input: T) -> Self {
> +        Self {
> +            inner: self.inner.redirect_payload_input(payload_input),

same question here about being called twice

> +        }
> +    }
> +
>      /// Internal helper for `Accessor`. In this case we have the low-level state machine, and the
>      /// layer "above" the `Accessor` propagates the actual type (sync vs async).
>      pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self {
> diff --git a/src/lib.rs b/src/lib.rs
> index 210c4b1..ef81a85 100644
> --- a/src/lib.rs
> +++ b/src/lib.rs
> @@ -364,6 +364,9 @@ pub enum EntryKind {
>  
>          /// The file's byte offset inside the archive, if available.
>          offset: Option<u64>,
> +
> +        /// The file's byte offset inside the payload stream, if available.
> +        payload_offset: Option<u64>,
>      },
>  
>      /// Directory entry. When iterating through an archive, the contents follow next.
> -- 
> 2.39.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




  reply	other threads:[~2024-03-11 13:22 UTC|newest]

Thread overview: 94+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-03-05  9:26 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 01/36] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 02/36] encoder: add optional output writer for file payloads Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-11 13:50     ` Christian Ebner
2024-03-11 15:41       ` Fabian Grünbichler
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 03/36] format/decoder: add method to read payload references Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 04/36] decoder: add optional payload input stream Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler [this message]
2024-03-11 14:05     ` Christian Ebner
2024-03-11 15:27       ` Fabian Grünbichler
2024-03-11 15:51         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 05/36] accessor: " Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 06/36] encoder: move to stack based state tracking Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-11 14:12     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 07/36] encoder: add payload reference capability Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-11 14:15     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 08/36] encoder: add payload position capability Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 09/36] encoder: add payload advance capability Christian Ebner
2024-03-11 13:22   ` Fabian Grünbichler
2024-03-11 14:22     ` Christian Ebner
2024-03-11 15:27       ` Fabian Grünbichler
2024-03-11 15:41         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 10/36] encoder/format: finish payload stream with marker Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 11/36] client: pxar: switch to stack based encoder state Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 12/36] client: backup: factor out extension from backup target Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 13/36] client: backup: early check for fixed index type Christian Ebner
2024-03-11 14:57   ` Fabian Grünbichler
2024-03-11 15:12     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 14/36] client: backup: split payload to dedicated stream Christian Ebner
2024-03-11 14:57   ` Fabian Grünbichler
2024-03-11 15:22     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 15/36] client: restore: read payload from dedicated index Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:26     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 16/36] tools: cover meta extension for pxar archives Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 17/36] restore: " Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 18/36] client: mount: make split pxar archives mountable Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:29     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 19/36] api: datastore: refactor getting local chunk reader Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 20/36] api: datastore: attach optional payload " Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:31     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:24     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 23/36] www: cover meta extension for pxar archives Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:31     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 24/36] index: fetch chunk form index by start/end-offset Christian Ebner
2024-03-12  8:50   ` Fabian Grünbichler
2024-03-14  8:23     ` Christian Ebner
2024-03-12 12:47   ` Dietmar Maurer
2024-03-12 12:51     ` Christian Ebner
2024-03-12 13:03       ` Dietmar Maurer
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 25/36] upload stream: impl reused chunk injector Christian Ebner
2024-03-13  9:43   ` Dietmar Maurer
2024-03-14 14:03     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 26/36] client: chunk stream: add chunk injection queues Christian Ebner
2024-03-12  9:46   ` Fabian Grünbichler
2024-03-19 10:52     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 27/36] client: implement prepare reference method Christian Ebner
2024-03-12 10:07   ` Fabian Grünbichler
2024-03-19 11:51     ` Christian Ebner
2024-03-19 12:49       ` Fabian Grünbichler
2024-03-20  8:37         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 29/36] client: pxar: add previous reference to archiver Christian Ebner
2024-03-12 12:12   ` Fabian Grünbichler
2024-03-12 12:25     ` Christian Ebner
2024-03-19 12:59     ` Christian Ebner
2024-03-19 13:04       ` Fabian Grünbichler
2024-03-20  8:52         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 30/36] client: pxar: add method for metadata comparison Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 31/36] specs: add backup detection mode specification Christian Ebner
2024-03-12 12:17   ` Fabian Grünbichler
2024-03-12 12:31     ` Christian Ebner
2024-03-20  9:28       ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 32/36] pxar: caching: add look-ahead cache types Christian Ebner
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 33/36] client: pxar: add look-ahead caching Christian Ebner
2024-03-12 14:08   ` Fabian Grünbichler
2024-03-20 10:28     ` Christian Ebner
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-03-13 11:12   ` Fabian Grünbichler
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 35/36] test-suite: add detection mode change benchmark Christian Ebner
2024-03-13 11:48   ` Fabian Grünbichler
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions Christian Ebner
2024-03-13 11:18   ` Fabian Grünbichler
2024-03-13 11:44 ` [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Fabian Grünbichler

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1710153663.gtljd8y4z5.astroid@yuna.none \
    --to=f.gruenbichler@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal