public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC v2 pxar 02/36] encoder: add optional output writer for file payloads
Date: Tue,  5 Mar 2024 10:26:29 +0100	[thread overview]
Message-ID: <20240305092703.126906-3-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240305092703.126906-1-c.ebner@proxmox.com>

During regular pxar archive encoding, the payload of regular files is
written as part of the archive.

This patch introduces functionality to attach an optional, dedicated
writer instance to redirect the payload to a different output.
The intention for this change is to allow to separate data and metadata
streams in order to allow the reuse of payload data by referencing the
payload writer byte offset, without having to re-encode it.

Whenever the payload of regular files is redirected to a dedicated
output writer, encode a payload reference header followed by the
required data to locate the data, instead of adding the regular payload
header followed by the encoded payload to the archive.

This is in preparation for reusing payload chunks for unchanged files
of backups created via the proxmox-backup-client.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 1:
- no changes

 src/encoder/aio.rs  |  7 +++++
 src/encoder/mod.rs  | 75 +++++++++++++++++++++++++++++++++++++++------
 src/encoder/sync.rs |  7 +++++
 3 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index ad25fea..82b9ab2 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -52,6 +52,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         })
     }
 
+    /// Attach a dedicated writer to redirect the payloads of regular files to a separate output
+    pub fn attach_payload_output(self, payload_output: T) -> Self {
+        Self {
+            inner: self.inner.attach_payload_output(payload_output),
+        }
+    }
+
     /// Create a new regular file in the archive. This returns a `File` object to which the
     /// contents have to be written out *completely*. Failing to do so will put the encoder into an
     /// error state.
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 0d342ec..e4ea69b 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -221,6 +221,9 @@ struct EncoderState {
 
     /// We need to keep track how much we have written to get offsets.
     write_position: u64,
+
+    /// Track the bytes written to the payload writer
+    payload_write_position: u64,
 }
 
 impl EncoderState {
@@ -278,6 +281,7 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
 /// synchronous or `async` I/O objects in as output.
 pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
     output: EncoderOutput<'a, T>,
+    payload_output: EncoderOutput<'a, Option<T>>,
     state: EncoderState,
     parent: Option<&'a mut EncoderState>,
     finished: bool,
@@ -312,6 +316,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         }
         let mut this = Self {
             output,
+            payload_output: EncoderOutput::Owned(None),
             state: EncoderState::default(),
             parent: None,
             finished: false,
@@ -326,6 +331,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         Ok(this)
     }
 
