From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v4 pxar 04/58] encoder: add optional output writer for file payloads
Date: Mon, 29 Apr 2024 14:10:08 +0200 [thread overview]
Message-ID: <20240429121102.315059-5-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240429121102.315059-1-c.ebner@proxmox.com>
During regular pxar archive encoding, the payload of regular files is
written as part of the archive.
This patch introduces functionality to attach an optional, dedicated
writer instance to redirect the payload to a different output.
The separation of data and metadata streams allows for efficient
reuse of payload data by referencing the payload writer byte offset,
without having to re-encode it.
Whenever the payload of regular files is redirected to a dedicated
output writer, encode a payload reference header followed by the
offset and size required to locate the payload, instead of adding the
regular payload header followed by the encoded payload to the archive.
This is in preparation for reusing payload chunks for unchanged files
of backups created via the proxmox-backup-client.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/encoder/aio.rs | 24 +++++++++---
src/encoder/mod.rs | 89 ++++++++++++++++++++++++++++++++++++++++-----
src/encoder/sync.rs | 13 +++++--
3 files changed, 107 insertions(+), 19 deletions(-)
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index ad25fea..31a1a2f 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -24,8 +24,14 @@ impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
pub async fn from_tokio(
output: T,
metadata: &Metadata,
+ payload_output: Option<T>,
) -> io::Result<Encoder<'a, TokioWriter<T>>> {
- Encoder::new(TokioWriter::new(output), metadata).await
+ Encoder::new(
+ TokioWriter::new(output),
+ metadata,
+ payload_output.map(|payload_output| TokioWriter::new(payload_output)),
+ )
+ .await
}
}
@@ -39,6 +45,7 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
Encoder::new(
TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?),
metadata,
+ None,
)
.await
}
@@ -46,9 +53,13 @@ impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
/// Create an asynchronous encoder for an output implementing our internal write interface.
- pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
+ pub async fn new(
+ output: T,
+ metadata: &Metadata,
+ payload_output: Option<T>,
+ ) -> io::Result<Encoder<'a, T>> {
Ok(Self {
- inner: encoder::EncoderImpl::new(output.into(), metadata).await?,
+ inner: encoder::EncoderImpl::new(output.into(), metadata, payload_output).await?,
})
}
@@ -291,9 +302,10 @@ mod test {
/// Assert that `Encoder` is `Send`
fn send_test() {
let test = async {
- let mut encoder = Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build())
- .await
- .unwrap();
+ let mut encoder =
+ Encoder::new(DummyOutput, &Metadata::dir_builder(0o700).build(), None)
+ .await
+ .unwrap();
{
let mut dir = encoder
.create_directory("baba", &Metadata::dir_builder(0o700).build())
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index da41733..99c3758 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -17,7 +17,7 @@ use endian_trait::Endian;
use crate::binary_tree_array;
use crate::decoder::{self, SeqRead};
-use crate::format::{self, GoodbyeItem};
+use crate::format::{self, GoodbyeItem, PayloadRef};
use crate::Metadata;
pub mod aio;
@@ -221,6 +221,9 @@ struct EncoderState {
/// We need to keep track how much we have written to get offsets.
write_position: u64,
+
+ /// Track the bytes written to the payload writer
+ payload_write_position: u64,
}
impl EncoderState {
@@ -278,6 +281,7 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
/// synchronous or `async` I/O objects in as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
output: EncoderOutput<'a, T>,
+ payload_output: EncoderOutput<'a, Option<T>>,
state: EncoderState,
parent: Option<&'a mut EncoderState>,
finished: bool,
@@ -306,12 +310,14 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
pub async fn new(
output: EncoderOutput<'a, T>,
metadata: &Metadata,
+ payload_output: Option<T>,
) -> io::Result<EncoderImpl<'a, T>> {
if !metadata.is_dir() {
io_bail!("directory metadata must contain the directory mode flag");
}
let mut this = Self {
output,
+ payload_output: EncoderOutput::Owned(None),
state: EncoderState::default(),
parent: None,
finished: false,
@@ -323,6 +329,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
this.encode_metadata(metadata).await?;
this.state.files_offset = this.position();
+ if let Some(payload_output) = payload_output {
+ this.payload_output = EncoderOutput::Owned(Some(payload_output));
+ }
+
Ok(this)
}
@@ -361,10 +371,37 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let file_offset = self.position();
self.start_file_do(Some(metadata), file_name).await?;
- let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
- header.check_header_size()?;
+ if let Some(payload_output) = self.payload_output.as_mut() {
+ // payload references must point to the position prior to the payload header,
+ // separating payload entries in the payload stream
+ let payload_position = self.state.payload_write_position;
+
+ let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+ header.check_header_size()?;
+ seq_write_struct(
+ payload_output,
+ header,
+ &mut self.state.payload_write_position,
+ )
+ .await?;
+
+ let payload_ref = PayloadRef {
+ offset: payload_position,
+ size: file_size,
+ };
- seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ format::PXAR_PAYLOAD_REF,
+ &payload_ref.data(),
+ &mut self.state.write_position,
+ )
+ .await?;
+ } else {
+ let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+ header.check_header_size()?;
+ seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+ }
let payload_data_offset = self.position();
@@ -372,6 +409,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(FileImpl {
output: self.output.as_mut(),
+ payload_output: self.payload_output.as_mut().as_mut(),
goodbye_item: GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
@@ -564,6 +602,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.state.write_position
}
+ #[inline]
+ fn payload_position(&mut self) -> u64 {
+ self.state.payload_write_position
+ }
+
pub async fn create_directory(
&mut self,
file_name: &Path,
@@ -588,18 +631,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
// the child will write to OUR state now:
let write_position = self.position();
+ let payload_write_position = self.payload_position();
let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
Ok(EncoderImpl {
// always forward as Borrowed(), to avoid stacking references on nested calls
output: self.output.to_borrowed_mut(),
+ payload_output: self.payload_output.to_borrowed_mut(),
state: EncoderState {
entry_offset,
files_offset,
file_offset: Some(file_offset),
file_hash,
write_position,
+ payload_write_position,
..Default::default()
},
parent: Some(&mut self.state),
@@ -764,15 +810,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
)
.await?;
+ if let EncoderOutput::Owned(Some(output)) = &mut self.payload_output {
+ flush(output).await?;
+ }
+
if let EncoderOutput::Owned(output) = &mut self.output {
flush(output).await?;
}
// done up here because of the self-borrow and to propagate
let end_offset = self.position();
+ let payload_end_offset = self.payload_position();
if let Some(parent) = &mut self.parent {
parent.write_position = end_offset;
+ parent.payload_write_position = payload_end_offset;
let file_offset = self
.state
@@ -837,6 +889,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
pub(crate) struct FileImpl<'a, S: SeqWrite> {
output: &'a mut S,
+ /// Optional write redirection of file payloads to this sequential stream
+ payload_output: Option<&'a mut S>,
+
/// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
/// directly instead of on Drop of FileImpl?
goodbye_item: GoodbyeItem,
@@ -916,19 +971,33 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
/// for convenience.
pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.check_remaining(data.len())?;
- let put =
- poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
- .await?;
- //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
+ let put = if let Some(mut output) = self.payload_output.as_mut() {
+ let put =
+ poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).poll_seq_write(cx, data) })
+ .await?;
+ self.parent.payload_write_position += put as u64;
+ put
+ } else {
+ let put = poll_fn(|cx| unsafe {
+ Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data)
+ })
+ .await?;
+ self.parent.write_position += put as u64;
+ put
+ };
+
self.remaining_size -= put as u64;
- self.parent.write_position += put as u64;
Ok(put)
}
/// Completely write file data for the current file entry in a pxar archive.
pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
self.check_remaining(data.len())?;
- seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+ if let Some(ref mut output) = self.payload_output {
+ seq_write_all(output, data, &mut self.parent.payload_write_position).await?;
+ } else {
+ seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+ }
self.remaining_size -= data.len() as u64;
Ok(())
}
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 1ec91b8..96d056d 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -28,7 +28,7 @@ impl<'a, T: io::Write + 'a> Encoder<'a, StandardWriter<T>> {
/// Encode a `pxar` archive into a regular `std::io::Write` output.
#[inline]
pub fn from_std(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, StandardWriter<T>>> {
- Encoder::new(StandardWriter::new(output), metadata)
+ Encoder::new(StandardWriter::new(output), metadata, None)
}
}
@@ -41,6 +41,7 @@ impl<'a> Encoder<'a, StandardWriter<std::fs::File>> {
Encoder::new(
StandardWriter::new(std::fs::File::create(path.as_ref())?),
metadata,
+ None,
)
}
}
@@ -50,9 +51,15 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
///
/// Note that the `output`'s `SeqWrite` implementation must always return `Poll::Ready` and is
/// not allowed to use the `Waker`, as this will cause a `panic!`.
- pub fn new(output: T, metadata: &Metadata) -> io::Result<Self> {
+ // Optionally attach a dedicated writer to redirect the payloads of regular files to a separate
+ // output.
+ pub fn new(output: T, metadata: &Metadata, payload_output: Option<T>) -> io::Result<Self> {
Ok(Self {
- inner: poll_result_once(encoder::EncoderImpl::new(output.into(), metadata))?,
+ inner: poll_result_once(encoder::EncoderImpl::new(
+ output.into(),
+ metadata,
+ payload_output,
+ ))?,
})
}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-04-29 12:11 UTC|newest]
Thread overview: 60+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-04-29 12:10 [pbs-devel] [PATCH v4 pxar proxmox-backup 00/58] fix #3174: improve file-level backup Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 01/58] format/examples: add header type `PXAR_PAYLOAD_REF` Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 02/58] decoder: add method to read payload references Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 03/58] decoder: factor out skip part from skip_entry Christian Ebner
2024-04-29 12:10 ` Christian Ebner [this message]
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 05/58] encoder: move to stack based state tracking Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 06/58] decoder/accessor: add optional payload input stream Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 07/58] decoder: set payload input range when decoding via accessor Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 08/58] encoder: add payload reference capability Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 09/58] encoder: add payload position capability Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 10/58] encoder: add payload advance capability Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 11/58] encoder/format: finish payload stream with marker Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 12/58] format: add payload stream start marker Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 13/58] format/encoder/decoder: new pxar entry type `Version` Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 pxar 14/58] format/encoder/decoder: new pxar entry type `Prelude` Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 15/58] client: pxar: switch to stack based encoder state Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 16/58] client: backup: factor out extension from backup target Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 17/58] client: pxar: combine writers into struct Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 18/58] client: pxar: add optional pxar payload writer instance Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 19/58] client: pxar: optionally split metadata and payload streams Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 20/58] client: helper: add helpers for creating reader instances Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 21/58] client: helper: add method for split archive name mapping Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 22/58] client: restore: read payload from dedicated index Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 23/58] tools: cover extension for split pxar archives Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 24/58] restore: " Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 25/58] client: mount: make split pxar archives mountable Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 26/58] api: datastore: refactor getting local chunk reader Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 27/58] api: datastore: attach optional payload " Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 28/58] catalog: shell: make split pxar archives accessible Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 29/58] www: cover metadata extension for pxar archives Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 30/58] file restore: factor out getting pxar reader Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 31/58] file restore: cover split metadata and payload archives Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 32/58] file restore: show more error context when extraction fails Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 33/58] pxar: add optional payload input for achive restore Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 34/58] pxar: add more context to extraction error Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 35/58] client: pxar: include payload offset in entry listing Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 36/58] pxar: show padding in debug output on archive list Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 37/58] datastore: dynamic index: add method to get digest Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 38/58] client: pxar: helper for lookup of reusable dynamic entries Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 39/58] upload stream: implement reused chunk injector Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 40/58] client: chunk stream: add struct to hold injection state Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 41/58] client: streams: add channels for dynamic entry injection Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 42/58] specs: add backup detection mode specification Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 43/58] client: implement prepare reference method Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 44/58] client: pxar: add method for metadata comparison Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 45/58] pxar: caching: add look-ahead cache types Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 46/58] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 47/58] client: backup writer: add injected chunk count to stats Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 48/58] pxar: create: keep track of reused chunks and files Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 49/58] pxar: create: show chunk injection stats debug output Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 50/58] client: pxar: add helper to handle optional preludes Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 51/58] client: pxar: opt encode cli exclude patterns as Prelude Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 52/58] docs: file formats: describe split pxar archive file layout Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 53/58] docs: add section describing change detection mode Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 54/58] test-suite: add detection mode change benchmark Christian Ebner
2024-04-29 12:10 ` [pbs-devel] [PATCH v4 proxmox-backup 55/58] test-suite: add bin to deb, add shell completions Christian Ebner
2024-04-29 12:11 ` [pbs-devel] [PATCH v4 proxmox-backup 56/58] datastore: chunker: add Chunker trait Christian Ebner
2024-04-29 12:11 ` [pbs-devel] [PATCH v4 proxmox-backup 57/58] datastore: chunker: implement chunker for payload stream Christian Ebner
2024-04-29 12:11 ` [pbs-devel] [PATCH v4 proxmox-backup 58/58] client: chunk stream: switch payload stream chunker Christian Ebner
2024-05-07 16:00 ` [pbs-devel] [PATCH v4 pxar proxmox-backup 00/58] fix #3174: improve file-level backup Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240429121102.315059-5-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox