public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC pxar 05/36] accessor: add optional payload input stream
Date: Wed, 28 Feb 2024 15:01:55 +0100	[thread overview]
Message-ID: <20240228140226.1251979-6-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com>

Allow reading regular file payloads from a split pxar archive, where
the payload stream was redirected to a different archive at creation
time.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/accessor/aio.rs |  7 ++++
 src/accessor/mod.rs | 85 +++++++++++++++++++++++++++++++++++++++------
 2 files changed, 82 insertions(+), 10 deletions(-)

diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index 98d7755..db6c5e4 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -91,6 +91,13 @@ impl<T: ReadAt> Accessor<T> {
         })
     }
 
+    /// Take the file payloads from the provided input stream rather than the regular pxar stream
+    pub fn redirect_payload_input(self, payload_input: T) -> Self {
+        Self {
+            inner: self.inner.redirect_payload_input(payload_input),
+        }
+    }
+
     /// Open a directory handle to the root of the pxar archive.
     pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> {
         Ok(Directory::new(self.inner.open_root_ref().await?))
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index ed99c85..6b3dfd2 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -182,6 +182,7 @@ pub(crate) struct AccessorImpl<T> {
     input: T,
     size: u64,
     caches: Arc<Caches>,
+    payload_input: Option<T>,
 }
 
 impl<T: ReadAt> AccessorImpl<T> {
@@ -194,9 +195,15 @@ impl<T: ReadAt> AccessorImpl<T> {
             input,
             size,
             caches: Arc::new(Caches::default()),
+            payload_input: None,
         })
     }
 
+    pub fn redirect_payload_input(mut self, payload_input: T) -> Self {
+        self.payload_input = Some(payload_input);
+        self
+    }
+
     pub fn size(&self) -> u64 {
         self.size
     }
@@ -207,6 +214,9 @@ impl<T: ReadAt> AccessorImpl<T> {
             self.size,
             "/".into(),
             Arc::clone(&self.caches),
+            self.payload_input
+                .as_ref()
+                .map(|input| input as &dyn ReadAt),
         )
         .await
     }
@@ -227,8 +237,21 @@ async fn get_decoder<T: ReadAt>(
     input: T,
     entry_range: Range<u64>,
     path: PathBuf,
+    payload_input: Option<T>,
 ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
-    DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
+    let mut decoder = DecoderImpl::new_full(
+        SeqReadAtAdapter::new(input, entry_range.clone()),
+        path,
+        true,
+    )
+    .await?;
+
+    if let Some(payload_input) = payload_input {
+        // The payload stream is just passed along, so its range can be empty (0..0)
+        decoder = decoder.redirect_payload_input(SeqReadAtAdapter::new(payload_input, 0..0));
+    }
+
+    Ok(decoder)
 }
 
 // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
@@ -236,6 +259,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
     input: T,
     entry_range: Range<u64>,
     path: PathBuf,
+    payload_input: Option<T>,
 ) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
     // Read the header, it should be a FILENAME, then skip over it and its length:
     let header: format::Header = read_entry_at(&input, entry_range.start).await?;
@@ -251,7 +275,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
     }
 
     Ok((
-        get_decoder(input, entry_offset..entry_range.end, path).await?,
+        get_decoder(input, entry_offset..entry_range.end, path, payload_input).await?,
         entry_offset,
     ))
 }
@@ -263,6 +287,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             self.size,
             "/".into(),
             Arc::clone(&self.caches),
+            self.payload_input.clone(),
         )
         .await
     }
@@ -274,6 +299,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             offset,
             "/".into(),
             Arc::clone(&self.caches),
+            self.payload_input.clone(),
         )
         .await
     }
@@ -287,23 +313,30 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
             self.input.clone(),
             entry_range_info.entry_range.clone(),
             PathBuf::new(),
+            self.payload_input.clone(),
         )
         .await?;
         let entry = decoder
             .next()
             .await
             .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
+
         Ok(FileEntryImpl {
             input: self.input.clone(),
             entry,
             entry_range_info: entry_range_info.clone(),
             caches: Arc::clone(&self.caches),
+            payload_input: self.payload_input.clone(),
         })
     }
 
     /// Allow opening arbitrary contents from a specific range.
     pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
-        FileContentsImpl::new(self.input.clone(), range)
+        if let Some(payload_input) = &self.payload_input {
+            FileContentsImpl::new(payload_input.clone(), range)
+        } else {
+            FileContentsImpl::new(self.input.clone(), range)
+        }
     }
 
     /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@@ -326,9 +359,13 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
 
         let link_offset = entry_file_offset - link_offset;
 
-        let (mut decoder, entry_offset) =
-            get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
-                .await?;
+        let (mut decoder, entry_offset) = get_decoder_at_filename(
+            self.input.clone(),
+            link_offset..self.size,
+            PathBuf::new(),
+            self.payload_input.clone(),
+        )
+        .await?;
 
         let entry = decoder
             .next()