+    pub fn attach_payload_output(mut self, payload_output: T) -> Self {
+        self.payload_output = EncoderOutput::Owned(Some(payload_output));
+        self
+    }
+
     fn check(&self) -> io::Result<()> {
         match self.state.encode_error {
             Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
@@ -361,10 +371,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         let file_offset = self.position();
         self.start_file_do(Some(metadata), file_name).await?;
 
-        let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
-        header.check_header_size()?;
-
-        seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+        if self.payload_output.as_mut().is_some() {
+            let mut data = self.payload_position().to_le_bytes().to_vec();
+            data.append(&mut file_size.to_le_bytes().to_vec());
+            seq_write_pxar_entry(
+                self.output.as_mut(),
+                format::PXAR_PAYLOAD_REF,
+                &data,
+                &mut self.state.write_position,
+            )
+            .await?;
+        } else {
+            let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+            header.check_header_size()?;
+            seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+        };
 
         let payload_data_offset = self.position();
 
@@ -372,6 +393,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         Ok(FileImpl {
             output: self.output.as_mut(),
+            payload_output: self.payload_output.as_mut().as_mut(),
             goodbye_item: GoodbyeItem {
                 hash: format::hash_filename(file_name),
                 offset: file_offset,
@@ -564,6 +586,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         self.state.write_position
     }
 
+    #[inline]
+    fn payload_position(&mut self) -> u64 {
+        self.state.payload_write_position
+    }
+
     pub async fn create_directory(
         &mut self,
         file_name: &Path,
@@ -588,18 +615,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 
         // the child will write to OUR state now:
         let write_position = self.position();
+        let payload_write_position = self.payload_position();
 
         let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
 
         Ok(EncoderImpl {
             // always forward as Borrowed(), to avoid stacking references on nested calls
             output: self.output.to_borrowed_mut(),
+            payload_output: self.payload_output.to_borrowed_mut(),
             state: EncoderState {
                 entry_offset,
                 files_offset,
                 file_offset: Some(file_offset),
                 file_hash,
                 write_position,
+                payload_write_position,
                 ..Default::default()
             },
             parent: Some(&mut self.state),
@@ -764,15 +794,23 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
         )
         .await?;
 
+        if let EncoderOutput::Owned(output) = &mut self.payload_output {
+            if let Some(output) = output {
+                flush(output).await?;
+            }
+        }
+
         if let EncoderOutput::Owned(output) = &mut self.output {
             flush(output).await?;
         }
 
         // done up here because of the self-borrow and to propagate
         let end_offset = self.position();
+        let payload_end_offset = self.payload_position();
 
         if let Some(parent) = &mut self.parent {
             parent.write_position = end_offset;
+            parent.payload_write_position = payload_end_offset;
 
             let file_offset = self
                 .state
@@ -837,6 +875,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
 pub(crate) struct FileImpl<'a, S: SeqWrite> {
     output: &'a mut S,
 
+    /// Optional write redirection of file payloads to this sequential stream
+    payload_output: Option<&'a mut S>,
+
     /// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
     /// directly instead of on Drop of FileImpl?
     goodbye_item: GoodbyeItem,
@@ -916,19 +957,33 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
     /// for convenience.
     pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
         self.check_remaining(data.len())?;
-        let put =
-            poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
-                .await?;
-        //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
+        let put = if let Some(mut output) = self.payload_output.as_mut() {
+            let put =
+                poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).poll_seq_write(cx, data) })
+                    .await?;
+            self.parent.payload_write_position += put as u64;
+            put
+        } else {
+            let put = poll_fn(|cx| unsafe {
+                Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data)
+            })
+            .await?;
+            self.parent.write_position += put as u64;
+            put
+        };
+
         self.remaining_size -= put as u64;
-        self.parent.write_position += put as u64;
         Ok(put)
     }
 
     /// Completely write file data for the current file entry in a pxar archive.
     pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
         self.check_remaining(data.len())?;
-        seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+        if let Some(ref mut output) = self.payload_output {
+            seq_write_all(output, data, &mut self.parent.payload_write_position).await?;
+        } else {
+            seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+        }
         self.remaining_size -= data.len() as u64;
         Ok(())
     }
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 1ec91b8..28981df 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -56,6 +56,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
         })
     }
 
+    /// Attach a dedicated writer to redirect the payloads of regular files to a separate output
+    pub fn attach_payload_output(self, payload_output: T) -> Self {
+        Self {
+            inner: self.inner.attach_payload_output(payload_output),
+        }
+    }
+
     /// Create a new regular file in the archive. This returns a `File` object to which the
     /// contents have to be written out *completely*. Failing to do so will put the encoder into an
     /// error state.
-- 
2.39.2





  parent reply	other threads:[~2024-03-05  9:27 UTC|newest]

Thread overview: 94+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-03-05  9:26 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 01/36] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
2024-03-05  9:26 ` Christian Ebner [this message]
2024-03-11 13:21   ` [pbs-devel] [RFC v2 pxar 02/36] encoder: add optional output writer for file payloads Fabian Grünbichler
2024-03-11 13:50     ` Christian Ebner
2024-03-11 15:41       ` Fabian Grünbichler
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 03/36] format/decoder: add method to read payload references Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 04/36] decoder: add optional payload input stream Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-11 14:05     ` Christian Ebner
2024-03-11 15:27       ` Fabian Grünbichler
2024-03-11 15:51         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 05/36] accessor: " Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 06/36] encoder: move to stack based state tracking Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-11 14:12     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 07/36] encoder: add payload reference capability Christian Ebner
2024-03-11 13:21   ` Fabian Grünbichler
2024-03-11 14:15     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 08/36] encoder: add payload position capability Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 09/36] encoder: add payload advance capability Christian Ebner
2024-03-11 13:22   ` Fabian Grünbichler
2024-03-11 14:22     ` Christian Ebner
2024-03-11 15:27       ` Fabian Grünbichler
2024-03-11 15:41         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 pxar 10/36] encoder/format: finish payload stream with marker Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 11/36] client: pxar: switch to stack based encoder state Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 12/36] client: backup: factor out extension from backup target Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 13/36] client: backup: early check for fixed index type Christian Ebner
2024-03-11 14:57   ` Fabian Grünbichler
2024-03-11 15:12     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 14/36] client: backup: split payload to dedicated stream Christian Ebner
2024-03-11 14:57   ` Fabian Grünbichler
2024-03-11 15:22     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 15/36] client: restore: read payload from dedicated index Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:26     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 16/36] tools: cover meta extension for pxar archives Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 17/36] restore: " Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 18/36] client: mount: make split pxar archives mountable Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:29     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 19/36] api: datastore: refactor getting local chunk reader Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 20/36] api: datastore: attach optional payload " Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:31     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:24     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 23/36] www: cover meta extension for pxar archives Christian Ebner
2024-03-11 14:58   ` Fabian Grünbichler
2024-03-11 15:31     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 24/36] index: fetch chunk form index by start/end-offset Christian Ebner
2024-03-12  8:50   ` Fabian Grünbichler
2024-03-14  8:23     ` Christian Ebner
2024-03-12 12:47   ` Dietmar Maurer
2024-03-12 12:51     ` Christian Ebner
2024-03-12 13:03       ` Dietmar Maurer
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 25/36] upload stream: impl reused chunk injector Christian Ebner
2024-03-13  9:43   ` Dietmar Maurer
2024-03-14 14:03     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 26/36] client: chunk stream: add chunk injection queues Christian Ebner
2024-03-12  9:46   ` Fabian Grünbichler
2024-03-19 10:52     ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 27/36] client: implement prepare reference method Christian Ebner
2024-03-12 10:07   ` Fabian Grünbichler
2024-03-19 11:51     ` Christian Ebner
2024-03-19 12:49       ` Fabian Grünbichler
2024-03-20  8:37         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 29/36] client: pxar: add previous reference to archiver Christian Ebner
2024-03-12 12:12   ` Fabian Grünbichler
2024-03-12 12:25     ` Christian Ebner
2024-03-19 12:59     ` Christian Ebner
2024-03-19 13:04       ` Fabian Grünbichler
2024-03-20  8:52         ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 30/36] client: pxar: add method for metadata comparison Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 31/36] specs: add backup detection mode specification Christian Ebner
2024-03-12 12:17   ` Fabian Grünbichler
2024-03-12 12:31     ` Christian Ebner
2024-03-20  9:28       ` Christian Ebner
2024-03-05  9:26 ` [pbs-devel] [RFC v2 proxmox-backup 32/36] pxar: caching: add look-ahead cache types Christian Ebner
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 33/36] client: pxar: add look-ahead caching Christian Ebner
2024-03-12 14:08   ` Fabian Grünbichler
2024-03-20 10:28     ` Christian Ebner
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-03-13 11:12   ` Fabian Grünbichler
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 35/36] test-suite: add detection mode change benchmark Christian Ebner
2024-03-13 11:48   ` Fabian Grünbichler
2024-03-05  9:27 ` [pbs-devel] [RFC v2 proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions Christian Ebner
2024-03-13 11:18   ` Fabian Grünbichler
2024-03-13 11:44 ` [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Fabian Grünbichler

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240305092703.126906-3-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal