public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v7 pxar 07/69] decoder/accessor: allow for split input stream variant
Date: Mon, 27 May 2024 16:32:21 +0200
Message-ID: <20240527143323.456002-8-c.ebner@proxmox.com>
In-Reply-To: <20240527143323.456002-1-c.ebner@proxmox.com>

When a pxar archive was encoded using the split stream output
variant, access to the payload of regular files has to be redirected
to the corresponding dedicated input.

Allow passing the split input variant to the decoder and accessor
instances, so that they can handle the split streams accordingly and
decode split stream archives.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 6:
- use PxarVariant instead of optional payload inputs
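The input routing introduced here can be illustrated with a small standalone model. This is only a sketch: the `PxarVariant` below is a hypothetical, simplified stand-in for the type added in patch 02/69, modelling just `archive()`, `payload()` and `wrap_multi()` with semantics inferred from how this patch calls them. Byte slices stand in for `ReadAt` inputs. The sketch mirrors how `AccessorImpl::new` turns the `(payload_input, size)` pair into `(payload_input, 0..size)`, and how file contents are taken from the payload input whenever one is present.

```rust
use std::ops::Range;

/// Hypothetical, simplified stand-in for `pxar::PxarVariant`; the real type
/// (patch 02/69) may differ. Only the methods used by this patch are modelled.
#[derive(Debug, Clone, PartialEq)]
enum PxarVariant<T, U> {
    Unified(T),
    Split(T, U),
}

impl<T, U> PxarVariant<T, U> {
    /// The archive (metadata) input is present in both variants.
    fn archive(&self) -> &T {
        match self {
            PxarVariant::Unified(a) | PxarVariant::Split(a, _) => a,
        }
    }

    /// The payload input only exists for the split variant.
    fn payload(&self) -> Option<&U> {
        match self {
            PxarVariant::Unified(_) => None,
            PxarVariant::Split(_, p) => Some(p),
        }
    }

    /// Map the archive and payload inputs with separate closures.
    fn wrap_multi<A, B>(self, f: impl FnOnce(T) -> A, g: impl FnOnce(U) -> B) -> PxarVariant<A, B> {
        match self {
            PxarVariant::Unified(a) => PxarVariant::Unified(f(a)),
            PxarVariant::Split(a, p) => PxarVariant::Split(f(a), g(p)),
        }
    }
}

/// Mirrors the `(payload_input, size)` -> `(payload_input, 0..size)`
/// conversion performed in `AccessorImpl::new`.
fn with_payload_range<T>(input: PxarVariant<T, (T, u64)>) -> PxarVariant<T, (T, Range<u64>)> {
    input.wrap_multi(|archive| archive, |(payload, size)| (payload, 0..size))
}

/// File contents come from the payload input if present, otherwise from the
/// archive input, as in `FileEntryImpl::contents`.
fn content_source<'a>(input: &PxarVariant<&'a [u8], (&'a [u8], Range<u64>)>) -> &'a [u8] {
    match input.payload() {
        Some((payload, _range)) => *payload,
        None => *input.archive(),
    }
}

fn main() {
    let archive: &[u8] = b"metadata";
    let payload: &[u8] = b"file data";

    let unified = with_payload_range(PxarVariant::Unified(archive));
    assert_eq!(content_source(&unified), archive);

    let split = with_payload_range(PxarVariant::Split(archive, (payload, payload.len() as u64)));
    assert_eq!(split.payload().map(|(_, r)| r.clone()), Some(0..9));
    assert_eq!(content_source(&split), payload);
    println!("ok");
}
```

With this shape, a `Unified` input carries no payload bookkeeping at all; only the `Split` arm of `wrap_multi` ever touches the payload reader.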

 examples/apxar.rs    |   2 +-
 src/accessor/aio.rs  |  10 ++--
 src/accessor/mod.rs  |  83 ++++++++++++++++++---------
 src/accessor/sync.rs |   8 +--
 src/decoder/aio.rs   |  13 +++--
 src/decoder/mod.rs   | 133 ++++++++++++++++++++++++++++++++++---------
 src/decoder/sync.rs  |  21 +++++--
 src/lib.rs           |   3 +
 tests/compat.rs      |   3 +-
 tests/simple/main.rs |   8 ++-
 10 files changed, 206 insertions(+), 78 deletions(-)
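For sequential decoding, the `PXAR_PAYLOAD_REF` handling in `DecoderImpl` skips the payload input forward from the already consumed position to the referenced offset, then validates the `PXAR_PAYLOAD` header found there. The sketch below models that bookkeeping over an in-memory stream. Assumptions: `PXAR_PAYLOAD` is a placeholder value (not the real constant); a header is two little-endian `u64` fields (`htype`, `full_size` including the header), assumed to match `format::Header`; `PayloadRef`, `seek_payload`, `content_range`, `invalid` and `payload_entry` are hypothetical names. `content_range` mirrors the `payload_offset + size_of::<format::Header>()` computation in `FileEntryImpl::content_range`.

```rust
use std::io::{self, Cursor, Read};
use std::ops::Range;

/// Placeholder for `format::PXAR_PAYLOAD`; NOT the real constant value.
const PXAR_PAYLOAD: u64 = 0x1111_2222_3333_4444;
/// Assumed size of `format::Header` (`htype: u64` + `full_size: u64`).
const HEADER_SIZE: u64 = 16;

/// Hypothetical mirror of `format::PayloadRef` as read from the archive.
struct PayloadRef {
    offset: u64,
    size: u64,
}

fn invalid(msg: String) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

/// Advance a sequential payload input from `consumed` to the referenced entry,
/// validate its header and return the content size. Afterwards `consumed`
/// points at the first content byte.
fn seek_payload<R: Read>(payload: &mut R, consumed: &mut u64, pref: &PayloadRef) -> io::Result<u64> {
    if *consumed > pref.offset {
        return Err(invalid(format!(
            "unexpected offset {}, smaller than already consumed payload {}",
            pref.offset, *consumed
        )));
    }
    // Forward-only skip: discard the bytes up to the referenced offset.
    let to_skip = pref.offset - *consumed;
    let skipped = io::copy(&mut payload.by_ref().take(to_skip), &mut io::sink())?;
    if skipped != to_skip {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "payload input too short"));
    }
    *consumed += to_skip;

    // Read the little-endian header: htype, then full_size.
    let mut word = [0u8; 8];
    payload.read_exact(&mut word)?;
    let htype = u64::from_le_bytes(word);
    payload.read_exact(&mut word)?;
    let full_size = u64::from_le_bytes(word);
    *consumed += HEADER_SIZE;

    if htype != PXAR_PAYLOAD {
        return Err(invalid(format!("expected PXAR_PAYLOAD header, got {:#x}", htype)));
    }
    let content_size = full_size
        .checked_sub(HEADER_SIZE)
        .ok_or_else(|| invalid("header size too small".to_string()))?;
    if content_size != pref.size {
        return Err(invalid(format!(
            "payload size mismatch: reference {}, header {}",
            pref.size, content_size
        )));
    }
    Ok(content_size)
}

/// Random-access counterpart: the contents start right after the header.
fn content_range(payload_offset: u64, size: u64) -> Range<u64> {
    let start = payload_offset + HEADER_SIZE;
    start..start + size
}

/// Encode one payload entry (header followed by data) for the demo stream.
fn payload_entry(data: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(&PXAR_PAYLOAD.to_le_bytes());
    out.extend_from_slice(&(HEADER_SIZE + data.len() as u64).to_le_bytes());
    out.extend_from_slice(data);
    out
}

fn main() -> io::Result<()> {
    let mut stream = payload_entry(b"abc");
    let second = stream.len() as u64; // offset of the second entry
    stream.extend(payload_entry(b"de"));

    // Skip the first file entirely and decode the second one.
    let mut reader = Cursor::new(stream);
    let mut consumed = 0;
    let size = seek_payload(&mut reader, &mut consumed, &PayloadRef { offset: second, size: 2 })?;
    assert_eq!(consumed, content_range(second, size).start);

    let mut data = vec![0u8; size as usize];
    reader.read_exact(&mut data)?;
    consumed += size;
    assert_eq!(data, b"de");

    // Going backwards on a sequential input is rejected, as in the patch.
    assert!(seek_payload(&mut reader, &mut consumed, &PayloadRef { offset: 0, size: 3 }).is_err());
    Ok(())
}
```

Note that the patch only performs this forward skip when `seq_read_position()` reports no position for the payload input; the sketch always skips, i.e. it models the non-positioned case.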

diff --git a/examples/apxar.rs b/examples/apxar.rs
index 0c62242..0dab51d 100644
--- a/examples/apxar.rs
+++ b/examples/apxar.rs
@@ -9,7 +9,7 @@ async fn main() {
         .await
         .expect("failed to open file");
 
-    let mut reader = Decoder::from_tokio(file)
+    let mut reader = Decoder::from_tokio(pxar::PxarVariant::Unified(file))
         .await
         .expect("failed to open pxar archive contents");
 
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index 98d7755..73b1025 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -18,7 +18,7 @@ use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
 use crate::decoder::aio::Decoder;
 use crate::format::GoodbyeItem;
 use crate::util;
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 use super::sync::{FileReader, FileRefReader};
 
@@ -39,7 +39,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
     /// by a blocking file.
     #[inline]
     pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
-        Accessor::new(FileReader::new(input), size).await
+        Accessor::new(PxarVariant::Unified(FileReader::new(input)), size).await
     }
 }
 
@@ -75,7 +75,7 @@ where
         input: T,
         size: u64,
     ) -> io::Result<Accessor<FileRefReader<T>>> {
-        Accessor::new(FileRefReader::new(input), size).await
+        Accessor::new(PxarVariant::Unified(FileRefReader::new(input)), size).await
     }
 }
 
@@ -85,7 +85,9 @@ impl<T: ReadAt> Accessor<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub async fn new(input: T, size: u64) -> io::Result<Self> {
+    /// For `PxarVariant::Split`, the file payloads are read from the dedicated payload input
+    /// (of the given size) rather than from the regular pxar stream.
+    pub async fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
         Ok(Self {
             inner: accessor::AccessorImpl::new(input, size).await?,
         })
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index 6a2de73..c061b74 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -19,7 +19,7 @@ use crate::binary_tree_array;
 use crate::decoder::{self, DecoderImpl};
 use crate::format::{self, GoodbyeItem};
 use crate::util;
-use crate::{Entry, EntryKind};
+use crate::{Entry, EntryKind, PxarVariant};
 
 pub mod aio;
 pub mod cache;
@@ -179,17 +179,22 @@ struct Caches {
 
 /// The random access state machine implementation.
 pub(crate) struct AccessorImpl<T> {
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     size: u64,
     caches: Arc<Caches>,
 }
 
 impl<T: ReadAt> AccessorImpl<T> {
-    pub async fn new(input: T, size: u64) -> io::Result<Self> {
+    pub async fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
         if size < (size_of::<GoodbyeItem>() as u64) {
             io_bail!("too small to contain a pxar archive");
         }
 
+        let input = input.wrap_multi(
+            |input| input,
+            |(payload_input, size)| (payload_input, 0..size),
+        );
+
         Ok(Self {
             input,
             size,
@@ -202,13 +207,14 @@ impl<T: ReadAt> AccessorImpl<T> {
     }
 
     pub async fn open_root_ref(&self) -> io::Result<DirectoryImpl<&dyn ReadAt>> {
-        DirectoryImpl::open_at_end(
-            &self.input as &dyn ReadAt,
-            self.size,
-            "/".into(),
-            Arc::clone(&self.caches),
-        )
-        .await
+        let input = match &self.input {
+            PxarVariant::Unified(input) => PxarVariant::Unified(input as &dyn ReadAt),
+            PxarVariant::Split(input, (payload_input, range)) => PxarVariant::Split(
+                input as &dyn ReadAt,
+                (payload_input as &dyn ReadAt, range.clone()),
+            ),
+        };
+        DirectoryImpl::open_at_end(input, self.size, "/".into(), Arc::clone(&self.caches)).await
     }
 
     pub fn set_goodbye_table_cache(
@@ -224,21 +230,25 @@ impl<T: ReadAt> AccessorImpl<T> {
 }
 
 async fn get_decoder<T: ReadAt>(
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry_range: Range<u64>,
     path: PathBuf,
 ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
-    DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
+    let input = input.wrap_multi(
+        |input| SeqReadAtAdapter::new(input, entry_range.clone()),
+        |(payload_input, range)| SeqReadAtAdapter::new(payload_input, range),
+    );
+    DecoderImpl::new_full(input, path, true).await
 }
 
 // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
 async fn get_decoder_at_filename<T: ReadAt>(
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry_range: Range<u64>,
     path: PathBuf,
 ) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
     // Read the header, it should be a FILENAME, then skip over it and its length:
-    let header: format::Header = read_entry_at(&input, entry_range.start).await?;
+    let header: format::Header = read_entry_at(input.archive(), entry_range.start).await?;
     header.check_header_size()?;
 
     if header.htype != format::PXAR_FILENAME {
@@ -293,6 +303,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             .next()
             .await
             .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
+
         Ok(FileEntryImpl {
             input: self.input.clone(),
             entry,
@@ -303,7 +314,11 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
 
     /// Allow opening arbitrary contents from a specific range.
     pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
-        FileContentsImpl::new(self.input.clone(), range)
+        if let Some((payload_input, _)) = &self.input.payload() {
+            FileContentsImpl::new(payload_input.clone(), range)
+        } else {
+            FileContentsImpl::new(self.input.archive().clone(), range)
+        }
     }
 
     /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@@ -342,6 +357,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             EntryKind::File {
                 offset: Some(offset),
                 size,
+                ..
             } => {
                 let meta_size = offset - link_offset;
                 let entry_end = link_offset + meta_size + size;
@@ -362,7 +378,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
 
 /// The directory random-access state machine implementation.
 pub(crate) struct DirectoryImpl<T> {
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry_ofs: u64,
     goodbye_ofs: u64,
     size: u64,
@@ -374,12 +390,12 @@ pub(crate) struct DirectoryImpl<T> {
 impl<T: Clone + ReadAt> DirectoryImpl<T> {
     /// Open a directory ending at the specified position.
     async fn open_at_end(
-        input: T,
+        input: PxarVariant<T, (T, Range<u64>)>,
         end_offset: u64,
         path: PathBuf,
         caches: Arc<Caches>,
     ) -> io::Result<DirectoryImpl<T>> {
-        let tail = Self::read_tail_entry(&input, end_offset).await?;
+        let tail = Self::read_tail_entry(input.archive(), end_offset).await?;
 
         if end_offset < tail.size {
             io_bail!("goodbye tail size out of range");
@@ -434,7 +450,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
                 data.as_mut_ptr() as *mut u8,
                 len * size_of::<GoodbyeItem>(),
             );
-            read_exact_at(&self.input, slice, self.table_offset()).await?;
+            read_exact_at(self.input.archive(), slice, self.table_offset()).await?;
         }
         Ok(Arc::from(data))
     }
@@ -599,7 +615,8 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
 
             let cursor = self.get_cursor(index).await?;
             if cursor.file_name == path {
-                return Ok(Some(cursor.decode_entry().await?));
+                let entry = cursor.decode_entry().await?;
+                return Ok(Some(entry));
             }
 
             dup += 1;
@@ -645,13 +662,13 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
     }
 
     async fn read_filename_entry(&self, file_ofs: u64) -> io::Result<(PathBuf, u64)> {
-        let head: format::Header = read_entry_at(&self.input, file_ofs).await?;
+        let head: format::Header = read_entry_at(self.input.archive(), file_ofs).await?;
         if head.htype != format::PXAR_FILENAME {
             io_bail!("expected PXAR_FILENAME header, found: {}", head);
         }
 
         let mut path = read_exact_data_at(
-            &self.input,
+            self.input.archive(),
             head.content_size() as usize,
             file_ofs + (size_of_val(&head) as u64),
         )
@@ -681,7 +698,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
 /// A file entry retrieved from a Directory.
 #[derive(Clone)]
 pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
-    input: T,
+    input: PxarVariant<T, (T, Range<u64>)>,
     entry: Entry,
     entry_range_info: EntryRangeInfo,
     caches: Arc<Caches>,
@@ -711,15 +728,29 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             EntryKind::File {
                 size,
                 offset: Some(offset),
+                payload_offset: None,
             } => Ok(Some(offset..(offset + size))),
+            // Payload offset takes precedence over the regular offset, if present
+            EntryKind::File {
+                size,
+                offset: Some(_offset),
+                payload_offset: Some(payload_offset),
+            } => {
+                let start_offset = payload_offset + size_of::<format::Header>() as u64;
+                Ok(Some(start_offset..start_offset + size))
+            }
             _ => Ok(None),
         }
     }
 
     pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
-        match self.content_range()? {
-            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
-            None => io_bail!("not a file"),
+        let range = self
+            .content_range()?
+            .ok_or_else(|| io_format_err!("not a file"))?;
+        if let Some((ref payload_input, _)) = self.input.payload() {
+            Ok(FileContentsImpl::new(payload_input.clone(), range))
+        } else {
+            Ok(FileContentsImpl::new(self.input.archive().clone(), range))
         }
     }
 
diff --git a/src/accessor/sync.rs b/src/accessor/sync.rs
index a777152..df2ed23 100644
--- a/src/accessor/sync.rs
+++ b/src/accessor/sync.rs
@@ -12,7 +12,7 @@ use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
 use crate::decoder::Decoder;
 use crate::format::GoodbyeItem;
 use crate::util::poll_result_once;
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 /// Blocking `pxar` random-access decoder.
 ///
@@ -31,7 +31,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
     /// Decode a `pxar` archive from a standard file implementing `FileExt`.
     #[inline]
     pub fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
-        Accessor::new(FileReader::new(input), size)
+        Accessor::new(PxarVariant::Unified(FileReader::new(input)), size)
     }
 }
 
@@ -64,7 +64,7 @@ where
 {
     /// Open an `Arc` or `Rc` of `File`.
     pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result<Accessor<FileRefReader<T>>> {
-        Accessor::new(FileRefReader::new(input), size)
+        Accessor::new(PxarVariant::Unified(FileRefReader::new(input)), size)
     }
 }
 
@@ -74,7 +74,7 @@ impl<T: ReadAt> Accessor<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(input: T, size: u64) -> io::Result<Self> {
+    pub fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
         Ok(Self {
             inner: poll_result_once(accessor::AccessorImpl::new(input, size))?,
         })
diff --git a/src/decoder/aio.rs b/src/decoder/aio.rs
index 4de8c6f..3f9881d 100644
--- a/src/decoder/aio.rs
+++ b/src/decoder/aio.rs
@@ -6,7 +6,7 @@ use std::io;
 use std::path::Path;
 
 use crate::decoder::{self, Contents, SeqRead};
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 /// Asynchronous `pxar` decoder.
 ///
@@ -20,8 +20,8 @@ pub struct Decoder<T> {
 impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
     /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
     #[inline]
-    pub async fn from_tokio(input: T) -> io::Result<Self> {
-        Decoder::new(TokioReader::new(input)).await
+    pub async fn from_tokio(input: PxarVariant<T, T>) -> io::Result<Self> {
+        Decoder::new(input.wrap(|input| TokioReader::new(input))).await
     }
 }
 
@@ -30,13 +30,16 @@ impl Decoder<TokioReader<tokio::fs::File>> {
     /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
     #[inline]
     pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await
+        Decoder::from_tokio(PxarVariant::Unified(
+            tokio::fs::File::open(path.as_ref()).await?,
+        ))
+        .await
     }
 }
 
 impl<T: SeqRead> Decoder<T> {
     /// Create an async decoder from an input implementing our internal read interface.
-    pub async fn new(input: T) -> io::Result<Self> {
+    pub async fn new(input: PxarVariant<T, T>) -> io::Result<Self> {
         Ok(Self {
             inner: decoder::DecoderImpl::new(input).await?,
         })
diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
index d19ffd1..b5c17b8 100644
--- a/src/decoder/mod.rs
+++ b/src/decoder/mod.rs
@@ -19,7 +19,7 @@ use endian_trait::Endian;
 
 use crate::format::{self, Header};
 use crate::util::{self, io_err_other};
-use crate::{Entry, EntryKind, Metadata};
+use crate::{Entry, EntryKind, Metadata, PxarVariant};
 
 pub mod aio;
 pub mod sync;
@@ -150,13 +150,16 @@ async fn seq_read_entry<T: SeqRead + ?Sized, E: Endian>(input: &mut T) -> io::Re
 /// We use `async fn` to implement the decoder state machine so that we can easily plug in both
 /// synchronous or `async` I/O objects in as input.
 pub(crate) struct DecoderImpl<T> {
-    pub(crate) input: T,
+    // Payload of regular files might be provided by a different reader
+    pub(crate) input: PxarVariant<T, T>,
     current_header: Header,
     entry: Entry,
     path_lengths: Vec<usize>,
     state: State,
     with_goodbye_tables: bool,
 
+    payload_consumed: u64,
+
     /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
     /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
     eof_after_entry: bool,
@@ -167,6 +170,7 @@ enum State {
     Default,
     InPayload {
         offset: u64,
+        size: u64,
     },
 
     /// file entries with no data (fifo, socket)
@@ -195,16 +199,16 @@ pub(crate) enum ItemResult {
 }
 
 impl<I: SeqRead> DecoderImpl<I> {
-    pub async fn new(input: I) -> io::Result<Self> {
+    pub async fn new(input: PxarVariant<I, I>) -> io::Result<Self> {
         Self::new_full(input, "/".into(), false).await
     }
 
     pub(crate) fn input(&self) -> &I {
-        &self.input
+        self.input.archive()
     }
 
     pub(crate) async fn new_full(
-        input: I,
+        input: PxarVariant<I, I>,
         path: PathBuf,
         eof_after_entry: bool,
     ) -> io::Result<Self> {
@@ -219,6 +223,7 @@ impl<I: SeqRead> DecoderImpl<I> {
             path_lengths: Vec::new(),
             state: State::Begin,
             with_goodbye_tables: false,
+            payload_consumed: 0,
             eof_after_entry,
         };
 
@@ -242,9 +247,14 @@ impl<I: SeqRead> DecoderImpl<I> {
                     // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
                     self.read_next_item().await?;
                 }
-                State::InPayload { offset } => {
-                    // We need to skip the current payload first.
-                    self.skip_entry(offset).await?;
+                State::InPayload { offset, .. } => {
+                    if self.input.payload().is_some() {
+                        // Update consumed payload as given by the offset referenced by the content reader
+                        self.payload_consumed += offset;
+                    } else {
+                        // Skip remaining payload of current entry in regular stream
+                        self.skip_entry(offset).await?;
+                    }
                     self.read_next_item().await?;
                 }
                 State::InGoodbyeTable => {
@@ -300,20 +310,28 @@ impl<I: SeqRead> DecoderImpl<I> {
     }
 
     pub fn content_size(&self) -> Option<u64> {
-        if let State::InPayload { .. } = self.state {
-            Some(self.current_header.content_size())
+        if let State::InPayload { size, .. } = self.state {
+            if self.input.payload().is_some() {
+                Some(size)
+            } else {
+                Some(self.current_header.content_size())
+            }
         } else {
             None
         }
     }
 
     pub fn content_reader(&mut self) -> Option<Contents<I>> {
-        if let State::InPayload { offset } = &mut self.state {
-            Some(Contents::new(
-                &mut self.input,
-                offset,
-                self.current_header.content_size(),
-            ))
+        if let State::InPayload { offset, size } = &mut self.state {
+            if self.input.payload().is_some() {
+                Some(Contents::new(
+                    self.input.payload_mut().unwrap(),
+                    offset,
+                    *size,
+                ))
+            } else {
+                Some(Contents::new(self.input.archive_mut(), offset, *size))
+            }
         } else {
             None
         }
@@ -357,7 +375,7 @@ impl<I: SeqRead> DecoderImpl<I> {
         self.state = State::Default;
         self.entry.clear_data();
 
-        let header: Header = match seq_read_entry_or_eof(&mut self.input).await? {
+        let header: Header = match seq_read_entry_or_eof(self.input.archive_mut()).await? {
             None => return Ok(None),
             Some(header) => header,
         };
@@ -377,11 +395,11 @@ impl<I: SeqRead> DecoderImpl<I> {
         } else if header.htype == format::PXAR_ENTRY || header.htype == format::PXAR_ENTRY_V1 {
             if header.htype == format::PXAR_ENTRY {
                 self.entry.metadata = Metadata {
-                    stat: seq_read_entry(&mut self.input).await?,
+                    stat: seq_read_entry(self.input.archive_mut()).await?,
                     ..Default::default()
                 };
             } else if header.htype == format::PXAR_ENTRY_V1 {
-                let stat: format::Stat_V1 = seq_read_entry(&mut self.input).await?;
+                let stat: format::Stat_V1 = seq_read_entry(self.input.archive_mut()).await?;
 
                 self.entry.metadata = Metadata {
                     stat: stat.into(),
@@ -457,7 +475,7 @@ impl<I: SeqRead> DecoderImpl<I> {
             )
         };
 
-        match seq_read_exact_or_eof(&mut self.input, dest).await? {
+        match seq_read_exact_or_eof(self.input.archive_mut(), dest).await? {
             Some(()) => {
                 self.current_header.check_header_size()?;
                 Ok(Some(()))
@@ -527,12 +545,71 @@ impl<I: SeqRead> DecoderImpl<I> {
                 return Ok(ItemResult::Entry);
             }
             format::PXAR_PAYLOAD => {
-                let offset = seq_read_position(&mut self.input).await.transpose()?;
+                let offset = seq_read_position(self.input.archive_mut())
+                    .await
+                    .transpose()?;
                 self.entry.kind = EntryKind::File {
                     size: self.current_header.content_size(),
                     offset,
+                    payload_offset: None,
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: self.current_header.content_size(),
+                };
+                return Ok(ItemResult::Entry);
+            }
+            format::PXAR_PAYLOAD_REF => {
+                let offset = seq_read_position(self.input.archive_mut())
+                    .await
+                    .transpose()?;
+                let payload_ref = self.read_payload_ref().await?;
+
+                if let Some(payload_input) = self.input.payload_mut() {
+                    if seq_read_position(payload_input)
+                        .await
+                        .transpose()?
+                        .is_none()
+                    {
+                        if self.payload_consumed > payload_ref.offset {
+                            io_bail!(
+                                "unexpected offset {}, smaller than already consumed payload {}",
+                                payload_ref.offset,
+                                self.payload_consumed,
+                            );
+                        }
+                        let to_skip = payload_ref.offset - self.payload_consumed;
+                        Self::skip(payload_input, to_skip as usize).await?;
+                        self.payload_consumed += to_skip;
+                    }
+
+                    let header: Header = seq_read_entry(payload_input).await?;
+                    if header.htype != format::PXAR_PAYLOAD {
+                        io_bail!(
+                            "unexpected header in payload input: expected {}, got {header}",
+                            format::PXAR_PAYLOAD,
+                        );
+                    }
+                    self.payload_consumed += size_of::<Header>() as u64;
+
+                    if header.content_size() != payload_ref.size {
+                        io_bail!(
+                            "encountered payload size mismatch: got {}, expected {}",
+                            payload_ref.size,
+                            header.content_size(),
+                        );
+                    }
+                }
+
+                self.entry.kind = EntryKind::File {
+                    size: payload_ref.size,
+                    offset,
+                    payload_offset: Some(payload_ref.offset),
+                };
+                self.state = State::InPayload {
+                    offset: 0,
+                    size: payload_ref.size,
                 };
-                self.state = State::InPayload { offset: 0 };
                 return Ok(ItemResult::Entry);
             }
             format::PXAR_FILENAME | format::PXAR_GOODBYE => {
@@ -564,7 +641,7 @@ impl<I: SeqRead> DecoderImpl<I> {
 
     async fn skip_entry(&mut self, offset: u64) -> io::Result<()> {
         let len = (self.current_header.content_size() - offset) as usize;
-        Self::skip(&mut self.input, len).await
+        Self::skip(self.input.archive_mut(), len).await
     }
 
     async fn skip(input: &mut I, mut len: usize) -> io::Result<()> {
@@ -581,7 +658,7 @@ impl<I: SeqRead> DecoderImpl<I> {
 
     async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
         let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
-        let data = seq_read_exact_data(&mut self.input, size).await?;
+        let data = seq_read_exact_data(self.input.archive_mut(), size).await?;
         Ok(data)
     }
 
@@ -598,7 +675,7 @@ impl<I: SeqRead> DecoderImpl<I> {
                 size_of::<T>(),
             );
         }
-        seq_read_entry(&mut self.input).await
+        seq_read_entry(self.input.archive_mut()).await
     }
 
     //
@@ -630,8 +707,8 @@ impl<I: SeqRead> DecoderImpl<I> {
         }
         let data_size = content_size - size_of::<u64>();
 
-        let offset: u64 = seq_read_entry(&mut self.input).await?;
-        let data = seq_read_exact_data(&mut self.input, data_size).await?;
+        let offset: u64 = seq_read_entry(self.input.archive_mut()).await?;
+        let data = seq_read_exact_data(self.input.archive_mut(), data_size).await?;
 
         Ok(format::Hardlink { offset, data })
     }
@@ -667,7 +744,7 @@ impl<I: SeqRead> DecoderImpl<I> {
 
     async fn read_payload_ref(&mut self) -> io::Result<format::PayloadRef> {
         self.current_header.check_header_size()?;
-        seq_read_entry(&mut self.input).await
+        seq_read_entry(self.input.archive_mut()).await
     }
 }
 
diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
index 5597a03..8779f87 100644
--- a/src/decoder/sync.rs
+++ b/src/decoder/sync.rs
@@ -7,7 +7,7 @@ use std::task::{Context, Poll};
 
 use crate::decoder::{self, SeqRead};
 use crate::util::poll_result_once;
-use crate::Entry;
+use crate::{Entry, PxarVariant};
 
 /// Blocking `pxar` decoder.
 ///
@@ -25,8 +25,8 @@ pub struct Decoder<T> {
 impl<T: io::Read> Decoder<StandardReader<T>> {
     /// Decode a `pxar` archive from a regular `std::io::Read` input.
     #[inline]
-    pub fn from_std(input: T) -> io::Result<Self> {
-        Decoder::new(StandardReader::new(input))
+    pub fn from_std(input: PxarVariant<T, T>) -> io::Result<Self> {
+        Decoder::new(input.wrap(|i| StandardReader::new(i)))
     }
 
     /// Get a direct reference to the reader contained inside the contained [`StandardReader`].
@@ -37,8 +37,15 @@ impl<T: io::Read> Decoder<StandardReader<T>> {
 
 impl Decoder<StandardReader<std::fs::File>> {
     /// Convenience shortcut for `File::open` followed by `Accessor::from_file`.
-    pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
-        Self::from_std(std::fs::File::open(path.as_ref())?)
+    pub fn open<P: AsRef<Path>>(path: PxarVariant<P, P>) -> io::Result<Self> {
+        let input = match path {
+            PxarVariant::Split(input, payload_input) => PxarVariant::Split(
+                std::fs::File::open(input)?,
+                std::fs::File::open(payload_input)?,
+            ),
+            PxarVariant::Unified(input) => PxarVariant::Unified(std::fs::File::open(input)?),
+        };
+        Self::from_std(input)
     }
 }
 
@@ -47,7 +54,9 @@ impl<T: SeqRead> Decoder<T> {
     ///
     /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
     /// not allowed to use the `Waker`, as this will cause a `panic!`.
-    pub fn new(input: T) -> io::Result<Self> {
+    /// For `PxarVariant::Split`, the payload input is used to restore regular file payloads for
+    /// payload references encountered within the archive.
+    pub fn new(input: PxarVariant<T, T>) -> io::Result<Self> {
         Ok(Self {
             inner: poll_result_once(decoder::DecoderImpl::new(input))?,
         })
diff --git a/src/lib.rs b/src/lib.rs
index f784c9e..bafdfe4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -364,6 +364,9 @@ pub enum EntryKind {
 
         /// The file's byte offset inside the archive, if available.
         offset: Option<u64>,
+
+        /// The file's byte offset inside the payload stream, if available.
+        payload_offset: Option<u64>,
     },
 
     /// Directory entry. When iterating through an archive, the contents follow next.
diff --git a/tests/compat.rs b/tests/compat.rs
index 3b43e38..8f1b778 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -94,7 +94,8 @@ fn create_archive() -> io::Result<Vec<u8>> {
 fn test_archive() {
     let archive = create_archive().expect("failed to create test archive");
     let mut input = &archive[..];
-    let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder");
+    let mut decoder = decoder::Decoder::from_std(pxar::PxarVariant::Unified(&mut input))
+        .expect("failed to create decoder");
 
     let item = decoder
         .next()
diff --git a/tests/simple/main.rs b/tests/simple/main.rs
index e55457f..e403184 100644
--- a/tests/simple/main.rs
+++ b/tests/simple/main.rs
@@ -61,14 +61,16 @@ fn test1() {
     // std::fs::write("myarchive.pxar", &file).expect("failed to write out test archive");
 
     let mut input = &file[..];
-    let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder");
+    let mut decoder = decoder::Decoder::from_std(pxar::PxarVariant::Unified(&mut input))
+        .expect("failed to create decoder");
     let decoded_fs =
         fs::Entry::decode_from(&mut decoder).expect("failed to decode previously encoded archive");
 
     assert_eq!(test_fs, decoded_fs);
 
-    let accessor = accessor::Accessor::new(&file[..], file.len() as u64)
-        .expect("failed to create random access reader for encoded archive");
+    let accessor =
+        accessor::Accessor::new(pxar::PxarVariant::Unified(&file[..]), file.len() as u64)
+            .expect("failed to create random access reader for encoded archive");
 
     check_bunzip2(&accessor);
     check_run_special_files(&accessor);
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel



Thread overview: 71+ messages
2024-05-27 14:32 [pbs-devel] [PATCH v7 pxar proxmox-backup 00/69] fix #3174: improve file-level backup Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 01/69] decoder: factor out skip part from skip_entry Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 02/69] lib: add type for input/output variant differentiation Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 03/69] encoder: move to stack based state tracking Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 04/69] format/examples: add header type `PXAR_PAYLOAD_REF` Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 05/69] decoder: add method to read payload references Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 06/69] encoder: allow split output writer for archive creation Christian Ebner
2024-05-27 14:32 ` Christian Ebner [this message]
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 08/69] decoder: set payload input range when decoding via accessor Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 09/69] encoder: add payload reference capability Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 10/69] encoder: add payload position capability Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 11/69] encoder: add payload advance capability Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 12/69] encoder/format: finish payload stream with marker Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 13/69] format: add payload stream start marker Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 14/69] format/encoder/decoder: new pxar entry type `Version` Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 pxar 15/69] format/encoder/decoder: new pxar entry type `Prelude` Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 16/69] client: backup: factor out extension from backup target Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 17/69] api: datastore: refactor getting local chunk reader Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 18/69] client: pxar: switch to stack based encoder state Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 19/69] client: pxar: combine writers into struct Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 20/69] client: pxar: optionally split metadata and payload streams Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 21/69] client: helper: add helpers for creating reader instances Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 22/69] client: helper: add method for split archive name mapping Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 23/69] client: tools: helper to check pxar filename extensions Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 24/69] client: restore: read payload from dedicated index Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 25/69] tools: cover extension for split pxar archives Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 26/69] restore: cover extension for split pxar archives Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 27/69] client: mount: make split pxar archives mountable Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 28/69] api: datastore: attach split archive payload chunk reader Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 29/69] catalog: shell: make split pxar archives accessible Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 30/69] www: cover metadata extension for pxar archives Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 31/69] file restore: factor out getting pxar reader Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 32/69] file restore: cover split metadata and payload archives Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 33/69] file restore: show more error context when extraction fails Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 34/69] pxar: add optional payload input for archive restore Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 35/69] pxar: cover listing for split archives Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 36/69] pxar: add more context to extraction error Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 37/69] client: pxar: include payload offset in entry listing Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 38/69] pxar: show padding in debug output on archive list Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 39/69] datastore: dynamic index: add method to get digest Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 40/69] client: pxar: helper for lookup of reusable dynamic entries Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 41/69] upload stream: implement reused chunk injector Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 42/69] client: chunk stream: add struct to hold injection state Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 43/69] chunker: add method to reset chunker state Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 44/69] client: streams: add channels for dynamic entry injection Christian Ebner
2024-05-27 14:32 ` [pbs-devel] [PATCH v7 proxmox-backup 45/69] specs: add backup detection mode specification Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 46/69] client: implement prepare reference method Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 47/69] client: pxar: add method for metadata comparison Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 48/69] pxar: caching: add look-ahead cache Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 49/69] client: pxar: refactor catalog encoding for directories Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 50/69] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 51/69] client: backup writer: add injected chunk count to stats Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 52/69] pxar: create: keep track of reused chunks and files Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 53/69] pxar: create: show chunk injection stats debug output Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 54/69] client: pxar: add helper to handle optional preludes Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 55/69] client: pxar: opt encode cli exclude patterns as Prelude Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 56/69] pxar: ignore version and prelude entries in listing Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 57/69] docs: file formats: describe split pxar archive file layout Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 58/69] docs: add section describing change detection mode Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 59/69] test-suite: add detection mode change benchmark Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 60/69] test-suite: Makefile: add debian package and related files Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 61/69] datastore: chunker: add Chunker trait Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 62/69] datastore: chunker: implement chunker for payload stream Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 63/69] client: chunk stream: switch payload stream chunker Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 64/69] client: pxar: allow to restore prelude to optional path Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 65/69] client: pxar: add archive creation with reference test Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 66/69] client: tools: add helper to raise nofile rlimit Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 67/69] client: pxar: set cache limit based on nofile rlimit Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 68/69] chunker: tests: add regression tests for payload chunker Christian Ebner
2024-05-27 14:33 ` [pbs-devel] [PATCH v7 proxmox-backup 69/69] chunk stream: tests: add regression tests for payload chunker Christian Ebner
2024-05-28  9:45 ` [pbs-devel] [PATCH v7 pxar proxmox-backup 00/69] fix #3174: improve file-level backup Christian Ebner