@@ -354,6 +391,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
                         entry_range: entry_offset..entry_end,
                     },
                     caches: Arc::clone(&self.caches),
+                    payload_input: self.payload_input.clone(),
                 })
             }
             _ => io_bail!("hardlink does not point to a regular file"),
@@ -370,6 +408,7 @@ pub(crate) struct DirectoryImpl<T> {
     table: Arc<[GoodbyeItem]>,
     path: PathBuf,
     caches: Arc<Caches>,
+    payload_input: Option<T>,
 }
 
 impl<T: Clone + ReadAt> DirectoryImpl<T> {
@@ -379,6 +418,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
         end_offset: u64,
         path: PathBuf,
         caches: Arc<Caches>,
+        payload_input: Option<T>,
     ) -> io::Result<DirectoryImpl<T>> {
         let tail = Self::read_tail_entry(&input, end_offset).await?;
 
@@ -408,6 +448,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
             table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
             path,
             caches,
+            payload_input,
         };
 
         // sanity check:
@@ -503,6 +544,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
                 None => self.path.clone(),
                 Some(file) => self.path.join(file),
             },
+            self.payload_input.clone(),
         )
         .await
     }
@@ -534,6 +576,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
                 entry_range: self.entry_range(),
             },
             caches: Arc::clone(&self.caches),
+            payload_input: self.payload_input.clone(),
         })
     }
 
@@ -686,6 +729,7 @@ pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
     entry: Entry,
     entry_range_info: EntryRangeInfo,
     caches: Arc<Caches>,
+    payload_input: Option<T>,
 }
 
 impl<T: Clone + ReadAt> FileEntryImpl<T> {
@@ -699,6 +743,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             self.entry_range_info.entry_range.end,
             self.entry.path.clone(),
             Arc::clone(&self.caches),
+            self.payload_input.clone(),
         )
         .await
     }
@@ -711,16 +756,35 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
             }
             EntryKind::File {
                 size,
-                offset: Some(offset),
-                ..
-            } => Ok(Some(offset..(offset + size))),
+                offset,
+                payload_offset,
+            } => {
+                // The payload offset is `Some` for PXAR_PAYLOAD_REF entries.
+                // It takes precedence over the regular offset, since the actual payloads
+                // are stored in the separate payload input stream.
+                if let Some(payload_offset) = payload_offset {
+                    return Ok(Some(payload_offset..(payload_offset + size)));
+                }
+
+                if let Some(offset) = offset {
+                    return Ok(Some(offset..(offset + size)));
+                }
+
+                Ok(None)
+            }
             _ => Ok(None),
         }
     }
 
     pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
         match self.content_range()? {
-            Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
+            Some(range) => {
+                if let Some(ref payload_input) = self.payload_input {
+                    Ok(FileContentsImpl::new(payload_input.clone(), range))
+                } else {
+                    Ok(FileContentsImpl::new(self.input.clone(), range))
+                }
+            }
             None => io_bail!("not a file"),
         }
     }
@@ -810,6 +874,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
             entry,
             entry_range_info: self.entry_range_info.clone(),
             caches: Arc::clone(&self.caches),
+            payload_input: self.dir.payload_input.clone(),
         })
     }
 
-- 
2.39.2





  parent reply	other threads:[~2024-02-28 14:02 UTC|newest]

Thread overview: 39+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 01/36] format/examples: Fix typo in PXAR_PAYLOAD description Christian Ebner
2024-02-28 18:09   ` [pbs-devel] applied: " Thomas Lamprecht
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 02/36] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 03/36] encoder: add optional output writer for file payloads Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 04/36] decoder: add optional payload input stream Christian Ebner
2024-02-28 14:01 ` Christian Ebner [this message]
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking Christian Ebner
2024-03-12 10:12   ` Dietmar Maurer
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 07/36] encoder: add payload reference capability Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 08/36] encoder: add payload position capability Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 09/36] encoder: add payload advance capability Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC pxar 10/36] encoder/format: finish payload stream with marker Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 11/36] client: pxar: switch to stack based encoder state Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 12/36] client: backup: factor out extension from backup target Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 13/36] client: backup: early check for fixed index type Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 14/36] client: backup: split payload to dedicated stream Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 15/36] client: restore: read payload from dedicated index Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 16/36] tools: cover meta extension for pxar archives Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 17/36] restore: " Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 18/36] client: mount: make split pxar archives mountable Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 19/36] api: datastore: refactor getting local chunk reader Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 20/36] api: datastore: attach optional payload " Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 23/36] www: cover meta extension for pxar archives Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 24/36] index: fetch chunk from index by start/end-offset Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 26/36] client: chunk stream: add chunk injection queues Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 27/36] client: implement prepare reference method Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 29/36] client: pxar: add previous reference to archiver Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 30/36] client: pxar: add method for metadata comparison Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 31/36] specs: add backup detection mode specification Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 32/36] pxar: caching: add look-ahead cache types Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 35/36] test-suite: add detection mode change benchmark Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240228140226.1251979-6-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal