* [pbs-devel] [RFC pxar 01/36] format/examples: Fix typo in PXAR_PAYLOAD description
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 18:09 ` [pbs-devel] applied: " Thomas Lamprecht
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 02/36] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
` (34 subsequent siblings)
35 siblings, 1 reply; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
examples/mk-format-hashes.rs | 2 +-
src/format/mod.rs | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/examples/mk-format-hashes.rs b/examples/mk-format-hashes.rs
index 1ad606c..6e00654 100644
--- a/examples/mk-format-hashes.rs
+++ b/examples/mk-format-hashes.rs
@@ -37,7 +37,7 @@ const CONSTANTS: &[(&str, &str, &str)] = &[
"__PROXMOX_FORMAT_HARDLINK__",
),
(
- "Marks the beginnig of the payload (actual content) of regular files",
+ "Marks the beginning of the payload (actual content) of regular files",
"PXAR_PAYLOAD",
"__PROXMOX_FORMAT_PXAR_PAYLOAD__",
),
diff --git a/src/format/mod.rs b/src/format/mod.rs
index 72a193c..bfea9f6 100644
--- a/src/format/mod.rs
+++ b/src/format/mod.rs
@@ -97,7 +97,7 @@ pub const PXAR_FCAPS: u64 = 0x2da9dd9db5f7fb67;
pub const PXAR_QUOTA_PROJID: u64 = 0xe07540e82f7d1cbb;
/// Marks item as hardlink
pub const PXAR_HARDLINK: u64 = 0x51269c8422bd7275;
-/// Marks the beginnig of the payload (actual content) of regular files
+/// Marks the beginning of the payload (actual content) of regular files
pub const PXAR_PAYLOAD: u64 = 0x28147a1b0b7c1a25;
/// Marks item as entry of goodbye table
pub const PXAR_GOODBYE: u64 = 0x2fec4fa642d5731d;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 02/36] format/examples: add PXAR_PAYLOAD_REF entry header
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 01/36] format/examples: Fix typo in PXAR_PAYLOAD description Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 03/36] encoder: add optional output writer for file payloads Christian Ebner
` (33 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Introduces a new PXAR_PAYLOAD_REF entry header to mark regular file
payloads which are not encoded within the regular pxar archive stream,
but rather redirected to a different output stream.
Such an entry contains all the data necessary to restore the actual
payload from the dedicated payload stream.
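For illustration, a standalone sketch of the 16-byte content following a
PXAR_PAYLOAD_REF header, matching the encoding used by the encoder and
decoder patches later in this series (the helper names are hypothetical
and not part of this patch): the byte offset into the payload stream and
the file size, both as little-endian u64.

    /// Hypothetical helpers illustrating the PXAR_PAYLOAD_REF entry content:
    /// offset into the payload stream followed by the file size (little endian).
    fn encode_payload_ref(payload_offset: u64, file_size: u64) -> Vec<u8> {
        let mut data = payload_offset.to_le_bytes().to_vec();
        data.extend_from_slice(&file_size.to_le_bytes());
        data
    }

    fn decode_payload_ref(data: &[u8]) -> Option<(u64, u64)> {
        let payload_offset = u64::from_le_bytes(data.get(0..8)?.try_into().ok()?);
        let file_size = u64::from_le_bytes(data.get(8..16)?.try_into().ok()?);
        Some((payload_offset, file_size))
    }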
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
examples/mk-format-hashes.rs | 5 +++++
src/format/mod.rs | 2 ++
2 files changed, 7 insertions(+)
diff --git a/examples/mk-format-hashes.rs b/examples/mk-format-hashes.rs
index 6e00654..83adb38 100644
--- a/examples/mk-format-hashes.rs
+++ b/examples/mk-format-hashes.rs
@@ -41,6 +41,11 @@ const CONSTANTS: &[(&str, &str, &str)] = &[
"PXAR_PAYLOAD",
"__PROXMOX_FORMAT_PXAR_PAYLOAD__",
),
+ (
+ "Marks the beginning of a payload reference for regular files",
+ "PXAR_PAYLOAD_REF",
+ "__PROXMOX_FORMAT_PXAR_PAYLOAD_REF__",
+ ),
(
"Marks item as entry of goodbye table",
"PXAR_GOODBYE",
diff --git a/src/format/mod.rs b/src/format/mod.rs
index bfea9f6..3512691 100644
--- a/src/format/mod.rs
+++ b/src/format/mod.rs
@@ -99,6 +99,8 @@ pub const PXAR_QUOTA_PROJID: u64 = 0xe07540e82f7d1cbb;
pub const PXAR_HARDLINK: u64 = 0x51269c8422bd7275;
/// Marks the beginning of the payload (actual content) of regular files
pub const PXAR_PAYLOAD: u64 = 0x28147a1b0b7c1a25;
+/// Marks the beginning of a payload reference for regular files
+pub const PXAR_PAYLOAD_REF: u64 = 0x419d3d6bc4ba977e;
/// Marks item as entry of goodbye table
pub const PXAR_GOODBYE: u64 = 0x2fec4fa642d5731d;
/// The end marker used in the GOODBYE object
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 03/36] encoder: add optional output writer for file payloads
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 01/36] format/examples: Fix typo in PXAR_PAYLOAD description Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 02/36] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 04/36] decoder: add optional payload input stream Christian Ebner
` (32 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
During regular pxar archive encoding, the payload of regular files is
written as part of the archive.
This patch introduces functionality to attach an optional, dedicated
writer instance to redirect the payload to a different output.
The intention of this change is to separate the data and metadata
streams, so that payload data can be reused by referencing its byte
offset in the payload writer, without having to re-encode it.
Whenever the payload of regular files is redirected to a dedicated
output writer, a payload reference header followed by the data required
to locate the payload is encoded, instead of adding the regular payload
header followed by the encoded payload to the archive.
This is in preparation for reusing payload chunks for unchanged files
of backups created via the proxmox-backup-client.
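As a usage sketch (the helper below is hypothetical and only assumes the
attach_payload_output() method added by this patch), splitting the
streams boils down to attaching a second writer before any file payload
is encoded:

    use pxar::encoder::sync::Encoder;
    use pxar::encoder::SeqWrite;

    // Hypothetical helper: after this call, regular file payloads are written
    // to `payload_writer`, while the main archive only records payload
    // reference entries pointing into that stream.
    fn split_streams<'a, T: SeqWrite + 'a>(
        encoder: Encoder<'a, T>,
        payload_writer: T,
    ) -> Encoder<'a, T> {
        encoder.attach_payload_output(payload_writer)
    }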
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/encoder/aio.rs | 7 +++++
src/encoder/mod.rs | 75 +++++++++++++++++++++++++++++++++++++++------
src/encoder/sync.rs | 7 +++++
3 files changed, 79 insertions(+), 10 deletions(-)
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index ad25fea..82b9ab2 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -52,6 +52,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
})
}
+ /// Attach a dedicated writer to redirect the payloads of regular files to a separate output
+ pub fn attach_payload_output(self, payload_output: T) -> Self {
+ Self {
+ inner: self.inner.attach_payload_output(payload_output),
+ }
+ }
+
/// Create a new regular file in the archive. This returns a `File` object to which the
/// contents have to be written out *completely*. Failing to do so will put the encoder into an
/// error state.
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 0d342ec..e4ea69b 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -221,6 +221,9 @@ struct EncoderState {
/// We need to keep track how much we have written to get offsets.
write_position: u64,
+
+ /// Track the bytes written to the payload writer
+ payload_write_position: u64,
}
impl EncoderState {
@@ -278,6 +281,7 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
/// synchronous or `async` I/O objects in as output.
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
output: EncoderOutput<'a, T>,
+ payload_output: EncoderOutput<'a, Option<T>>,
state: EncoderState,
parent: Option<&'a mut EncoderState>,
finished: bool,
@@ -312,6 +316,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
let mut this = Self {
output,
+ payload_output: EncoderOutput::Owned(None),
state: EncoderState::default(),
parent: None,
finished: false,
@@ -326,6 +331,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(this)
}
+ pub fn attach_payload_output(mut self, payload_output: T) -> Self {
+ self.payload_output = EncoderOutput::Owned(Some(payload_output));
+ self
+ }
+
fn check(&self) -> io::Result<()> {
match self.state.encode_error {
Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
@@ -361,10 +371,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let file_offset = self.position();
self.start_file_do(Some(metadata), file_name).await?;
- let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
- header.check_header_size()?;
-
- seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+ if self.payload_output.as_mut().is_some() {
+ let mut data = self.payload_position().to_le_bytes().to_vec();
+ data.append(&mut file_size.to_le_bytes().to_vec());
+ seq_write_pxar_entry(
+ self.output.as_mut(),
+ format::PXAR_PAYLOAD_REF,
+ &data,
+ &mut self.state.write_position,
+ )
+ .await?;
+ } else {
+ let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
+ header.check_header_size()?;
+ seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+ };
let payload_data_offset = self.position();
@@ -372,6 +393,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(FileImpl {
output: self.output.as_mut(),
+ payload_output: self.payload_output.as_mut().as_mut(),
goodbye_item: GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
@@ -564,6 +586,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.state.write_position
}
+ #[inline]
+ fn payload_position(&mut self) -> u64 {
+ self.state.payload_write_position
+ }
+
pub async fn create_directory(
&mut self,
file_name: &Path,
@@ -588,18 +615,21 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
// the child will write to OUR state now:
let write_position = self.position();
+ let payload_write_position = self.payload_position();
let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
Ok(EncoderImpl {
// always forward as Borrowed(), to avoid stacking references on nested calls
output: self.output.to_borrowed_mut(),
+ payload_output: self.payload_output.to_borrowed_mut(),
state: EncoderState {
entry_offset,
files_offset,
file_offset: Some(file_offset),
file_hash,
write_position,
+ payload_write_position,
..Default::default()
},
parent: Some(&mut self.state),
@@ -764,15 +794,23 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
)
.await?;
+ if let EncoderOutput::Owned(output) = &mut self.payload_output {
+ if let Some(output) = output {
+ flush(output).await?;
+ }
+ }
+
if let EncoderOutput::Owned(output) = &mut self.output {
flush(output).await?;
}
// done up here because of the self-borrow and to propagate
let end_offset = self.position();
+ let payload_end_offset = self.payload_position();
if let Some(parent) = &mut self.parent {
parent.write_position = end_offset;
+ parent.payload_write_position = payload_end_offset;
let file_offset = self
.state
@@ -837,6 +875,9 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
pub(crate) struct FileImpl<'a, S: SeqWrite> {
output: &'a mut S,
+ /// Optional write redirection of file payloads to this sequential stream
+ payload_output: Option<&'a mut S>,
+
/// This file's `GoodbyeItem`. FIXME: We currently don't touch this, can we just push it
/// directly instead of on Drop of FileImpl?
goodbye_item: GoodbyeItem,
@@ -916,19 +957,33 @@ impl<'a, S: SeqWrite> FileImpl<'a, S> {
/// for convenience.
pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.check_remaining(data.len())?;
- let put =
- poll_fn(|cx| unsafe { Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data) })
- .await?;
- //let put = seq_write(self.output.as_mut().unwrap(), data).await?;
+ let put = if let Some(mut output) = self.payload_output.as_mut() {
+ let put =
+ poll_fn(|cx| unsafe { Pin::new_unchecked(&mut output).poll_seq_write(cx, data) })
+ .await?;
+ self.parent.payload_write_position += put as u64;
+ put
+ } else {
+ let put = poll_fn(|cx| unsafe {
+ Pin::new_unchecked(&mut self.output).poll_seq_write(cx, data)
+ })
+ .await?;
+ self.parent.write_position += put as u64;
+ put
+ };
+
self.remaining_size -= put as u64;
- self.parent.write_position += put as u64;
Ok(put)
}
/// Completely write file data for the current file entry in a pxar archive.
pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
self.check_remaining(data.len())?;
- seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+ if let Some(ref mut output) = self.payload_output {
+ seq_write_all(output, data, &mut self.parent.payload_write_position).await?;
+ } else {
+ seq_write_all(self.output, data, &mut self.parent.write_position).await?;
+ }
self.remaining_size -= data.len() as u64;
Ok(())
}
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 1ec91b8..28981df 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -56,6 +56,13 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
})
}
+ /// Attach a dedicated writer to redirect the payloads of regular files to a separate output
+ pub fn attach_payload_output(self, payload_output: T) -> Self {
+ Self {
+ inner: self.inner.attach_payload_output(payload_output),
+ }
+ }
+
/// Create a new regular file in the archive. This returns a `File` object to which the
/// contents have to be written out *completely*. Failing to do so will put the encoder into an
/// error state.
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 04/36] decoder: add optional payload input stream
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (2 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 03/36] encoder: add optional output writer for file payloads Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 05/36] accessor: " Christian Ebner
` (31 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Implement an optional redirection to read the payload for regular files
from a different input stream.
This allows decoding split stream archives.
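A usage sketch (hypothetical helper names; it only assumes the
redirect_payload_input() method and the new payload_offset field added
here): attach the payload archive before iterating, and file entries of
a split archive then expose where their payload lives in that stream:

    use pxar::decoder::sync::Decoder;
    use pxar::decoder::SeqRead;
    use pxar::EntryKind;

    // Hypothetical helper: take file payloads from `payload_input` instead of
    // the regular archive stream while decoding.
    fn with_payload_input<T: SeqRead>(decoder: Decoder<T>, payload_input: T) -> Decoder<T> {
        decoder.redirect_payload_input(payload_input)
    }

    // File entries of split archives carry the offset into the payload stream.
    fn describe_file(kind: &EntryKind) {
        if let EntryKind::File { size, payload_offset: Some(offset), .. } = kind {
            println!("{size} payload bytes at offset {offset} in the payload stream");
        }
    }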
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/accessor/mod.rs | 2 ++
src/decoder/mod.rs | 82 +++++++++++++++++++++++++++++++++++++++++----
src/decoder/sync.rs | 7 ++++
src/lib.rs | 3 ++
4 files changed, 87 insertions(+), 7 deletions(-)
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index 6a2de73..ed99c85 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -342,6 +342,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
EntryKind::File {
offset: Some(offset),
size,
+ ..
} => {
let meta_size = offset - link_offset;
let entry_end = link_offset + meta_size + size;
@@ -711,6 +712,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
EntryKind::File {
size,
offset: Some(offset),
+ ..
} => Ok(Some(offset..(offset + size))),
_ => Ok(None),
}
diff --git a/src/decoder/mod.rs b/src/decoder/mod.rs
index d1fb911..5274e2a 100644
--- a/src/decoder/mod.rs
+++ b/src/decoder/mod.rs
@@ -157,6 +157,10 @@ pub(crate) struct DecoderImpl<T> {
state: State,
with_goodbye_tables: bool,
+ // Payload of regular files might be provided by a different reader
+ payload_input: Option<T>,
+ payload_consumed: u64,
+
/// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
/// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
eof_after_entry: bool,
@@ -167,6 +171,7 @@ enum State {
Default,
InPayload {
offset: u64,
+ size: u64,
},
/// file entries with no data (fifo, socket)
@@ -199,6 +204,11 @@ impl<I: SeqRead> DecoderImpl<I> {
Self::new_full(input, "/".into(), false).await
}
+ pub fn redirect_payload_input(mut self, payload_input: I) -> Self {
+ self.payload_input = Some(payload_input);
+ self
+ }
+
pub(crate) fn input(&self) -> &I {
&self.input
}
@@ -219,6 +229,8 @@ impl<I: SeqRead> DecoderImpl<I> {
path_lengths: Vec::new(),
state: State::Begin,
with_goodbye_tables: false,
+ payload_input: None,
+ payload_consumed: 0,
eof_after_entry,
};
@@ -242,9 +254,14 @@ impl<I: SeqRead> DecoderImpl<I> {
// hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
self.read_next_item().await?;
}
- State::InPayload { offset } => {
- // We need to skip the current payload first.
- self.skip_entry(offset).await?;
+ State::InPayload { offset, .. } => {
+ if self.payload_input.is_none() {
+ // Skip remaining payload of current entry in regular stream
+ self.skip_entry(offset).await?;
+ } else {
+ // Update consumed payload as given by the offset referenced by the content reader
+ self.payload_consumed += offset;
+ }
self.read_next_item().await?;
}
State::InGoodbyeTable => {
@@ -308,11 +325,11 @@ impl<I: SeqRead> DecoderImpl<I> {
}
pub fn content_reader(&mut self) -> Option<Contents<I>> {
- if let State::InPayload { offset } = &mut self.state {
+ if let State::InPayload { offset, size } = &mut self.state {
Some(Contents::new(
- &mut self.input,
+ self.payload_input.as_mut().unwrap_or(&mut self.input),
offset,
- self.current_header.content_size(),
+ *size,
))
} else {
None
@@ -531,8 +548,44 @@ impl<I: SeqRead> DecoderImpl<I> {
self.entry.kind = EntryKind::File {
size: self.current_header.content_size(),
offset,
+ payload_offset: None,
+ };
+ self.state = State::InPayload {
+ offset: 0,
+ size: self.current_header.content_size(),
};
- self.state = State::InPayload { offset: 0 };
+ return Ok(ItemResult::Entry);
+ }
+ format::PXAR_PAYLOAD_REF => {
+ let offset = seq_read_position(&mut self.input).await.transpose()?;
+ let data_size =
+ usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
+ let bytes = seq_read_exact_data(&mut self.input, data_size).await?;
+
+ let payload_offset = u64::from_le_bytes(bytes[0..8].try_into().unwrap());
+ let size = u64::from_le_bytes(bytes[8..16].try_into().unwrap());
+
+ let payload_input_offset = if let Some(payload_input) = self.payload_input.as_mut()
+ {
+ seq_read_position(payload_input).await.transpose()?
+ } else {
+ None
+ };
+
+ // The if statement is actually only used to distinguish between sync and
+ // async decoder implementations, which might be better handled differently?
+ //self.payload_consumed = off;
+ if self.payload_input.is_some() && payload_input_offset.is_none() {
+ let to_skip = payload_offset - self.payload_consumed;
+ self.skip_payload(to_skip).await?;
+ }
+
+ self.entry.kind = EntryKind::File {
+ size,
+ offset,
+ payload_offset: Some(payload_offset),
+ };
+ self.state = State::InPayload { offset: 0, size };
return Ok(ItemResult::Entry);
}
format::PXAR_FILENAME | format::PXAR_GOODBYE => {
@@ -576,6 +629,21 @@ impl<I: SeqRead> DecoderImpl<I> {
Ok(())
}
+ async fn skip_payload(&mut self, length: u64) -> io::Result<()> {
+ let mut len = length;
+ let scratch = scratch_buffer();
+ while len >= (scratch.len() as u64) {
+ seq_read_exact(self.payload_input.as_mut().unwrap(), scratch).await?;
+ len -= scratch.len() as u64;
+ }
+ let len = len as usize;
+ if len > 0 {
+ seq_read_exact(self.payload_input.as_mut().unwrap(), &mut scratch[..len]).await?;
+ }
+ self.payload_consumed += length;
+ Ok(())
+ }
+
async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
let data = seq_read_exact_data(&mut self.input, size).await?;
diff --git a/src/decoder/sync.rs b/src/decoder/sync.rs
index 5597a03..b22b341 100644
--- a/src/decoder/sync.rs
+++ b/src/decoder/sync.rs
@@ -53,6 +53,13 @@ impl<T: SeqRead> Decoder<T> {
})
}
+ /// Take the file payloads from the provided input stream rather than the regular pxar stream
+ pub fn redirect_payload_input(self, payload_input: T) -> Self {
+ Self {
+ inner: self.inner.redirect_payload_input(payload_input),
+ }
+ }
+
/// Internal helper for `Accessor`. In this case we have the low-level state machine, and the
/// layer "above" the `Accessor` propagates the actual type (sync vs async).
pub(crate) fn from_impl(inner: decoder::DecoderImpl<T>) -> Self {
diff --git a/src/lib.rs b/src/lib.rs
index 210c4b1..ef81a85 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -364,6 +364,9 @@ pub enum EntryKind {
/// The file's byte offset inside the archive, if available.
offset: Option<u64>,
+
+ /// The file's byte offset inside the payload stream, if available.
+ payload_offset: Option<u64>,
},
/// Directory entry. When iterating through an archive, the contents follow next.
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 05/36] accessor: add optional payload input stream
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (3 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 04/36] decoder: add optional payload input stream Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking Christian Ebner
` (30 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Allows reading regular file payloads from a split pxar archive, where
the payload stream has been redirected to a different archive on
creation.
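The core rule this patch implements in content_range() can be summarized
by the following simplified, standalone sketch (not the actual accessor
code): a payload offset from a PXAR_PAYLOAD_REF entry takes precedence
over the regular archive offset, since the bytes live in the separate
payload input:

    use std::ops::Range;

    // Simplified sketch of the range selection: prefer the payload stream
    // offset, fall back to the archive offset, otherwise the entry has no
    // content range.
    fn content_range(size: u64, offset: Option<u64>, payload_offset: Option<u64>) -> Option<Range<u64>> {
        let start = payload_offset.or(offset)?;
        Some(start..start + size)
    }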
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/accessor/aio.rs | 7 ++++
src/accessor/mod.rs | 85 +++++++++++++++++++++++++++++++++++++++------
2 files changed, 82 insertions(+), 10 deletions(-)
diff --git a/src/accessor/aio.rs b/src/accessor/aio.rs
index 98d7755..db6c5e4 100644
--- a/src/accessor/aio.rs
+++ b/src/accessor/aio.rs
@@ -91,6 +91,13 @@ impl<T: ReadAt> Accessor<T> {
})
}
+ /// Take the file payloads from the provided input stream rather than the regular pxar stream
+ pub fn redirect_payload_input(self, payload_input: T) -> Self {
+ Self {
+ inner: self.inner.redirect_payload_input(payload_input),
+ }
+ }
+
/// Open a directory handle to the root of the pxar archive.
pub async fn open_root_ref(&self) -> io::Result<Directory<&dyn ReadAt>> {
Ok(Directory::new(self.inner.open_root_ref().await?))
diff --git a/src/accessor/mod.rs b/src/accessor/mod.rs
index ed99c85..6b3dfd2 100644
--- a/src/accessor/mod.rs
+++ b/src/accessor/mod.rs
@@ -182,6 +182,7 @@ pub(crate) struct AccessorImpl<T> {
input: T,
size: u64,
caches: Arc<Caches>,
+ payload_input: Option<T>,
}
impl<T: ReadAt> AccessorImpl<T> {
@@ -194,9 +195,15 @@ impl<T: ReadAt> AccessorImpl<T> {
input,
size,
caches: Arc::new(Caches::default()),
+ payload_input: None,
})
}
+ pub fn redirect_payload_input(mut self, payload_input: T) -> Self {
+ self.payload_input = Some(payload_input);
+ self
+ }
+
pub fn size(&self) -> u64 {
self.size
}
@@ -207,6 +214,9 @@ impl<T: ReadAt> AccessorImpl<T> {
self.size,
"/".into(),
Arc::clone(&self.caches),
+ self.payload_input
+ .as_ref()
+ .map(|input| input as &dyn ReadAt),
)
.await
}
@@ -227,8 +237,21 @@ async fn get_decoder<T: ReadAt>(
input: T,
entry_range: Range<u64>,
path: PathBuf,
+ payload_input: Option<T>,
) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
- DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await
+ let mut decoder = DecoderImpl::new_full(
+ SeqReadAtAdapter::new(input, entry_range.clone()),
+ path,
+ true,
+ )
+ .await?;
+
+ if let Some(payload_input) = payload_input {
+ // Payload stream is just passed along, the range can therefore be zero
+ decoder = decoder.redirect_payload_input(SeqReadAtAdapter::new(payload_input, 0..0));
+ }
+
+ Ok(decoder)
}
// NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
@@ -236,6 +259,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
input: T,
entry_range: Range<u64>,
path: PathBuf,
+ payload_input: Option<T>,
) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
// Read the header, it should be a FILENAME, then skip over it and its length:
let header: format::Header = read_entry_at(&input, entry_range.start).await?;
@@ -251,7 +275,7 @@ async fn get_decoder_at_filename<T: ReadAt>(
}
Ok((
- get_decoder(input, entry_offset..entry_range.end, path).await?,
+ get_decoder(input, entry_offset..entry_range.end, path, payload_input).await?,
entry_offset,
))
}
@@ -263,6 +287,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
self.size,
"/".into(),
Arc::clone(&self.caches),
+ self.payload_input.clone(),
)
.await
}
@@ -274,6 +299,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
offset,
"/".into(),
Arc::clone(&self.caches),
+ self.payload_input.clone(),
)
.await
}
@@ -287,23 +313,30 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
self.input.clone(),
entry_range_info.entry_range.clone(),
PathBuf::new(),
+ self.payload_input.clone(),
)
.await?;
let entry = decoder
.next()
.await
.ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
+
Ok(FileEntryImpl {
input: self.input.clone(),
entry,
entry_range_info: entry_range_info.clone(),
caches: Arc::clone(&self.caches),
+ payload_input: self.payload_input.clone(),
})
}
/// Allow opening arbitrary contents from a specific range.
pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
- FileContentsImpl::new(self.input.clone(), range)
+ if let Some(payload_input) = &self.payload_input {
+ FileContentsImpl::new(payload_input.clone(), range)
+ } else {
+ FileContentsImpl::new(self.input.clone(), range)
+ }
}
/// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@@ -326,9 +359,13 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
let link_offset = entry_file_offset - link_offset;
- let (mut decoder, entry_offset) =
- get_decoder_at_filename(self.input.clone(), link_offset..self.size, PathBuf::new())
- .await?;
+ let (mut decoder, entry_offset) = get_decoder_at_filename(
+ self.input.clone(),
+ link_offset..self.size,
+ PathBuf::new(),
+ self.payload_input.clone(),
+ )
+ .await?;
let entry = decoder
.next()
@@ -354,6 +391,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
entry_range: entry_offset..entry_end,
},
caches: Arc::clone(&self.caches),
+ payload_input: self.payload_input.clone(),
})
}
_ => io_bail!("hardlink does not point to a regular file"),
@@ -370,6 +408,7 @@ pub(crate) struct DirectoryImpl<T> {
table: Arc<[GoodbyeItem]>,
path: PathBuf,
caches: Arc<Caches>,
+ payload_input: Option<T>,
}
impl<T: Clone + ReadAt> DirectoryImpl<T> {
@@ -379,6 +418,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
end_offset: u64,
path: PathBuf,
caches: Arc<Caches>,
+ payload_input: Option<T>,
) -> io::Result<DirectoryImpl<T>> {
let tail = Self::read_tail_entry(&input, end_offset).await?;
@@ -408,6 +448,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
table: table.as_ref().map_or_else(|| Arc::new([]), Arc::clone),
path,
caches,
+ payload_input,
};
// sanity check:
@@ -503,6 +544,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
None => self.path.clone(),
Some(file) => self.path.join(file),
},
+ self.payload_input.clone(),
)
.await
}
@@ -534,6 +576,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
entry_range: self.entry_range(),
},
caches: Arc::clone(&self.caches),
+ payload_input: self.payload_input.clone(),
})
}
@@ -686,6 +729,7 @@ pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
entry: Entry,
entry_range_info: EntryRangeInfo,
caches: Arc<Caches>,
+ payload_input: Option<T>,
}
impl<T: Clone + ReadAt> FileEntryImpl<T> {
@@ -699,6 +743,7 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
self.entry_range_info.entry_range.end,
self.entry.path.clone(),
Arc::clone(&self.caches),
+ self.payload_input.clone(),
)
.await
}
@@ -711,16 +756,35 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
}
EntryKind::File {
size,
- offset: Some(offset),
- ..
- } => Ok(Some(offset..(offset + size))),
+ offset,
+ payload_offset,
+ } => {
+ // Payload offset will be some for PXAR_PAYLOAD_REF's
+ // It should win over the regular offset, since the actual payloads
+ // are stored in the separated payload input stream
+ if let Some(payload_offset) = payload_offset {
+ return Ok(Some(payload_offset..(payload_offset + size)));
+ }
+
+ if let Some(offset) = offset {
+ return Ok(Some(offset..(offset + size)));
+ }
+
+ Ok(None)
+ }
_ => Ok(None),
}
}
pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
match self.content_range()? {
- Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)),
+ Some(range) => {
+ if let Some(ref payload_input) = self.payload_input {
+ Ok(FileContentsImpl::new(payload_input.clone(), range))
+ } else {
+ Ok(FileContentsImpl::new(self.input.clone(), range))
+ }
+ }
None => io_bail!("not a file"),
}
}
@@ -810,6 +874,7 @@ impl<'a, T: Clone + ReadAt> DirEntryImpl<'a, T> {
entry,
entry_range_info: self.entry_range_info.clone(),
caches: Arc::clone(&self.caches),
+ payload_input: self.dir.payload_input.clone(),
})
}
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (4 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 05/36] accessor: " Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-03-12 10:12 ` Dietmar Maurer
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 07/36] encoder: add payload reference capability Christian Ebner
` (29 subsequent siblings)
35 siblings, 1 reply; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
In preparation for the proxmox-backup-client look-ahead caching,
where passing around different encoder instances with internal
references is not feasible.
Instead of creating a new encoder instance for each directory level
and keeping references to the parent state, use an internal stack.
This is a breaking change in the pxar library API.
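For callers, the change looks like the following sketch (placeholder
names and metadata; it only assumes the reworked create_directory() and
finish() signatures): directories are entered and left on the same
encoder instead of juggling nested encoder instances:

    use std::io;
    use std::path::Path;

    use pxar::encoder::sync::Encoder;
    use pxar::encoder::SeqWrite;
    use pxar::Metadata;

    // Hypothetical helper: create_directory() no longer returns a nested
    // encoder; the directory is populated on the same encoder and finish()
    // pops its state again.
    fn add_empty_directory<T: SeqWrite>(
        encoder: &mut Encoder<'_, T>,
        name: &Path,
    ) -> io::Result<()> {
        encoder.create_directory(name, &Metadata::dir_builder(0o755).build())?;
        // ... add files of this directory on `encoder` here ...
        encoder.finish() // closes the directory created above
    }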
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
examples/pxarcmd.rs | 6 +-
src/encoder/aio.rs | 18 ++--
src/encoder/mod.rs | 246 +++++++++++++++++++++++++-------------------
src/encoder/sync.rs | 8 +-
4 files changed, 156 insertions(+), 122 deletions(-)
diff --git a/examples/pxarcmd.rs b/examples/pxarcmd.rs
index e0c779d..dcf3c44 100644
--- a/examples/pxarcmd.rs
+++ b/examples/pxarcmd.rs
@@ -138,14 +138,14 @@ fn add_directory<'a, T: SeqWrite + 'a>(
let meta = Metadata::from(&file_meta);
if file_type.is_dir() {
- let mut dir = encoder.create_directory(file_name, &meta)?;
+ encoder.create_directory(file_name, &meta)?;
add_directory(
- &mut dir,
+ encoder,
std::fs::read_dir(file_path)?,
root_path,
&mut *hardlinks,
)?;
- dir.finish()?;
+ encoder.finish()?;
} else if file_type.is_symlink() {
todo!("symlink handling");
} else if file_type.is_file() {
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 82b9ab2..7010b8e 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -105,17 +105,14 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
&mut self,
file_name: P,
metadata: &Metadata,
- ) -> io::Result<Encoder<'_, T>> {
- Ok(Encoder {
- inner: self
- .inner
- .create_directory(file_name.as_ref(), metadata)
- .await?,
- })
+ ) -> io::Result<()> {
+ self.inner
+ .create_directory(file_name.as_ref(), metadata)
+ .await
}
/// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
- pub async fn finish(self) -> io::Result<()> {
+ pub async fn finish(&mut self) -> io::Result<()> {
self.inner.finish().await
}
@@ -302,11 +299,12 @@ mod test {
.await
.unwrap();
{
- let mut dir = encoder
+ encoder
.create_directory("baba", &Metadata::dir_builder(0o700).build())
.await
.unwrap();
- dir.create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
+ encoder
+ .create_file(&Metadata::file_builder(0o755).build(), "abab", 1024)
.await
.unwrap();
}
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index e4ea69b..962087a 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -227,6 +227,16 @@ struct EncoderState {
}
impl EncoderState {
+ #[inline]
+ fn position(&self) -> u64 {
+ self.write_position
+ }
+
+ #[inline]
+ fn payload_position(&self) -> u64 {
+ self.payload_write_position
+ }
+
fn merge_error(&mut self, error: Option<EncodeError>) {
// one error is enough:
if self.encode_error.is_none() {
@@ -244,16 +254,6 @@ pub(crate) enum EncoderOutput<'a, T> {
Borrowed(&'a mut T),
}
-impl<'a, T> EncoderOutput<'a, T> {
- #[inline]
- fn to_borrowed_mut<'s>(&'s mut self) -> EncoderOutput<'s, T>
- where
- 'a: 's,
- {
- EncoderOutput::Borrowed(self.as_mut())
- }
-}
-
impl<'a, T> std::convert::AsMut<T> for EncoderOutput<'a, T> {
fn as_mut(&mut self) -> &mut T {
match self {
@@ -282,8 +282,8 @@ impl<'a, T> std::convert::From<&'a mut T> for EncoderOutput<'a, T> {
pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
output: EncoderOutput<'a, T>,
payload_output: EncoderOutput<'a, Option<T>>,
- state: EncoderState,
- parent: Option<&'a mut EncoderState>,
+ /// EncoderState stack storing the state for each directory level
+ state: Vec<EncoderState>,
finished: bool,
/// Since only the "current" entry can be actively writing files, we share the file copy
@@ -291,21 +291,6 @@ pub(crate) struct EncoderImpl<'a, T: SeqWrite + 'a> {
file_copy_buffer: Arc<Mutex<Vec<u8>>>,
}
-impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
- fn drop(&mut self) {
- if let Some(ref mut parent) = self.parent {
- // propagate errors:
- parent.merge_error(self.state.encode_error);
- if !self.finished {
- parent.add_error(EncodeError::IncompleteDirectory);
- }
- } else if !self.finished {
- // FIXME: how do we deal with this?
- // eprintln!("Encoder dropped without finishing!");
- }
- }
-}
-
impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
pub async fn new(
output: EncoderOutput<'a, T>,
@@ -317,8 +302,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let mut this = Self {
output,
payload_output: EncoderOutput::Owned(None),
- state: EncoderState::default(),
- parent: None,
+ state: vec![EncoderState::default()],
finished: false,
file_copy_buffer: Arc::new(Mutex::new(unsafe {
crate::util::vec_new_uninitialized(1024 * 1024)
@@ -326,7 +310,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
};
this.encode_metadata(metadata).await?;
- this.state.files_offset = this.position();
+ let state = this
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ state.files_offset = state.position();
Ok(this)
}
@@ -337,13 +325,32 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
fn check(&self) -> io::Result<()> {
- match self.state.encode_error {
+ if self.finished {
+ io_bail!("unexpected encoder finished state");
+ }
+ let state = self
+ .state
+ .last()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ match state.encode_error {
Some(EncodeError::IncompleteFile) => io_bail!("incomplete file"),
Some(EncodeError::IncompleteDirectory) => io_bail!("directory not finalized"),
None => Ok(()),
}
}
+ fn state(&self) -> io::Result<&EncoderState> {
+ self.state
+ .last()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+ }
+
+ fn state_mut(&mut self) -> io::Result<&mut EncoderState> {
+ self.state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))
+ }
+
pub async fn create_file<'b>(
&'b mut self,
metadata: &Metadata,
@@ -368,26 +375,38 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
{
self.check()?;
- let file_offset = self.position();
+ let file_offset = self.state()?.position();
self.start_file_do(Some(metadata), file_name).await?;
if self.payload_output.as_mut().is_some() {
- let mut data = self.payload_position().to_le_bytes().to_vec();
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ let mut data = state.payload_position().to_le_bytes().to_vec();
data.append(&mut file_size.to_le_bytes().to_vec());
seq_write_pxar_entry(
self.output.as_mut(),
format::PXAR_PAYLOAD_REF,
&data,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
} else {
let header = format::Header::with_content_size(format::PXAR_PAYLOAD, file_size);
header.check_header_size()?;
- seq_write_struct(self.output.as_mut(), header, &mut self.state.write_position).await?;
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ seq_write_struct(self.output.as_mut(), header, &mut state.write_position).await?;
};
- let payload_data_offset = self.position();
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
+ let payload_data_offset = state.position();
let meta_size = payload_data_offset - file_offset;
@@ -400,7 +419,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
size: file_size + meta_size,
},
remaining_size: file_size,
- parent: &mut self.state,
+ parent: state,
})
}
@@ -481,7 +500,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
target: &Path,
target_offset: LinkOffset,
) -> io::Result<()> {
- let current_offset = self.position();
+ let current_offset = self.state()?.position();
if current_offset <= target_offset.0 {
io_bail!("invalid hardlink offset, can only point to prior files");
}
@@ -555,24 +574,29 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
) -> io::Result<LinkOffset> {
self.check()?;
- let file_offset = self.position();
+ let file_offset = self.state()?.position();
let file_name = file_name.as_os_str().as_bytes();
self.start_file_do(metadata, file_name).await?;
+
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
if let Some((htype, entry_data)) = entry_htype_data {
seq_write_pxar_entry(
self.output.as_mut(),
htype,
entry_data,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
- let end_offset = self.position();
+ let end_offset = state.position();
- self.state.items.push(GoodbyeItem {
+ state.items.push(GoodbyeItem {
hash: format::hash_filename(file_name),
offset: file_offset,
size: end_offset - file_offset,
@@ -581,21 +605,11 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(LinkOffset(file_offset))
}
- #[inline]
- fn position(&mut self) -> u64 {
- self.state.write_position
- }
-
- #[inline]
- fn payload_position(&mut self) -> u64 {
- self.state.payload_write_position
- }
-
pub async fn create_directory(
&mut self,
file_name: &Path,
metadata: &Metadata,
- ) -> io::Result<EncoderImpl<'_, T>> {
+ ) -> io::Result<()> {
self.check()?;
if !metadata.is_dir() {
@@ -605,37 +619,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
let file_name = file_name.as_os_str().as_bytes();
let file_hash = format::hash_filename(file_name);
- let file_offset = self.position();
+ let file_offset = self.state()?.position();
self.encode_filename(file_name).await?;
- let entry_offset = self.position();
+ let entry_offset = self.state()?.position();
self.encode_metadata(metadata).await?;
- let files_offset = self.position();
+ let state = self.state_mut()?;
+ let files_offset = state.position();
// the child will write to OUR state now:
- let write_position = self.position();
- let payload_write_position = self.payload_position();
-
- let file_copy_buffer = Arc::clone(&self.file_copy_buffer);
-
- Ok(EncoderImpl {
- // always forward as Borrowed(), to avoid stacking references on nested calls
- output: self.output.to_borrowed_mut(),
- payload_output: self.payload_output.to_borrowed_mut(),
- state: EncoderState {
- entry_offset,
- files_offset,
- file_offset: Some(file_offset),
- file_hash,
- write_position,
- payload_write_position,
- ..Default::default()
- },
- parent: Some(&mut self.state),
- finished: false,
- file_copy_buffer,
- })
+ let write_position = state.position();
+ let payload_write_position = state.payload_position();
+
+ self.state.push(EncoderState {
+ entry_offset,
+ files_offset,
+ file_offset: Some(file_offset),
+ file_hash,
+ write_position,
+ payload_write_position,
+ ..Default::default()
+ });
+
+ Ok(())
}
async fn start_file_do(
@@ -651,11 +658,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
async fn encode_metadata(&mut self, metadata: &Metadata) -> io::Result<()> {
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
seq_write_pxar_struct_entry(
self.output.as_mut(),
format::PXAR_ENTRY,
metadata.stat.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
@@ -677,22 +688,30 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
async fn write_xattr(&mut self, xattr: &format::XAttr) -> io::Result<()> {
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
seq_write_pxar_entry(
self.output.as_mut(),
format::PXAR_XATTR,
&xattr.data,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
async fn write_acls(&mut self, acl: &crate::Acl) -> io::Result<()> {
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
for acl in &acl.users {
seq_write_pxar_struct_entry(
self.output.as_mut(),
format::PXAR_ACL_USER,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -702,7 +721,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.output.as_mut(),
format::PXAR_ACL_GROUP,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -712,7 +731,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.output.as_mut(),
format::PXAR_ACL_GROUP_OBJ,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -722,7 +741,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.output.as_mut(),
format::PXAR_ACL_DEFAULT,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -732,7 +751,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.output.as_mut(),
format::PXAR_ACL_DEFAULT_USER,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -742,7 +761,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
self.output.as_mut(),
format::PXAR_ACL_DEFAULT_GROUP,
acl.clone(),
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
}
@@ -751,11 +770,15 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
}
async fn write_file_capabilities(&mut self, fcaps: &format::FCaps) -> io::Result<()> {
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
seq_write_pxar_entry(
self.output.as_mut(),
format::PXAR_FCAPS,
&fcaps.data,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
@@ -764,33 +787,45 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
&mut self,
quota_project_id: &format::QuotaProjectId,
) -> io::Result<()> {
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
seq_write_pxar_struct_entry(
self.output.as_mut(),
format::PXAR_QUOTA_PROJID,
*quota_project_id,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
async fn encode_filename(&mut self, file_name: &[u8]) -> io::Result<()> {
crate::util::validate_filename(file_name)?;
+ let state = self
+ .state
+ .last_mut()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
seq_write_pxar_entry_zero(
self.output.as_mut(),
format::PXAR_FILENAME,
file_name,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await
}
- pub async fn finish(mut self) -> io::Result<()> {
+ pub async fn finish(&mut self) -> io::Result<()> {
let tail_bytes = self.finish_goodbye_table().await?;
+ let mut state = self
+ .state
+ .pop()
+ .ok_or_else(|| io_format_err!("encoder state stack underflow"))?;
seq_write_pxar_entry(
self.output.as_mut(),
format::PXAR_GOODBYE,
&tail_bytes,
- &mut self.state.write_position,
+ &mut state.write_position,
)
.await?;
@@ -804,34 +839,37 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
flush(output).await?;
}
- // done up here because of the self-borrow and to propagate
- let end_offset = self.position();
- let payload_end_offset = self.payload_position();
+ let end_offset = state.position();
+ let payload_end_offset = state.payload_position();
- if let Some(parent) = &mut self.parent {
+ if let Some(parent) = self.state.last_mut() {
parent.write_position = end_offset;
parent.payload_write_position = payload_end_offset;
- let file_offset = self
- .state
+ let file_offset = state
.file_offset
.expect("internal error: parent set but no file_offset?");
parent.items.push(GoodbyeItem {
- hash: self.state.file_hash,
+ hash: state.file_hash,
offset: file_offset,
size: end_offset - file_offset,
});
+ // propagate errors
+ parent.merge_error(state.encode_error);
+ } else {
+ self.finished = true;
}
- self.finished = true;
+
Ok(())
}
async fn finish_goodbye_table(&mut self) -> io::Result<Vec<u8>> {
- let goodbye_offset = self.position();
+ let state = self.state_mut()?;
+ let goodbye_offset = state.position();
// "take" out the tail (to not leave an array of endian-swapped structs in `self`)
- let mut tail = take(&mut self.state.items);
+ let mut tail = take(&mut state.items);
let tail_size = (tail.len() + 1) * size_of::<GoodbyeItem>();
let goodbye_size = tail_size as u64 + size_of::<format::Header>() as u64;
@@ -856,7 +894,7 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
bst.push(
GoodbyeItem {
hash: format::PXAR_GOODBYE_TAIL_MARKER,
- offset: goodbye_offset - self.state.entry_offset,
+ offset: goodbye_offset - state.entry_offset,
size: goodbye_size,
}
.to_le(),
@@ -886,8 +924,8 @@ pub(crate) struct FileImpl<'a, S: SeqWrite> {
/// exactly zero.
remaining_size: u64,
- /// The directory containing this file. This is where we propagate the `IncompleteFile` error
- /// to, and where we insert our `GoodbyeItem`.
+ /// The directory stack with the last item being the directory containing this file. This is
+ /// where we propagate the `IncompleteFile` error to, and where we insert our `GoodbyeItem`.
parent: &'a mut EncoderState,
}
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 28981df..de41e25 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -106,14 +106,12 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
&mut self,
file_name: P,
metadata: &Metadata,
- ) -> io::Result<Encoder<'_, T>> {
- Ok(Encoder {
- inner: poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))?,
- })
+ ) -> io::Result<()> {
+ poll_result_once(self.inner.create_directory(file_name.as_ref(), metadata))
}
/// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
- pub fn finish(self) -> io::Result<()> {
+ pub fn finish(&mut self) -> io::Result<()> {
poll_result_once(self.inner.finish())
}
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* Re: [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking Christian Ebner
@ 2024-03-12 10:12 ` Dietmar Maurer
0 siblings, 0 replies; 39+ messages in thread
From: Dietmar Maurer @ 2024-03-12 10:12 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Christian Ebner
Seems the check for self.finished is now missing?
Sure, we have not done anything, but we should.
> -impl<'a, T: SeqWrite + 'a> Drop for EncoderImpl<'a, T> {
> - fn drop(&mut self) {
> - if let Some(ref mut parent) = self.parent {
> - // propagate errors:
> - parent.merge_error(self.state.encode_error);
> - if !self.finished {
> - parent.add_error(EncodeError::IncompleteDirectory);
> - }
> - } else if !self.finished {
> - // FIXME: how do we deal with this?
> - // eprintln!("Encoder dropped without finishing!");
> - }
> - }
> -}
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 07/36] encoder: add payload reference capability
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (5 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 06/36] encoder: move to stack based state tracking Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 08/36] encoder: add payload position capability Christian Ebner
` (28 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Allows encoding regular files as a payload reference pointing into a
separate payload archive, rather than encoding the payload within the
regular archive.
Following the header marked as PXAR_PAYLOAD_REF, the payload offset and
size are encoded.
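For illustration (standalone sketch, only assuming the PayloadOffset
type introduced here): offsets into the payload stream are handed out as
an opaque PayloadOffset, which can be shifted relative to a known base
and unwrapped via raw():

    use pxar::encoder::PayloadOffset;

    fn payload_offset_example() {
        // Default() is the start of the payload stream; real offsets come from
        // the encoder (see the payload position capability in the next patch).
        let base = PayloadOffset::default();
        let chunk_start = base.add(4096); // hypothetical start of a reused chunk
        assert_eq!(chunk_start.raw(), 4096);
        assert_eq!(chunk_start.sub(4096).raw(), 0);
    }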
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/encoder/aio.rs | 15 +++++++++++++-
src/encoder/mod.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++
src/encoder/sync.rs | 18 ++++++++++++++++-
3 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 7010b8e..07ad275 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -5,7 +5,7 @@ use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
-use crate::encoder::{self, LinkOffset, SeqWrite};
+use crate::encoder::{self, LinkOffset, PayloadOffset, SeqWrite};
use crate::format;
use crate::Metadata;
@@ -98,6 +98,19 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
// content.as_async_reader(),
// ).await
// }
+ //
+ /// Encode a payload reference, returning the payload offset within the payload stream
+ pub async fn add_payload_ref(
+ &mut self,
+ metadata: &Metadata,
+ file_name: &Path,
+ file_size: u64,
+ payload_offset: PayloadOffset,
+ ) -> io::Result<()> {
+ self.inner
+ .add_payload_ref(metadata, file_name.as_ref(), file_size, payload_offset)
+ .await
+ }
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 962087a..5b196df 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -38,6 +38,30 @@ impl LinkOffset {
}
}
+/// File reference used to create payload references.
+#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd)]
+pub struct PayloadOffset(u64);
+
+impl PayloadOffset {
+ /// Get the raw byte offset of this link.
+ #[inline]
+ pub fn raw(self) -> u64 {
+ self.0
+ }
+
+ /// Return a new PayloadOffset, positively shifted by offset
+ #[inline]
+ pub fn add(&self, offset: u64) -> Self {
+ Self(self.0 + offset)
+ }
+
+ /// Return a new PayloadOffset, negatively shifted by offset
+ #[inline]
+ pub fn sub(&self, offset: u64) -> Self {
+ Self(self.0 - offset)
+ }
+}
+
/// Sequential write interface used by the encoder's state machine.
///
/// This is our internal writer trait which is available for `std::io::Write` types in the
@@ -472,6 +496,31 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(offset)
}
+ /// Encode a payload reference, returning the payload offset within the payload stream
+ pub async fn add_payload_ref(
+ &mut self,
+ metadata: &Metadata,
+ file_name: &Path,
+ file_size: u64,
+ payload_offset: PayloadOffset,
+ ) -> io::Result<()> {
+ if self.payload_output.as_mut().is_none() {
+ return Err(io_format_err!("unable to add payload reference"));
+ }
+
+ let mut payload_ref = payload_offset.raw().to_le_bytes().to_vec();
+ payload_ref.append(&mut file_size.to_le_bytes().to_vec());
+ let _this_offset: LinkOffset = self
+ .add_file_entry(
+ Some(metadata),
+ file_name,
+ Some((format::PXAR_PAYLOAD_REF, &payload_ref)),
+ )
+ .await?;
+
+ Ok(())
+ }
+
/// Return a file offset usable with `add_hardlink`.
pub async fn add_symlink(
&mut self,
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index de41e25..ac78ae3 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -6,7 +6,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use crate::decoder::sync::StandardReader;
-use crate::encoder::{self, LinkOffset, SeqWrite};
+use crate::encoder::{self, LinkOffset, PayloadOffset, SeqWrite};
use crate::format;
use crate::util::poll_result_once;
use crate::Metadata;
@@ -100,6 +100,22 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
))
}
+ /// Encode a payload reference, returning the payload offset within the payload stream
+ pub async fn add_payload_ref(
+ &mut self,
+ metadata: &Metadata,
+ file_name: &Path,
+ file_size: u64,
+ payload_offset: PayloadOffset,
+ ) -> io::Result<()> {
+ poll_result_once(self.inner.add_payload_ref(
+ metadata,
+ file_name.as_ref(),
+ file_size,
+ payload_offset,
+ ))
+ }
+
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
pub fn create_directory<P: AsRef<Path>>(
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 08/36] encoder: add payload position capability
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (6 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 07/36] encoder: add payload reference capability Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 09/36] encoder: add payload advance capability Christian Ebner
` (27 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Allows reading the current payload offset of the dedicated payload
stream. This is required to get the current offset for the calculation
of forced boundaries in the proxmox-backup-client when injecting reused
payload chunks into the payload stream.
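A sketch of the intended call pattern (hypothetical helper, combining
this patch with add_payload_ref() from the previous one): query the
current payload offset before the payload bytes are written or injected,
then record the reference entry for the file:

    use std::io;
    use std::path::Path;

    use pxar::encoder::aio::Encoder;
    use pxar::encoder::SeqWrite;
    use pxar::Metadata;

    // Hypothetical helper: record a payload reference for a file whose payload
    // starts at the current end of the payload stream.
    async fn add_file_reference<T: SeqWrite>(
        encoder: &mut Encoder<'_, T>,
        metadata: &Metadata,
        file_name: &Path,
        file_size: u64,
    ) -> io::Result<()> {
        let payload_offset = encoder.payload_position()?;
        encoder
            .add_payload_ref(metadata, file_name, file_size, payload_offset)
            .await
    }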
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/encoder/aio.rs | 5 +++++
src/encoder/mod.rs | 4 ++++
src/encoder/sync.rs | 5 +++++
3 files changed, 14 insertions(+)
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 07ad275..12121fc 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -79,6 +79,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
})
}
+ /// Get current position for payload stream
+ pub fn payload_position(&self) -> io::Result<PayloadOffset> {
+ self.inner.payload_position()
+ }
+
// /// Convenience shortcut to add a *regular* file by path including its contents to the archive.
// pub async fn add_file<P, F>(
// &mut self,
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 5b196df..921a0d9 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -496,6 +496,10 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(offset)
}
+ pub fn payload_position(&self) -> io::Result<PayloadOffset> {
+ Ok(PayloadOffset(self.state()?.payload_position()))
+ }
+
/// Encode a payload reference, returning the payload offset within the payload stream
pub async fn add_payload_ref(
&mut self,
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index ac78ae3..818279f 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -100,6 +100,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
))
}
+ /// Get current payload position for payload stream
+ pub fn payload_position(&self) -> io::Result<PayloadOffset> {
+ self.inner.payload_position()
+ }
+
/// Encode a payload reference, returning the payload offset within the payload stream
pub async fn add_payload_ref(
&mut self,
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 09/36] encoder: add payload advance capability
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (7 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 08/36] encoder: add payload position capability Christian Ebner
@ 2024-02-28 14:01 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC pxar 10/36] encoder/format: finish payload stream with marker Christian Ebner
` (26 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:01 UTC (permalink / raw)
To: pbs-devel
Allows advancing the payload writer position by a given size.
This is used to update the encoder's payload position when injecting
reused chunks for files with unchanged metadata.
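As a sketch (hypothetical helper): whenever reused chunks totalling
injected_bytes are placed into the payload stream outside of the
encoder, the encoder's payload position has to be advanced by the same
amount so that subsequent payload references stay consistent:

    use std::io;

    use pxar::encoder::sync::Encoder;
    use pxar::encoder::SeqWrite;

    // Hypothetical helper: account for bytes injected into the payload stream
    // outside of the encoder (e.g. reused chunks of unchanged files).
    fn account_injected_chunks<T: SeqWrite>(
        encoder: &mut Encoder<'_, T>,
        injected_bytes: u64,
    ) -> io::Result<()> {
        encoder.advance(injected_bytes)
    }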
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/encoder/aio.rs | 5 +++++
src/encoder/mod.rs | 6 ++++++
src/encoder/sync.rs | 5 +++++
3 files changed, 16 insertions(+)
diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs
index 12121fc..a44e040 100644
--- a/src/encoder/aio.rs
+++ b/src/encoder/aio.rs
@@ -117,6 +117,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
.await
}
+ /// Add size to payload stream
+ pub fn advance(&mut self, size: u64) -> io::Result<()> {
+ self.inner.advance(size)
+ }
+
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
pub async fn create_directory<P: AsRef<Path>>(
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index 921a0d9..dbb6fcb 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -525,6 +525,12 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
Ok(())
}
+ /// Add size to payload stream
+ pub fn advance(&mut self, size: u64) -> io::Result<()> {
+ self.state_mut()?.payload_write_position += size;
+ Ok(())
+ }
+
/// Return a file offset usable with `add_hardlink`.
pub async fn add_symlink(
&mut self,
diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs
index 818279f..00fced5 100644
--- a/src/encoder/sync.rs
+++ b/src/encoder/sync.rs
@@ -121,6 +121,11 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
))
}
+ /// Add size to payload stream
+ pub fn advance(&mut self, size: u64) -> io::Result<()> {
+ self.inner.advance(size)
+ }
+
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
pub fn create_directory<P: AsRef<Path>>(
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC pxar 10/36] encoder/format: finish payload stream with marker
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (8 preceding siblings ...)
2024-02-28 14:01 ` [pbs-devel] [RFC pxar 09/36] encoder: add payload advance capability Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 11/36] client: pxar: switch to stack based encoder state Christian Ebner
` (25 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Mark the end of the optional payload stream. This makes sure that at
least some bytes are written to the stream (empty archives are not
allowed by the Proxmox Backup Server) and that any injected chunks are
consumed.
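Purely for illustration, a minimal sketch of how a consumer of the
payload stream could recognize the new marker; it assumes the usual
pxar convention of a little-endian u64 entry type at the start of the
header, which this patch does not spell out:

    fn is_payload_tail_marker(header_type: &[u8; 8]) -> bool {
        // value as generated by mk-format-hashes below
        const PXAR_PAYLOAD_TAIL_MARKER: u64 = 0x6c72b78b984c81b5;
        u64::from_le_bytes(*header_type) == PXAR_PAYLOAD_TAIL_MARKER
    }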
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
examples/mk-format-hashes.rs | 5 +++++
src/encoder/mod.rs | 7 +++++++
src/format/mod.rs | 2 ++
3 files changed, 14 insertions(+)
diff --git a/examples/mk-format-hashes.rs b/examples/mk-format-hashes.rs
index 83adb38..de73df0 100644
--- a/examples/mk-format-hashes.rs
+++ b/examples/mk-format-hashes.rs
@@ -56,6 +56,11 @@ const CONSTANTS: &[(&str, &str, &str)] = &[
"PXAR_GOODBYE_TAIL_MARKER",
"__PROXMOX_FORMAT_PXAR_GOODBYE_TAIL_MARKER__",
),
+ (
+ "The end marker used in the separate payload stream",
+ "PXAR_PAYLOAD_TAIL_MARKER",
+ "__PROXMOX_FORMAT_PXAR_PAYLOAD_TAIL_MARKER__",
+ ),
];
fn main() {
diff --git a/src/encoder/mod.rs b/src/encoder/mod.rs
index dbb6fcb..9854114 100644
--- a/src/encoder/mod.rs
+++ b/src/encoder/mod.rs
@@ -890,6 +890,13 @@ impl<'a, T: SeqWrite + 'a> EncoderImpl<'a, T> {
if let EncoderOutput::Owned(output) = &mut self.payload_output {
if let Some(output) = output {
+ seq_write_pxar_entry(
+ output,
+ format::PXAR_PAYLOAD_TAIL_MARKER,
+ &[],
+ &mut state.payload_write_position,
+ )
+ .await?;
flush(output).await?;
}
}
diff --git a/src/format/mod.rs b/src/format/mod.rs
index 3512691..d6fad49 100644
--- a/src/format/mod.rs
+++ b/src/format/mod.rs
@@ -105,6 +105,8 @@ pub const PXAR_PAYLOAD_REF: u64 = 0x419d3d6bc4ba977e;
pub const PXAR_GOODBYE: u64 = 0x2fec4fa642d5731d;
/// The end marker used in the GOODBYE object
pub const PXAR_GOODBYE_TAIL_MARKER: u64 = 0xef5eed5b753e1555;
+/// The end marker used in the separate payload stream
+pub const PXAR_PAYLOAD_TAIL_MARKER: u64 = 0x6c72b78b984c81b5;
#[derive(Debug, Endian)]
#[repr(C)]
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 11/36] client: pxar: switch to stack based encoder state
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (9 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC pxar 10/36] encoder/format: finish payload stream with marker Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 12/36] client: backup: factor out extension from backup target Christian Ebner
` (24 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
In preparation for look-ahead caching, where passing around different
encoder instances with internal references would not be feasible.
Previously, a new encoder instance was generated for each directory
level, reducing possible implementation errors. These encoder
instances were internally linked by references to keep track of state
changes in a parent-child relationship.
This is however not feasible when the encoder has to be passed around
by mutable reference, as is the case for the look-ahead cache
implementation. The encoder has therefore been adapted to a single
object implementation with an internal stack keeping track of the
state.
Depends on the pxar library version.
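The following sketch only illustrates the single-object, stack-based
idea; all names are made up and the real encoder state in the pxar
crate tracks considerably more than shown here:

    struct EncoderState {
        // per-directory bookkeeping (offsets, goodbye table items, ...)
    }

    struct Encoder {
        state: Vec<EncoderState>, // one entry per open directory level
    }

    impl Encoder {
        fn create_directory(&mut self) {
            // push a new level instead of returning a nested encoder instance
            self.state.push(EncoderState {});
        }

        fn finish_directory(&mut self) {
            // pop back to the parent directory's state
            self.state.pop();
        }
    }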
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 60efb0ce..de8c0696 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -663,7 +663,7 @@ impl Archiver {
) -> Result<(), Error> {
let dir_name = OsStr::from_bytes(dir_name.to_bytes());
- let mut encoder = encoder.create_directory(dir_name, metadata).await?;
+ encoder.create_directory(dir_name, metadata).await?;
let old_fs_magic = self.fs_magic;
let old_fs_feature_flags = self.fs_feature_flags;
@@ -686,7 +686,7 @@ impl Archiver {
log::info!("skipping mount point: {:?}", self.path);
Ok(())
} else {
- self.archive_dir_contents(&mut encoder, dir, false).await
+ self.archive_dir_contents(encoder, dir, false).await
};
self.fs_magic = old_fs_magic;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 12/36] client: backup: factor out extension from backup target
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (10 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 11/36] client: pxar: switch to stack based encoder state Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 13/36] client: backup: early check for fixed index type Christian Ebner
` (23 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Instead of composing the backup target name and pushing it to the
upload list, push the archive name and extension separately and only
construct the full target name while iterating the list later.
This keeps it possible to additionally prefix the extension, as
required for the separate pxar metadata and payload indexes.
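A simplified sketch of the changed bookkeeping; only the name and
extension fields of the real upload list tuples are shown:

    fn main() {
        let mut upload_list: Vec<(String, &str)> = Vec::new();
        upload_list.push(("root.pxar".to_string(), "didx"));

        for (target_base, extension) in upload_list {
            // a further prefix could be spliced in here before the extension
            let target = format!("{target_base}.{extension}");
            assert_eq!(target, "root.pxar.didx");
        }
    }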
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-backup-client/src/main.rs | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 546275cb..e3445b7b 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -785,7 +785,8 @@ async fn create_backup(
upload_list.push((
BackupSpecificationType::PXAR,
filename.to_owned(),
- format!("{}.didx", target),
+ target.to_owned(),
+ "didx",
0,
));
}
@@ -803,7 +804,8 @@ async fn create_backup(
upload_list.push((
BackupSpecificationType::IMAGE,
filename.to_owned(),
- format!("{}.fidx", target),
+ target.to_owned(),
+ "fidx",
size,
));
}
@@ -814,7 +816,8 @@ async fn create_backup(
upload_list.push((
BackupSpecificationType::CONFIG,
filename.to_owned(),
- format!("{}.blob", target),
+ target.to_owned(),
+ "blob",
metadata.len(),
));
}
@@ -825,7 +828,8 @@ async fn create_backup(
upload_list.push((
BackupSpecificationType::LOGFILE,
filename.to_owned(),
- format!("{}.blob", target),
+ target.to_owned(),
+ "blob",
metadata.len(),
));
}
@@ -944,7 +948,8 @@ async fn create_backup(
log::info!("{} {} '{}' to '{}' as {}", what, desc, file, repo, target);
};
- for (backup_type, filename, target, size) in upload_list {
+ for (backup_type, filename, target_base, extension, size) in upload_list {
+ let target = format!("{target_base}.{extension}");
match (backup_type, dry_run) {
// dry-run
(BackupSpecificationType::CONFIG, true) => log_file("config file", &filename, &target),
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 13/36] client: backup: early check for fixed index type
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (11 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 12/36] client: backup: factor out extension from backup target Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 14/36] client: backup: split payload to dedicated stream Christian Ebner
` (22 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Return early when the check fails, avoiding construction of unused
object instances.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-backup-client/src/main.rs | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index e3445b7b..256080be 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -194,6 +194,9 @@ async fn backup_directory<P: AsRef<Path>>(
) -> Result<BackupStats, Error> {
let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+ if upload_options.fixed_size.is_some() {
+ bail!("cannot backup directory with fixed chunk size!");
+ }
let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
@@ -206,9 +209,6 @@ async fn backup_directory<P: AsRef<Path>>(
}
});
- if upload_options.fixed_size.is_some() {
- bail!("cannot backup directory with fixed chunk size!");
- }
let stats = client
.upload_stream(archive_name, stream, upload_options)
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 14/36] client: backup: split payload to dedicated stream
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (12 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 13/36] client: backup: early check for fixed index type Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 15/36] client: restore: read payload from dedicated index Christian Ebner
` (21 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
This patch prepares for quickly looking up metadata of previous
snapshots by splitting the upload of a pxar archive into two dedicated
streams: one for metadata, assigned a .pxar.meta.didx suffix, and one
for payload data, assigned a .pxar.pld.didx suffix.
The patch constructs the duplicate chunk stream, backup writer and
upload stream instances required for the split archive uploads.
This not only makes it possible to reuse the payload chunks for
further backup runs but also keeps the metadata archive small, with
the outlook of even making the currently used catalog obsolete.
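To make the naming scheme concrete, a small sketch of how the two
archive names relate to a backup target such as "root.pxar"; the
helper function name is made up:

    fn split_archive_names(target_base: &str) -> (String, String) {
        let metadata_archive = format!("{target_base}.meta.didx");
        let payload_archive = format!("{target_base}.pld.didx");
        (metadata_archive, payload_archive)
    }

    // split_archive_names("root.pxar") yields
    // ("root.pxar.meta.didx", "root.pxar.pld.didx")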
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 4 +
pbs-client/src/pxar_backup_stream.rs | 55 ++++++++++----
proxmox-backup-client/src/main.rs | 75 +++++++++++++++++--
.../src/proxmox_restore_daemon/api.rs | 12 ++-
pxar-bin/src/main.rs | 1 +
tests/catar.rs | 1 +
6 files changed, 123 insertions(+), 25 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index de8c0696..59aa4450 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -141,6 +141,7 @@ pub async fn create_archive<T, F>(
feature_flags: Flags,
callback: F,
catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
+ mut payload_writer: Option<T>,
options: PxarCreateOptions,
) -> Result<(), Error>
where
@@ -171,6 +172,9 @@ where
}
let mut encoder = Encoder::new(&mut writer, &metadata).await?;
+ if let Some(writer) = payload_writer.as_mut() {
+ encoder = encoder.attach_payload_output(writer);
+ }
let mut patterns = options.patterns;
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 22a6ffdc..c7be4a66 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -40,20 +40,31 @@ impl PxarBackupStream {
dir: Dir,
catalog: Arc<Mutex<CatalogWriter<W>>>,
options: crate::pxar::PxarCreateOptions,
- ) -> Result<Self, Error> {
- let (tx, rx) = std::sync::mpsc::sync_channel(10);
-
+ separate_payload_stream: bool,
+ ) -> Result<(Self, Option<Self>), Error> {
let buffer_size = 256 * 1024;
- let error = Arc::new(Mutex::new(None));
- let error2 = Arc::clone(&error);
- let handler = async move {
- let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+ let (tx, rx) = std::sync::mpsc::sync_channel(10);
+ let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+ buffer_size,
+ StdChannelWriter::new(tx),
+ ));
+ let writer = pxar::encoder::sync::StandardWriter::new(writer);
+
+ let (payload_tx, payload_rx) = std::sync::mpsc::sync_channel(10);
+ let payload_writer = if separate_payload_stream {
+ let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
buffer_size,
- StdChannelWriter::new(tx),
+ StdChannelWriter::new(payload_tx),
));
+ Some(pxar::encoder::sync::StandardWriter::new(payload_writer))
+ } else {
+ None
+ };
- let writer = pxar::encoder::sync::StandardWriter::new(writer);
+ let error = Arc::new(Mutex::new(None));
+ let error2 = Arc::clone(&error);
+ let handler = async move {
if let Err(err) = crate::pxar::create_archive(
dir,
writer,
@@ -63,6 +74,7 @@ impl PxarBackupStream {
Ok(())
},
Some(catalog),
+ payload_writer,
options,
)
.await
@@ -76,21 +88,34 @@ impl PxarBackupStream {
let future = Abortable::new(handler, registration);
tokio::spawn(future);
- Ok(Self {
+ let backup_stream = Self {
rx: Some(rx),
- handle: Some(handle),
- error,
- })
+ handle: Some(handle.clone()),
+ error: error.clone(),
+ };
+
+ let backup_stream_payload = if separate_payload_stream {
+ Some(Self {
+ rx: Some(payload_rx),
+ handle: Some(handle),
+ error,
+ })
+ } else {
+ None
+ };
+
+ Ok((backup_stream, backup_stream_payload))
}
pub fn open<W: Write + Send + 'static>(
dirname: &Path,
catalog: Arc<Mutex<CatalogWriter<W>>>,
options: crate::pxar::PxarCreateOptions,
- ) -> Result<Self, Error> {
+ separate_payload_stream: bool,
+ ) -> Result<(Self, Option<Self>), Error> {
let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
- Self::new(dir, catalog, options)
+ Self::new(dir, catalog, options, separate_payload_stream)
}
}
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 256080be..f252f5b7 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -187,17 +187,24 @@ async fn backup_directory<P: AsRef<Path>>(
client: &BackupWriter,
dir_path: P,
archive_name: &str,
+ payload_target: Option<&str>,
chunk_size: Option<usize>,
catalog: Arc<Mutex<CatalogWriter<TokioWriterAdapter<StdChannelWriter<Error>>>>>,
pxar_create_options: pbs_client::pxar::PxarCreateOptions,
upload_options: UploadOptions,
-) -> Result<BackupStats, Error> {
- let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
- let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+) -> Result<(BackupStats, Option<BackupStats>), Error> {
if upload_options.fixed_size.is_some() {
bail!("cannot backup directory with fixed chunk size!");
}
+ let (pxar_stream, payload_stream) = PxarBackupStream::open(
+ dir_path.as_ref(),
+ catalog,
+ pxar_create_options,
+ payload_target.is_some(),
+ )?;
+
+ let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -209,12 +216,46 @@ async fn backup_directory<P: AsRef<Path>>(
}
});
+ let stats = client.upload_stream(archive_name, stream, upload_options.clone());
- let stats = client
- .upload_stream(archive_name, stream, upload_options)
- .await?;
+ if let Some(payload_stream) = payload_stream {
+ let payload_target = if let Some(payload_target) = payload_target {
+ payload_target
+ } else {
+ bail!("got payload stream, but no target archive name");
+ };
- Ok(stats)
+ let mut payload_chunk_stream = ChunkStream::new(
+ payload_stream,
+ chunk_size,
+ );
+ let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
+ let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
+
+ // spawn payload chunker inside a separate task so that it can run parallel
+ tokio::spawn(async move {
+ while let Some(v) = payload_chunk_stream.next().await {
+ let _ = payload_tx.send(v).await;
+ }
+ });
+
+ let payload_stats = client.upload_stream(
+ &payload_target,
+ stream,
+ upload_options,
+ );
+
+ match futures::join!(stats, payload_stats) {
+ (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
+ (Err(err), Ok(_)) => Err(format_err!("upload failed: {err}")),
+ (Ok(_), Err(err)) => Err(format_err!("upload failed: {err}")),
+ (Err(err), Err(payload_err)) => {
+ Err(format_err!("upload failed: {err} - {payload_err}"))
+ }
+ }
+ } else {
+ Ok((stats.await?, None))
+ }
}
async fn backup_image<P: AsRef<Path>>(
@@ -985,6 +1026,13 @@ async fn create_backup(
manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
}
(BackupSpecificationType::PXAR, false) => {
+ let metadata_mode = false; // Until enabled via param
+ let target = if metadata_mode {
+ format!("{target_base}.meta.didx")
+ } else {
+ target
+ };
+
// start catalog upload on first use
if catalog.is_none() {
let catalog_upload_res =
@@ -1015,16 +1063,27 @@ async fn create_backup(
..UploadOptions::default()
};
- let stats = backup_directory(
+ let payload_target = format!("{target_base}.pld.{extension}");
+ let (stats, payload_stats) = backup_directory(
&client,
&filename,
&target,
+ Some(&payload_target),
chunk_size_opt,
catalog.clone(),
pxar_options,
upload_options,
)
.await?;
+
+ if let Some(payload_stats) = payload_stats {
+ manifest.add_file(
+ payload_target,
+ payload_stats.size,
+ payload_stats.csum,
+ crypto.mode,
+ )?;
+ }
manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
catalog.lock().unwrap().end_directory()?;
}
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index c2055222..bd8ddb20 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -356,8 +356,16 @@ fn extract(
};
let pxar_writer = TokioWriter::new(writer);
- create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options)
- .await
+ create_archive(
+ dir,
+ pxar_writer,
+ Flags::DEFAULT,
+ |_| Ok(()),
+ None,
+ None,
+ options,
+ )
+ .await
}
.await;
if let Err(err) = result {
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 2bbe90e3..e3b0faac 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -383,6 +383,7 @@ async fn create_archive(
Ok(())
},
None,
+ None,
options,
)
.await?;
diff --git a/tests/catar.rs b/tests/catar.rs
index 36bb4f3b..04af4ffd 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
Flags::DEFAULT,
|_| Ok(()),
None,
+ None,
options,
))?;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 15/36] client: restore: read payload from dedicated index
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (13 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 14/36] client: backup: split payload to dedicated stream Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 16/36] tools: cover meta extension for pxar archives Christian Ebner
` (20 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Whenever a split pxar archive is encountered, instantiate and attach
the required dedicated reader instance to the decoder instance on
restore.
Piping the output to stdout is not possible, as this would require a
decoder instance which can decode the input stream while maintaining
the pxar stream format as output.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-backup-client/src/main.rs | 34 ++++++++++++++++++++++++++++---
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index f252f5b7..c8ba67b4 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1214,7 +1214,7 @@ async fn dump_image<W: Write>(
fn parse_archive_type(name: &str) -> (String, ArchiveType) {
if name.ends_with(".didx") || name.ends_with(".fidx") || name.ends_with(".blob") {
(name.into(), archive_type(name).unwrap())
- } else if name.ends_with(".pxar") {
+ } else if name.ends_with(".pxar") || name.ends_with(".pxar.meta") {
(format!("{}.didx", name), ArchiveType::DynamicIndex)
} else if name.ends_with(".img") {
(format!("{}.fidx", name), ArchiveType::FixedIndex)
@@ -1456,7 +1456,7 @@ async fn restore(
let chunk_reader = RemoteChunkReader::new(
client.clone(),
- crypt_config,
+ crypt_config.clone(),
file_info.chunk_crypt_mode(),
most_used,
);
@@ -1516,8 +1516,33 @@ async fn restore(
}
if let Some(target) = target {
+ let mut decoder = pxar::decoder::Decoder::from_std(reader)?;
+ if let Some(archive_base_name) = archive_name.strip_suffix(".pxar.meta.didx") {
+ let payload_archive_name = format!("{archive_base_name}.pxar.pld.didx");
+
+ let payload_index = client
+ .download_dynamic_index(&manifest, &payload_archive_name)
+ .await?;
+
+ let payload_most_used = payload_index.find_most_used_chunks(8);
+
+ let payload_chunk_reader = RemoteChunkReader::new(
+ client.clone(),
+ crypt_config,
+ file_info.chunk_crypt_mode(),
+ payload_most_used,
+ );
+
+ let payload_reader =
+ BufferedDynamicReader::new(payload_index, payload_chunk_reader);
+
+ decoder = decoder.redirect_payload_input(pxar::decoder::sync::StandardReader::new(
+ payload_reader,
+ ));
+ }
+
pbs_client::pxar::extract_archive(
- pxar::decoder::Decoder::from_std(reader)?,
+ decoder,
Path::new(target),
feature_flags,
|path| {
@@ -1527,6 +1552,9 @@ async fn restore(
)
.map_err(|err| format_err!("error extracting archive - {:#}", err))?;
} else {
+ if archive_name.ends_with(".pxar.meta.didx") {
+ bail!("unable to pipe pxar meta archive");
+ }
let mut writer = std::fs::OpenOptions::new()
.write(true)
.open("/dev/stdout")
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 16/36] tools: cover meta extension for pxar archives
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (14 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 15/36] client: restore: read payload from dedicated index Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 17/36] restore: " Christian Ebner
` (19 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/tools/mod.rs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pbs-client/src/tools/mod.rs b/pbs-client/src/tools/mod.rs
index 1b0123a3..08986fc5 100644
--- a/pbs-client/src/tools/mod.rs
+++ b/pbs-client/src/tools/mod.rs
@@ -337,7 +337,7 @@ pub fn complete_pxar_archive_name(arg: &str, param: &HashMap<String, String>) ->
complete_server_file_name(arg, param)
.iter()
.filter_map(|name| {
- if name.ends_with(".pxar.didx") {
+ if name.ends_with(".pxar.didx") || name.ends_with(".pxar.meta.didx") {
Some(pbs_tools::format::strip_server_file_extension(name).to_owned())
} else {
None
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 17/36] restore: cover meta extension for pxar archives
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (15 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 16/36] tools: cover meta extension for pxar archives Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 18/36] client: mount: make split pxar archives mountable Christian Ebner
` (18 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-file-restore/src/main.rs | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 50875a63..9ca6029a 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -75,7 +75,7 @@ fn parse_path(path: String, base64: bool) -> Result<ExtractPath, Error> {
(file, path)
};
- if file.ends_with(".pxar.didx") {
+ if file.ends_with(".pxar.didx") || file.ends_with(".pxar.meta.didx") {
Ok(ExtractPath::Pxar(file, path))
} else if file.ends_with(".img.fidx") {
Ok(ExtractPath::VM(file, path))
@@ -123,11 +123,16 @@ async fn list_files(
ExtractPath::ListArchives => {
let mut entries = vec![];
for file in manifest.files() {
- if !file.filename.ends_with(".pxar.didx") && !file.filename.ends_with(".img.fidx") {
+ if !file.filename.ends_with(".pxar.didx")
+ && !file.filename.ends_with(".img.fidx")
+ && !file.filename.ends_with(".pxar.meta.didx")
+ {
continue;
}
let path = format!("/{}", file.filename);
- let attr = if file.filename.ends_with(".pxar.didx") {
+ let attr = if file.filename.ends_with(".pxar.didx")
+ || file.filename.ends_with(".pxar.meta.didx")
+ {
// a pxar file is a file archive, so it's root is also a directory root
Some(&DirEntryAttribute::Directory { start: 0 })
} else {
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 18/36] client: mount: make split pxar archives mountable
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (16 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 17/36] restore: " Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 19/36] api: datastore: refactor getting local chunk reader Christian Ebner
` (17 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Cover the cases where the pxar archive was uploaded as split payload
data and metadata streams. Instantiate the required reader and
decoder instances to access the metadata and payload data archives.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-backup-client/src/mount.rs | 56 ++++++++++++++++++++++--------
1 file changed, 41 insertions(+), 15 deletions(-)
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 4a2f8335..c955a7da 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -219,19 +219,22 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
}
};
- let server_archive_name = if archive_name.ends_with(".pxar") {
- if target.is_none() {
- bail!("use the 'mount' command to mount pxar archives");
- }
- format!("{}.didx", archive_name)
- } else if archive_name.ends_with(".img") {
- if target.is_some() {
- bail!("use the 'map' command to map drive images");
- }
- format!("{}.fidx", archive_name)
- } else {
- bail!("Can only mount/map pxar archives and drive images.");
- };
+ let server_archive_name =
+ if archive_name.ends_with(".pxar") || archive_name.ends_with(".pxar.meta") {
+ if target.is_none() {
+ bail!("use the 'mount' command to mount pxar archives");
+ }
+ format!("{}.didx", archive_name)
+ } else if archive_name.ends_with(".img") {
+ if target.is_some() {
+ bail!("use the 'map' command to map drive images");
+ }
+ format!("{}.fidx", archive_name)
+ } else if archive_name.ends_with(".pxar.pld") {
+ bail!("Use corresponding pxar.meta archive to mount.");
+ } else {
+ bail!("Can only mount/map pxar archives and drive images.");
+ };
let client = BackupReader::start(
&client,
@@ -289,14 +292,37 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
let most_used = index.find_most_used_chunks(8);
let chunk_reader = RemoteChunkReader::new(
client.clone(),
- crypt_config,
+ crypt_config.clone(),
file_info.chunk_crypt_mode(),
most_used,
);
let reader = BufferedDynamicReader::new(index, chunk_reader);
+
let archive_size = reader.archive_size();
let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
- let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
+
+ let mut decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
+
+ if let Some(archive_base_name) = server_archive_name.strip_suffix(".pxar.meta.didx") {
+ let payload_archive_name = format!("{archive_base_name}.pxar.pld.didx");
+ let payload_index = client
+ .download_dynamic_index(&manifest, &payload_archive_name)
+ .await?;
+
+ let payload_most_used = payload_index.find_most_used_chunks(8);
+ let payload_chunk_reader = RemoteChunkReader::new(
+ client.clone(),
+ crypt_config,
+ file_info.chunk_crypt_mode(),
+ payload_most_used,
+ );
+
+ let payload_reader = BufferedDynamicReader::new(payload_index, payload_chunk_reader);
+ let payload_reader: pbs_pxar_fuse::Reader =
+ Arc::new(BufferedDynamicReadAt::new(payload_reader));
+
+ decoder = decoder.redirect_payload_input(payload_reader);
+ }
let session =
pbs_pxar_fuse::Session::mount(decoder, options, false, Path::new(target.unwrap()))
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 19/36] api: datastore: refactor getting local chunk reader
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (17 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 18/36] client: mount: make split pxar archives mountable Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 20/36] api: datastore: attach optional payload " Christian Ebner
` (16 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Move the code to get the local chunk reader to a dedicated function
to make it reusable. The same code is required to get the local chunk
reader for the payload stream of split pxar archives.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/api2/admin/datastore.rs | 37 +++++++++++++++++++++++++------------
1 file changed, 25 insertions(+), 12 deletions(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index a95031e7..843b9ce5 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -1653,6 +1653,29 @@ pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
&Permission::Anybody,
);
+fn get_local_pxar_reader(
+ datastore: Arc<DataStore>,
+ manifest: &BackupManifest,
+ backup_dir: &BackupDir,
+ pxar_name: &str,
+) -> Result<(LocalDynamicReadAt<LocalChunkReader>, u64), Error> {
+ let mut path = datastore.base_path();
+ path.push(backup_dir.relative_path());
+ path.push(pxar_name);
+
+ let index = DynamicIndexReader::open(&path)
+ .map_err(|err| format_err!("unable to read dynamic index '{:?}' - {}", &path, err))?;
+
+ let (csum, size) = index.compute_csum();
+ manifest.verify_file(pxar_name, &csum, size)?;
+
+ let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
+ let reader = BufferedDynamicReader::new(index, chunk_reader);
+ let archive_size = reader.archive_size();
+
+ Ok((LocalDynamicReadAt::new(reader), archive_size))
+}
+
pub fn pxar_file_download(
_parts: Parts,
_req_body: Body,
@@ -1697,20 +1720,10 @@ pub fn pxar_file_download(
}
}
- let mut path = datastore.base_path();
- path.push(backup_dir.relative_path());
- path.push(pxar_name);
+ let (reader, archive_size) =
+ get_local_pxar_reader(datastore.clone(), &manifest, &backup_dir, &pxar_name)?;
- let index = DynamicIndexReader::open(&path)
- .map_err(|err| format_err!("unable to read dynamic index '{:?}' - {}", &path, err))?;
- let (csum, size) = index.compute_csum();
- manifest.verify_file(pxar_name, &csum, size)?;
-
- let chunk_reader = LocalChunkReader::new(datastore, None, CryptMode::None);
- let reader = BufferedDynamicReader::new(index, chunk_reader);
- let archive_size = reader.archive_size();
- let reader = LocalDynamicReadAt::new(reader);
let decoder = Accessor::new(reader, archive_size).await?;
let root = decoder.open_root().await?;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 20/36] api: datastore: attach optional payload chunk reader
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (18 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 19/36] api: datastore: refactor getting local chunk reader Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
` (15 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Attach the payload chunk reader for pxar archives which have been
uploaded using split streams for metadata and payload data.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/api2/admin/datastore.rs | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 843b9ce5..da10d543 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -1723,9 +1723,15 @@ pub fn pxar_file_download(
let (reader, archive_size) =
get_local_pxar_reader(datastore.clone(), &manifest, &backup_dir, &pxar_name)?;
+ let mut decoder = Accessor::new(reader, archive_size).await?;
+ if let Some(archive_base_name) = pxar_name.strip_suffix(".pxar.meta.didx") {
+ let payload_archive_name = format!("{archive_base_name}.pxar.pld.didx");
+ let (payload_reader, _) =
+ get_local_pxar_reader(datastore, &manifest, &backup_dir, &payload_archive_name)?;
+ decoder = decoder.redirect_payload_input(payload_reader);
+ }
- let decoder = Accessor::new(reader, archive_size).await?;
let root = decoder.open_root().await?;
let path = OsStr::from_bytes(file_path).to_os_string();
let file = root
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (19 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 20/36] api: datastore: attach optional payload " Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams Christian Ebner
` (14 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
In preparation for restoring split metadata and payload stream pxar
archives via the catalog shell, make the pxar fuse reader
instantiation reusable.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-backup-client/src/catalog.rs | 45 ++++++++++++++++++++--------
1 file changed, 32 insertions(+), 13 deletions(-)
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index 72b22e67..d6859be3 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -11,6 +11,7 @@ use proxmox_schema::api;
use pbs_api_types::BackupNamespace;
use pbs_client::tools::key_source::get_encryption_key_password;
use pbs_client::{BackupReader, RemoteChunkReader};
+use pbs_datastore::BackupManifest;
use pbs_tools::crypt_config::CryptConfig;
use pbs_tools::json::required_string_param;
@@ -124,6 +125,32 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
Ok(Value::Null)
}
+async fn get_pxar_fuse_reader(
+ server_archive_name: &str,
+ client: Arc<BackupReader>,
+ manifest: &BackupManifest,
+ crypt_config: Option<Arc<CryptConfig>>,
+) -> Result<(pbs_pxar_fuse::Reader, u64), Error> {
+ let index = client
+ .download_dynamic_index(&manifest, &server_archive_name)
+ .await?;
+
+ let most_used = index.find_most_used_chunks(8);
+ let file_info = manifest.lookup_file_info(&server_archive_name)?;
+ let chunk_reader = RemoteChunkReader::new(
+ client.clone(),
+ crypt_config.clone(),
+ file_info.chunk_crypt_mode(),
+ most_used,
+ );
+
+ let reader = BufferedDynamicReader::new(index, chunk_reader);
+ let archive_size = reader.archive_size();
+ let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
+
+ Ok((reader, archive_size))
+}
+
#[api(
input: {
properties: {
@@ -205,21 +232,13 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
let (manifest, _) = client.download_manifest().await?;
manifest.check_fingerprint(crypt_config.as_ref().map(Arc::as_ref))?;
- let index = client
- .download_dynamic_index(&manifest, &server_archive_name)
- .await?;
- let most_used = index.find_most_used_chunks(8);
-
- let file_info = manifest.lookup_file_info(&server_archive_name)?;
- let chunk_reader = RemoteChunkReader::new(
+ let (reader, archive_size) = get_pxar_fuse_reader(
+ &server_archive_name,
client.clone(),
+ &manifest,
crypt_config.clone(),
- file_info.chunk_crypt_mode(),
- most_used,
- );
- let reader = BufferedDynamicReader::new(index, chunk_reader);
- let archive_size = reader.archive_size();
- let reader: pbs_pxar_fuse::Reader = Arc::new(BufferedDynamicReadAt::new(reader));
+ ).await?;
+
let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
client.download(CATALOG_NAME, &mut tmpfile).await?;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (20 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 21/36] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 23/36] www: cover meta extension for pxar archives Christian Ebner
` (13 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Allows attaching to pxar archives with split metadata and payload
streams by redirecting the payload input to a dedicated reader
accessing the payload index.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
proxmox-backup-client/src/catalog.rs | 26 ++++++++++++++++++++------
1 file changed, 20 insertions(+), 6 deletions(-)
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index d6859be3..291c4184 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -207,11 +207,14 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
}
};
- let server_archive_name = if archive_name.ends_with(".pxar") {
- format!("{}.didx", archive_name)
- } else {
- bail!("Can only mount pxar archives.");
- };
+ let server_archive_name =
+ if archive_name.ends_with(".pxar") || archive_name.ends_with(".pxar.meta") {
+ format!("{}.didx", archive_name)
+ } else if archive_name.ends_with(".pxar.pld") {
+ bail!("Cannot mount pxar pld, use pxar meta instead");
+ } else {
+ bail!("Can only mount pxar archives.");
+ };
let client = BackupReader::start(
&client,
@@ -239,7 +242,18 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
crypt_config.clone(),
).await?;
- let decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
+ let mut decoder = pbs_pxar_fuse::Accessor::new(reader, archive_size).await?;
+
+ if let Some(archive_base_name) = server_archive_name.strip_suffix(".pxar.meta.didx") {
+ let payload_archive_name = format!("{archive_base_name}.pxar.pld.didx");
+ let (payload_reader, _) = get_pxar_fuse_reader(
+ &payload_archive_name,
+ client.clone(),
+ &manifest,
+ crypt_config.clone(),
+ ).await?;
+ decoder = decoder.redirect_payload_input(payload_reader);
+ }
client.download(CATALOG_NAME, &mut tmpfile).await?;
let index = DynamicIndexReader::new(tmpfile)
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 23/36] www: cover meta extension for pxar archives
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (21 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 22/36] catalog: shell: redirect payload reader for split streams Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 24/36] index: fetch chunk from index by start/end-offset Christian Ebner
` (12 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Allows accessing the pxar meta archives for navigation and download
via the Proxmox Backup Server web UI.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
www/datastore/Content.js | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/www/datastore/Content.js b/www/datastore/Content.js
index 87317ec1..966d7255 100644
--- a/www/datastore/Content.js
+++ b/www/datastore/Content.js
@@ -1043,7 +1043,7 @@ Ext.define('PBS.DataStoreContent', {
tooltip: gettext('Browse'),
getClass: (v, m, { data }) => {
if (
- (data.ty === 'file' && data.filename.endsWith('pxar.didx')) ||
+ (data.ty === 'file' && (data.filename.endsWith('pxar.didx') || data.filename.endsWith('pxar.meta.didx'))) ||
(data.ty === 'ns' && !data.root)
) {
return 'fa fa-folder-open-o';
@@ -1051,7 +1051,9 @@ Ext.define('PBS.DataStoreContent', {
return 'pmx-hidden';
},
isActionDisabled: (v, r, c, i, { data }) =>
- !(data.ty === 'file' && data.filename.endsWith('pxar.didx') && data['crypt-mode'] < 3) && data.ty !== 'ns',
+ !(data.ty === 'file' &&
+ (data.filename.endsWith('pxar.didx') || data.filename.endsWith('pxar.meta.didx')) &&
+ data['crypt-mode'] < 3) && data.ty !== 'ns',
},
],
},
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 24/36] index: fetch chunk from index by start/end-offset
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (22 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 23/36] www: cover meta extension for pxar archives Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector Christian Ebner
` (11 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
In preparation for injecting reused payload chunks into payload
streams for regular files with unchanged metadata.
Adds a function to get a list of DynamicEntry items from a chunk
index for a given start and end offset; the requested range is fully
contained within the returned chunks.
In addition to the list of index entries, the padding from the start
of the first chunk to the requested start offset is returned, as well
as the end padding from the end of the requested payload to the
actual chunk end.
The padding is used to calculate the payload reference offset written
to the metadata archive, which is required for payload access during
decoding.
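A short worked example of the padding calculation with made-up chunk
boundaries: assume the payload stream is covered by chunks ending at
offsets 4096, 10240 and 16384 and the byte range [5000, 12000) is
requested. The second and third chunk are returned, and:

    //   padding_start =  5000 -  4096 =  904  (bytes before the requested start)
    //   padding_end   = 16384 - 12000 = 4384  (bytes after the requested end)
    fn paddings(first_chunk_start: u64, last_chunk_end: u64, start: u64, end: u64) -> (u64, u64) {
        (start - first_chunk_start, last_chunk_end - end)
    }

    // paddings(4096, 16384, 5000, 12000) == (904, 4384)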
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-datastore/src/dynamic_index.rs | 55 ++++++++++++++++++++++++++++++
1 file changed, 55 insertions(+)
diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs
index 71a5082e..847bc80d 100644
--- a/pbs-datastore/src/dynamic_index.rs
+++ b/pbs-datastore/src/dynamic_index.rs
@@ -74,6 +74,26 @@ impl DynamicEntry {
}
}
+/// Dynamic Entry appendable as pxar Appendix entry
+#[derive(Clone, Debug)]
+#[repr(C)]
+pub struct AppendableDynamicEntry {
+ size_le: u64,
+ digest: [u8; 32],
+}
+
+impl AppendableDynamicEntry {
+ #[inline]
+ pub fn size(&self) -> u64 {
+ u64::from_le(self.size_le)
+ }
+
+ #[inline]
+ pub fn digest(&self) -> [u8; 32] {
+ self.digest.clone()
+ }
+}
+
pub struct DynamicIndexReader {
_file: File,
pub size: usize,
@@ -188,6 +208,41 @@ impl DynamicIndexReader {
self.binary_search(middle_idx + 1, middle_end, end_idx, end, offset)
}
}
+
+ /// List of chunk indices containing the data from start_offset to end_offset
+ pub fn indices(
+ &self,
+ start_offset: u64,
+ end_offset: u64,
+ ) -> Result<(Vec<AppendableDynamicEntry>, u64, u64), Error> {
+ let end_idx = self.index.len() - 1;
+ let chunk_end = self.chunk_end(end_idx);
+ let start = self.binary_search(0, 0, end_idx, chunk_end, start_offset)?;
+ let end = self.binary_search(0, 0, end_idx, chunk_end, end_offset - 1)?;
+
+ let offset_first = if start == 0 {
+ 0
+ } else {
+ self.index[start - 1].end()
+ };
+
+ let padding_start = start_offset - offset_first;
+ let padding_end = self.index[end].end() - end_offset;
+
+ let mut indices = Vec::new();
+ let mut prev_end = offset_first;
+ for dynamic_entry in &self.index[start..end + 1] {
+ let size = dynamic_entry.end() - prev_end;
+ let appendable_dynamic_entry = AppendableDynamicEntry {
+ size_le: size.to_le(),
+ digest: dynamic_entry.digest.clone(),
+ };
+ prev_end += size;
+ indices.push(appendable_dynamic_entry);
+ }
+
+ Ok((indices, padding_start, padding_end))
+ }
}
impl IndexFile for DynamicIndexReader {
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (23 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 24/36] index: fetch chunk from index by start/end-offset Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 26/36] client: chunk stream: add chunk injection queues Christian Ebner
` (10 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
In order to be included in the backup's index file, reused payload
chunks have to be injected into the payload upload stream.
The chunker forces a chunk boundary and queues the list of chunks to
be uploaded thereafter.
This implements the logic to inject the chunks into the chunk upload
stream after such a boundary is requested, by looping over the queued
chunks and inserting them into the stream.
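A simplified, self-contained mirror of the queueing pattern used by
this series; the real InjectChunks and AppendableDynamicEntry types
live in pbs-client and pbs-datastore, and the digest/size pairs here
are placeholders:

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    struct InjectChunks {
        boundary: u64,                // stream offset of the forced chunk boundary
        chunks: Vec<([u8; 32], u64)>, // (digest, size) of each reused chunk
        size: usize,                  // total payload bytes covered by `chunks`
    }

    fn main() {
        let queue: Arc<Mutex<VecDeque<InjectChunks>>> = Arc::new(Mutex::new(VecDeque::new()));

        // producer side: request a boundary and queue the reused chunks
        queue.lock().unwrap().push_back(InjectChunks {
            boundary: 0,
            chunks: vec![([0u8; 32], 4096)],
            size: 4096,
        });

        // consumer side (the injector stream): once the upload reaches the
        // boundary, emit the queued chunks as known chunks instead of data
        if let Some(inject) = queue.lock().unwrap().pop_front() {
            assert_eq!(inject.boundary, 0);
            assert_eq!(inject.size, 4096);
        }
    }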
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++
pbs-client/src/lib.rs | 1 +
2 files changed, 153 insertions(+)
create mode 100644 pbs-client/src/inject_reused_chunks.rs
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 00000000..7c0f7780
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,152 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use pbs_datastore::dynamic_index::AppendableDynamicEntry;
+
+pin_project! {
+ pub struct InjectReusedChunksQueue<S> {
+ #[pin]
+ input: S,
+ current: Option<InjectChunks>,
+ buffer: Option<bytes::BytesMut>,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+ pub boundary: u64,
+ pub chunks: Vec<AppendableDynamicEntry>,
+ pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+ Known(Vec<(u64, [u8; 32])>),
+ Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+ fn inject_reused_chunks(
+ self,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+ S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+ fn inject_reused_chunks(
+ self,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ ) -> InjectReusedChunksQueue<Self> {
+ InjectReusedChunksQueue {
+ input: self,
+ current: None,
+ injection_queue,
+ buffer: None,
+ stream_len,
+ reused_len,
+ index_csum,
+ }
+ }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+ S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+ type Item = Result<InjectedChunksInfo, Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ loop {
+ let current = this.current.take();
+ if let Some(current) = current {
+ let mut chunks = Vec::new();
+ let mut guard = this.index_csum.lock().unwrap();
+ let csum = guard.as_mut().unwrap();
+
+ for chunk in current.chunks {
+ let offset = this
+ .stream_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+ as u64;
+ this.reused_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ let digest = chunk.digest();
+ chunks.push((offset, digest));
+ let end_offset = offset + chunk.size();
+ csum.update(&end_offset.to_le_bytes());
+ csum.update(&digest);
+ }
+ let chunk_info = InjectedChunksInfo::Known(chunks);
+ return Poll::Ready(Some(Ok(chunk_info)));
+ }
+
+ let buffer = this.buffer.take();
+ if let Some(buffer) = buffer {
+ let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+ let data = InjectedChunksInfo::Raw((offset, buffer));
+ return Poll::Ready(Some(Ok(data)));
+ }
+
+ match ready!(this.input.as_mut().poll_next(cx)) {
+ None => return Poll::Ready(None),
+ Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+ Some(Ok(raw)) => {
+ let chunk_size = raw.len();
+ let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+ let mut injections = this.injection_queue.lock().unwrap();
+
+ if let Some(inject) = injections.pop_front() {
+ if inject.boundary == offset {
+ if this.current.replace(inject).is_some() {
+ return Poll::Ready(Some(Err(anyhow!(
+ "replaced injection queue not empty"
+ ))));
+ }
+ if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+ return Poll::Ready(Some(Err(anyhow!(
+ "replaced buffer not empty"
+ ))));
+ }
+ continue;
+ } else if inject.boundary == offset + chunk_size as u64 {
+ let _ = this.current.insert(inject);
+ } else if inject.boundary < offset + chunk_size as u64 {
+ return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+ } else {
+ injections.push_front(inject);
+ }
+ }
+
+ if chunk_size == 0 {
+ return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+ }
+
+ let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+ let data = InjectedChunksInfo::Raw((offset, raw));
+
+ return Poll::Ready(Some(Ok(data)));
+ }
+ }
+ }
+ }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556..3e7bd2a8 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
pub mod pxar;
pub mod tools;
+mod inject_reused_chunks;
mod merge_known_chunks;
pub mod pipe_to_stream;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 26/36] client: chunk stream: add chunk injection queues
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (24 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 27/36] client: implement prepare reference method Christian Ebner
` (9 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Adds a queue to the chunk stream to request forced boundaries at a
given offset within the stream and inject reused chunks after this
boundary.
The chunks are then passed along to the uploader stream using the
injection queue, which inserts them during upload.
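Callers that do not need any injection simply pass freshly created,
empty queues, as in the adapted test_chunk_speed2 example below; the
`stream` variable here stands for any byte stream fed to the chunker:

    let boundaries = Arc::new(Mutex::new(VecDeque::new()));
    let injections = Arc::new(Mutex::new(VecDeque::new()));

    // the chunker reads boundary requests from `boundaries`, while
    // `injections` is later handed to the upload side of the backup writer
    let mut chunk_stream = ChunkStream::new(stream, None, boundaries, injections.clone());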
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
examples/test_chunk_speed2.rs | 10 ++-
pbs-client/src/backup_writer.rs | 89 +++++++++++--------
pbs-client/src/chunk_stream.rs | 42 ++++++++-
pbs-client/src/pxar/create.rs | 6 +-
pbs-client/src/pxar_backup_stream.rs | 8 +-
proxmox-backup-client/src/main.rs | 28 ++++--
.../src/proxmox_restore_daemon/api.rs | 3 +
pxar-bin/src/main.rs | 5 +-
tests/catar.rs | 3 +
9 files changed, 147 insertions(+), 47 deletions(-)
diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index 3f69b436..b20a5b59 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -1,3 +1,6 @@
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
+
use anyhow::Error;
use futures::*;
@@ -26,7 +29,12 @@ async fn run() -> Result<(), Error> {
.map_err(Error::from);
//let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
- let mut chunk_stream = ChunkStream::new(stream, None);
+ let mut chunk_stream = ChunkStream::new(
+ stream,
+ None,
+ Arc::new(Mutex::new(VecDeque::new())),
+ Arc::new(Mutex::new(VecDeque::new())),
+ );
let start_time = std::time::Instant::now();
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 8a03d8ea..e66b93df 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
use std::future::Future;
use std::os::unix::fs::OpenOptionsExt;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
@@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
use proxmox_human_byte::HumanByte;
+use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
use super::{H2Client, HttpClient};
@@ -265,6 +266,7 @@ impl BackupWriter {
archive_name: &str,
stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
options: UploadOptions,
+ injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
) -> Result<BackupStats, Error> {
let known_chunks = Arc::new(Mutex::new(HashSet::new()));
@@ -341,6 +343,7 @@ impl BackupWriter {
None
},
options.compress,
+ injection_queue,
)
.await?;
@@ -637,6 +640,7 @@ impl BackupWriter {
known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
crypt_config: Option<Arc<CryptConfig>>,
compress: bool,
+ injection_queue: Option<Arc<Mutex<VecDeque<InjectChunks>>>>,
) -> impl Future<Output = Result<UploadStats, Error>> {
let total_chunks = Arc::new(AtomicUsize::new(0));
let total_chunks2 = total_chunks.clone();
@@ -663,48 +667,63 @@ impl BackupWriter {
let index_csum_2 = index_csum.clone();
stream
- .and_then(move |data| {
- let chunk_len = data.len();
+ .inject_reused_chunks(
+ injection_queue.unwrap_or_default(),
+ stream_len,
+ reused_len.clone(),
+ index_csum.clone(),
+ )
+ .and_then(move |chunk_info| match chunk_info {
+ InjectedChunksInfo::Known(chunks) => {
+ total_chunks.fetch_add(chunks.len(), Ordering::SeqCst);
+ future::ok(MergedChunkInfo::Known(chunks))
+ }
+ InjectedChunksInfo::Raw((offset, data)) => {
+ let chunk_len = data.len();
- total_chunks.fetch_add(1, Ordering::SeqCst);
- let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
+ total_chunks.fetch_add(1, Ordering::SeqCst);
- let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
+ let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
- if let Some(ref crypt_config) = crypt_config {
- chunk_builder = chunk_builder.crypt_config(crypt_config);
- }
+ if let Some(ref crypt_config) = crypt_config {
+ chunk_builder = chunk_builder.crypt_config(crypt_config);
+ }
- let mut known_chunks = known_chunks.lock().unwrap();
- let digest = chunk_builder.digest();
+ let mut known_chunks = known_chunks.lock().unwrap();
- let mut guard = index_csum.lock().unwrap();
- let csum = guard.as_mut().unwrap();
+ let digest = chunk_builder.digest();
- let chunk_end = offset + chunk_len as u64;
+ let mut guard = index_csum.lock().unwrap();
+ let csum = guard.as_mut().unwrap();
- if !is_fixed_chunk_size {
- csum.update(&chunk_end.to_le_bytes());
- }
- csum.update(digest);
+ let chunk_end = offset + chunk_len as u64;
- let chunk_is_known = known_chunks.contains(digest);
- if chunk_is_known {
- known_chunk_count.fetch_add(1, Ordering::SeqCst);
- reused_len.fetch_add(chunk_len, Ordering::SeqCst);
- future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
- } else {
- let compressed_stream_len2 = compressed_stream_len.clone();
- known_chunks.insert(*digest);
- future::ready(chunk_builder.build().map(move |(chunk, digest)| {
- compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
- MergedChunkInfo::New(ChunkInfo {
- chunk,
- digest,
- chunk_len: chunk_len as u64,
- offset,
- })
- }))
+ if !is_fixed_chunk_size {
+ csum.update(&chunk_end.to_le_bytes());
+ }
+ csum.update(digest);
+
+ let chunk_is_known = known_chunks.contains(digest);
+ if chunk_is_known {
+ known_chunk_count.fetch_add(1, Ordering::SeqCst);
+ reused_len.fetch_add(chunk_len, Ordering::SeqCst);
+
+ future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
+ } else {
+ let compressed_stream_len2 = compressed_stream_len.clone();
+ known_chunks.insert(*digest);
+
+ future::ready(chunk_builder.build().map(move |(chunk, digest)| {
+ compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+
+ MergedChunkInfo::New(ChunkInfo {
+ chunk,
+ digest,
+ chunk_len: chunk_len as u64,
+ offset,
+ })
+ }))
+ }
}
})
.merge_known_chunks()
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index 895f6eae..891d6928 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -1,4 +1,6 @@
+use std::collections::VecDeque;
use std::pin::Pin;
+use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use anyhow::Error;
@@ -8,21 +10,34 @@ use futures::stream::{Stream, TryStream};
use pbs_datastore::Chunker;
+use crate::inject_reused_chunks::InjectChunks;
+
/// Split input stream into dynamic sized chunks
pub struct ChunkStream<S: Unpin> {
input: S,
chunker: Chunker,
buffer: BytesMut,
scan_pos: usize,
+ consumed: u64,
+ boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+ injections: Arc<Mutex<VecDeque<InjectChunks>>>,
}
impl<S: Unpin> ChunkStream<S> {
- pub fn new(input: S, chunk_size: Option<usize>) -> Self {
+ pub fn new(
+ input: S,
+ chunk_size: Option<usize>,
+ boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+ injections: Arc<Mutex<VecDeque<InjectChunks>>>,
+ ) -> Self {
Self {
input,
chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
buffer: BytesMut::new(),
scan_pos: 0,
+ consumed: 0,
+ boundaries,
+ injections,
}
}
}
@@ -40,6 +55,29 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
+ {
+ // Make sure to release this lock as soon as possible
+ let mut boundaries = this.boundaries.lock().unwrap();
+ if let Some(inject) = boundaries.pop_front() {
+ let max = this.consumed + this.buffer.len() as u64;
+ if inject.boundary <= max {
+ let chunk_size = (inject.boundary - this.consumed) as usize;
+ let result = this.buffer.split_to(chunk_size);
+ this.consumed += chunk_size as u64;
+ this.scan_pos = 0;
+
+ // Add the size of the injected chunks to consumed, so chunk stream offsets
+ // are in sync with the rest of the archive.
+ this.consumed += inject.size as u64;
+
+ this.injections.lock().unwrap().push_back(inject);
+
+ return Poll::Ready(Some(Ok(result)));
+ }
+ boundaries.push_front(inject);
+ }
+ }
+
if this.scan_pos < this.buffer.len() {
let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
@@ -50,7 +88,9 @@ where
// continue poll
} else if chunk_size <= this.buffer.len() {
let result = this.buffer.split_to(chunk_size);
+ this.consumed += chunk_size as u64;
this.scan_pos = 0;
+
return Poll::Ready(Some(Ok(result)));
} else {
panic!("got unexpected chunk boundary from chunker");
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 59aa4450..9ae84d37 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
use std::ffi::{CStr, CString, OsStr};
use std::fmt;
use std::io::{self, Read};
@@ -26,6 +26,7 @@ use proxmox_sys::fs::{self, acl, xattr};
use pbs_datastore::catalog::BackupCatalogWriter;
+use crate::inject_reused_chunks::InjectChunks;
use crate::pxar::metadata::errno_is_unsupported;
use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
@@ -131,6 +132,7 @@ struct Archiver {
hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
file_copy_buffer: Vec<u8>,
skip_e2big_xattr: bool,
+ forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
}
type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -143,6 +145,7 @@ pub async fn create_archive<T, F>(
catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
mut payload_writer: Option<T>,
options: PxarCreateOptions,
+ forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
) -> Result<(), Error>
where
T: SeqWrite + Send,
@@ -201,6 +204,7 @@ where
hardlinks: HashMap::new(),
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
skip_e2big_xattr: options.skip_e2big_xattr,
+ forced_boundaries,
};
archiver
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index c7be4a66..77017779 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
use std::io::Write;
//use std::os::unix::io::FromRawFd;
use std::path::Path;
@@ -17,6 +18,8 @@ use proxmox_io::StdChannelWriter;
use pbs_datastore::catalog::CatalogWriter;
+use crate::inject_reused_chunks::InjectChunks;
+
/// Stream implementation to encode and upload .pxar archives.
///
/// The hyper client needs an async Stream for file upload, so we
@@ -40,6 +43,7 @@ impl PxarBackupStream {
dir: Dir,
catalog: Arc<Mutex<CatalogWriter<W>>>,
options: crate::pxar::PxarCreateOptions,
+ boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
separate_payload_stream: bool,
) -> Result<(Self, Option<Self>), Error> {
let buffer_size = 256 * 1024;
@@ -76,6 +80,7 @@ impl PxarBackupStream {
Some(catalog),
payload_writer,
options,
+ boundaries,
)
.await
{
@@ -111,11 +116,12 @@ impl PxarBackupStream {
dirname: &Path,
catalog: Arc<Mutex<CatalogWriter<W>>>,
options: crate::pxar::PxarCreateOptions,
+ boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
separate_payload_stream: bool,
) -> Result<(Self, Option<Self>), Error> {
let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
- Self::new(dir, catalog, options, separate_payload_stream)
+ Self::new(dir, catalog, options, boundaries, separate_payload_stream)
}
}
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index c8ba67b4..290df4a1 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
@@ -197,14 +197,19 @@ async fn backup_directory<P: AsRef<Path>>(
bail!("cannot backup directory with fixed chunk size!");
}
+ let payload_boundaries = Arc::new(Mutex::new(VecDeque::new()));
let (pxar_stream, payload_stream) = PxarBackupStream::open(
dir_path.as_ref(),
catalog,
pxar_create_options,
+ payload_boundaries.clone(),
payload_target.is_some(),
)?;
- let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+ let dummy_injections = Arc::new(Mutex::new(VecDeque::new()));
+ let dummy_boundaries = Arc::new(Mutex::new(VecDeque::new()));
+ let mut chunk_stream =
+ ChunkStream::new(pxar_stream, chunk_size, dummy_boundaries, dummy_injections);
let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -216,7 +221,7 @@ async fn backup_directory<P: AsRef<Path>>(
}
});
- let stats = client.upload_stream(archive_name, stream, upload_options.clone());
+ let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
if let Some(payload_stream) = payload_stream {
let payload_target = if let Some(payload_target) = payload_target {
@@ -225,9 +230,12 @@ async fn backup_directory<P: AsRef<Path>>(
bail!("got payload stream, but no target archive name");
};
+ let payload_injections = Arc::new(Mutex::new(VecDeque::new()));
let mut payload_chunk_stream = ChunkStream::new(
payload_stream,
chunk_size,
+ payload_boundaries,
+ payload_injections.clone(),
);
let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
@@ -243,6 +251,7 @@ async fn backup_directory<P: AsRef<Path>>(
&payload_target,
stream,
upload_options,
+ Some(payload_injections),
);
match futures::join!(stats, payload_stats) {
@@ -279,7 +288,7 @@ async fn backup_image<P: AsRef<Path>>(
}
let stats = client
- .upload_stream(archive_name, stream, upload_options)
+ .upload_stream(archive_name, stream, upload_options, None)
.await?;
Ok(stats)
@@ -570,7 +579,14 @@ fn spawn_catalog_upload(
let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
let catalog_chunk_size = 512 * 1024;
- let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+ let boundaries = Arc::new(Mutex::new(VecDeque::new()));
+ let injections = Arc::new(Mutex::new(VecDeque::new()));
+ let catalog_chunk_stream = ChunkStream::new(
+ catalog_stream,
+ Some(catalog_chunk_size),
+ boundaries,
+ injections.clone(),
+ );
let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
StdChannelWriter::new(catalog_tx),
@@ -586,7 +602,7 @@ fn spawn_catalog_upload(
tokio::spawn(async move {
let catalog_upload_result = client
- .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
+ .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
.await;
if let Err(ref err) = catalog_upload_result {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index bd8ddb20..d912734c 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -1,8 +1,10 @@
///! File-restore API running inside the restore VM
+use std::collections::VecDeque;
use std::ffi::OsStr;
use std::fs;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
use anyhow::{bail, Error};
use futures::FutureExt;
@@ -364,6 +366,7 @@ fn extract(
None,
None,
options,
+ Arc::new(Mutex::new(VecDeque::new())),
)
.await
}
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index e3b0faac..74ee04f7 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -1,10 +1,10 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
use std::ffi::OsStr;
use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
use futures::future::FutureExt;
@@ -385,6 +385,7 @@ async fn create_archive(
None,
None,
options,
+ Arc::new(Mutex::new(VecDeque::new())),
)
.await?;
diff --git a/tests/catar.rs b/tests/catar.rs
index 04af4ffd..6edd747d 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -1,4 +1,6 @@
+use std::collections::VecDeque;
use std::process::Command;
+use std::sync::{Arc, Mutex};
use anyhow::Error;
@@ -41,6 +43,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
None,
None,
options,
+ Arc::new(Mutex::new(VecDeque::new())),
))?;
Command::new("cmp")
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 27/36] client: implement prepare reference method
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (25 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 26/36] client: chunk stream: add chunk injection queues Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching Christian Ebner
` (8 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Implement a method that prepares the decoder instance to access a
previous snapshot's metadata index and payload index, so they can be
passed to the pxar archiver. The archiver can then utilize these to
compare the metadata of files against the previous state and gather
reusable chunks.
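For context, a hedged sketch of the call site added later in this series
(patch 29), with the names used there:

    let previous_ref = if metadata_mode {
        prepare_reference(
            &target_base,
            extension,
            previous_manifest.clone(),
            &client, // the BackupWriter of the current run
            backup_reader.clone(),
            crypt_config.clone(),
        )
        .await?
    } else {
        None
    };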
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 13 ++++++
pbs-client/src/pxar/mod.rs | 2 +-
proxmox-backup-client/src/main.rs | 71 ++++++++++++++++++++++++++++++-
3 files changed, 83 insertions(+), 3 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 9ae84d37..cb0af29e 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -17,6 +17,7 @@ use nix::sys::stat::{FileStat, Mode};
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
use proxmox_sys::error::SysError;
+use pxar::accessor::aio::Accessor;
use pxar::encoder::{LinkOffset, SeqWrite};
use pxar::Metadata;
@@ -24,7 +25,9 @@ use proxmox_io::vec;
use proxmox_lang::c_str;
use proxmox_sys::fs::{self, acl, xattr};
+use crate::RemoteChunkReader;
use pbs_datastore::catalog::BackupCatalogWriter;
+use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
use crate::inject_reused_chunks::InjectChunks;
use crate::pxar::metadata::errno_is_unsupported;
@@ -46,6 +49,16 @@ pub struct PxarCreateOptions {
pub skip_e2big_xattr: bool,
}
+/// Statefull information of previous backups snapshots for partial backups
+pub struct PxarPrevRef {
+ /// Reference accessor for metadata comparison
+ pub accessor: Accessor<LocalDynamicReadAt<RemoteChunkReader>>,
+ /// Reference index for reusing payload chunks
+ pub payload_index: DynamicIndexReader,
+ /// Reference archive name for partial backups
+ pub archive_name: String,
+}
+
fn detect_fs_type(fd: RawFd) -> Result<i64, Error> {
let mut fs_stat = std::mem::MaybeUninit::uninit();
let res = unsafe { libc::fstatfs(fd, fs_stat.as_mut_ptr()) };
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 14674b9b..24315f5f 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -56,7 +56,7 @@ pub(crate) mod tools;
mod flags;
pub use flags::Flags;
-pub use create::{create_archive, PxarCreateOptions};
+pub use create::{create_archive, PxarCreateOptions, PxarPrevRef};
pub use extract::{
create_tar, create_zip, extract_archive, extract_sub_dir, extract_sub_dir_seq, ErrorHandler,
OverwriteFlags, PxarExtractContext, PxarExtractOptions,
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 290df4a1..2d48ca85 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -21,6 +21,7 @@ use proxmox_router::{cli::*, ApiMethod, RpcEnvironment};
use proxmox_schema::api;
use proxmox_sys::fs::{file_get_json, image_size, replace_file, CreateOptions};
use proxmox_time::{epoch_i64, strftime_local};
+use pxar::accessor::aio::Accessor;
use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
use pbs_api_types::{
@@ -30,7 +31,7 @@ use pbs_api_types::{
BACKUP_TYPE_SCHEMA, TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
};
use pbs_client::catalog_shell::Shell;
-use pbs_client::pxar::ErrorHandler as PxarErrorHandler;
+use pbs_client::pxar::{ErrorHandler as PxarErrorHandler, PxarPrevRef};
use pbs_client::tools::{
complete_archive_name, complete_auth_id, complete_backup_group, complete_backup_snapshot,
complete_backup_source, complete_chunk_size, complete_group_or_snapshot,
@@ -50,7 +51,7 @@ use pbs_client::{
};
use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
use pbs_datastore::chunk_store::verify_chunk_size;
-use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader};
+use pbs_datastore::dynamic_index::{BufferedDynamicReader, DynamicIndexReader, LocalDynamicReadAt};
use pbs_datastore::fixed_index::FixedIndexReader;
use pbs_datastore::index::IndexFile;
use pbs_datastore::manifest::{
@@ -1181,6 +1182,72 @@ async fn create_backup(
Ok(Value::Null)
}
+async fn prepare_reference(
+ target_base: &str,
+ extension: &str,
+ manifest: Option<Arc<BackupManifest>>,
+ backup_writer: &BackupWriter,
+ backup_reader: Option<Arc<BackupReader>>,
+ crypt_config: Option<Arc<CryptConfig>>,
+) -> Result<Option<PxarPrevRef>, Error> {
+ let target = format!("{target_base}.meta.{extension}");
+ let payload_target = format!("{target_base}.pld.{extension}");
+
+ let manifest = if let Some(ref manifest) = manifest {
+ manifest
+ } else {
+ return Ok(None);
+ };
+
+ let backup_reader = if let Some(ref reader) = backup_reader {
+ reader
+ } else {
+ return Ok(None);
+ };
+
+ let metadata_ref_index = if let Ok(index) = backup_reader
+ .download_dynamic_index(&manifest, &target)
+ .await
+ {
+ index
+ } else {
+ log::info!("No previous metadata index, fallback to regular encoding");
+ return Ok(None);
+ };
+
+ let known_payload_chunks = Arc::new(Mutex::new(HashSet::new()));
+ let payload_ref_index = if let Ok(index) = backup_writer
+ .download_previous_dynamic_index(&payload_target, &manifest, known_payload_chunks)
+ .await
+ {
+ index
+ } else {
+ log::info!("No previous payload index, fallback to regular encoding");
+ return Ok(None);
+ };
+
+ log::info!("Using previous index as metadata reference for '{target}'");
+
+ let most_used = metadata_ref_index.find_most_used_chunks(8);
+ let file_info = manifest.lookup_file_info(&target)?;
+ let chunk_reader = RemoteChunkReader::new(
+ backup_reader.clone(),
+ crypt_config.clone(),
+ file_info.chunk_crypt_mode(),
+ most_used,
+ );
+ let reader = BufferedDynamicReader::new(metadata_ref_index, chunk_reader);
+ let archive_size = reader.archive_size();
+ let reader = LocalDynamicReadAt::new(reader);
+ let accessor = Accessor::new(reader, archive_size).await?;
+
+ Ok(Some(pbs_client::pxar::PxarPrevRef {
+ accessor,
+ payload_index: payload_ref_index,
+ archive_name: target,
+ }))
+}
+
async fn dump_image<W: Write>(
client: Arc<BackupReader>,
crypt_config: Option<Arc<CryptConfig>>,
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (26 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 27/36] client: implement prepare reference method Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 29/36] client: pxar: add previous reference to archiver Christian Ebner
` (7 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
This prepares for the look-ahead cache used to temporarily store
entries before encoding them in the pxar archive, so the archiver can
decide whether to re-use or re-encode regular file entries.
Allows inserting and storing reused chunks in the archiver,
deduplicating chunks upon insert when possible.
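A hedged sketch of the intended insert semantics, using the names from the
hunk below (`indices_*` being `AppendableDynamicEntry` ranges looked up in
the previous payload index):

    // the first insert anchors the sequence at the given encoder boundary
    let offset_a = reused_chunks.insert(indices_a, boundary, start_padding_a);

    // a later insert whose first digest matches the last stored digest only
    // appends the remaining chunks; a range already fully contained simply
    // resolves to the offset it was stored at, so nothing is injected twice
    let offset_b = reused_chunks.insert(indices_b, boundary, start_padding_b);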
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 109 +++++++++++++++++++++++++++++++++-
1 file changed, 107 insertions(+), 2 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index cb0af29e..981bde42 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -18,7 +18,7 @@ use nix::sys::stat::{FileStat, Mode};
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
use proxmox_sys::error::SysError;
use pxar::accessor::aio::Accessor;
-use pxar::encoder::{LinkOffset, SeqWrite};
+use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
use pxar::Metadata;
use proxmox_io::vec;
@@ -27,13 +27,116 @@ use proxmox_sys::fs::{self, acl, xattr};
use crate::RemoteChunkReader;
use pbs_datastore::catalog::BackupCatalogWriter;
-use pbs_datastore::dynamic_index::{DynamicIndexReader, LocalDynamicReadAt};
+use pbs_datastore::dynamic_index::{
+ AppendableDynamicEntry, DynamicIndexReader, LocalDynamicReadAt,
+};
use crate::inject_reused_chunks::InjectChunks;
use crate::pxar::metadata::errno_is_unsupported;
use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
+#[derive(Default)]
+struct ReusedChunks {
+ start_boundary: PayloadOffset,
+ total: PayloadOffset,
+ chunks: Vec<AppendableDynamicEntry>,
+ must_flush_first: bool,
+}
+
+impl ReusedChunks {
+ fn new() -> Self {
+ Self {
+ start_boundary: PayloadOffset::default(),
+ total: PayloadOffset::default(),
+ chunks: Vec::new(),
+ must_flush_first: false,
+ }
+ }
+
+ fn start_boundary(&self) -> PayloadOffset {
+ self.start_boundary
+ }
+
+ fn is_empty(&self) -> bool {
+ self.chunks.is_empty()
+ }
+
+ fn insert(
+ &mut self,
+ indices: Vec<AppendableDynamicEntry>,
+ boundary: PayloadOffset,
+ start_padding: u64,
+ ) -> PayloadOffset {
+ if self.is_empty() {
+ self.start_boundary = boundary;
+ }
+
+ if let Some(offset) = self.digest_sequence_contained(&indices) {
+ self.start_boundary.add(offset + start_padding)
+ } else if let Some(offset) = self.last_digest_matched(&indices) {
+ for chunk in indices.into_iter().skip(1) {
+ self.total = self.total.add(chunk.size());
+ self.chunks.push(chunk);
+ }
+ self.start_boundary.add(offset + start_padding)
+ } else {
+ let offset = self.total.raw();
+ for chunk in indices.into_iter() {
+ self.total = self.total.add(chunk.size());
+ self.chunks.push(chunk);
+ }
+ self.start_boundary.add(offset + start_padding)
+ }
+ }
+
+ fn digest_sequence_contained(&self, indices: &[AppendableDynamicEntry]) -> Option<u64> {
+ let digest = if let Some(first) = indices.first() {
+ first.digest()
+ } else {
+ return None;
+ };
+
+ let mut offset = 0;
+ let mut iter = self.chunks.iter();
+ while let Some(position) = iter.position(|e| {
+ offset += e.size();
+ e.digest() == digest
+ }) {
+ if indices.len() + position > self.chunks.len() {
+ return None;
+ }
+
+ for (ind, chunk) in indices.iter().skip(1).enumerate() {
+ if chunk.digest() != self.chunks[ind + position].digest() {
+ return None;
+ }
+ }
+
+ offset -= self.chunks[position].size();
+ return Some(offset);
+ }
+
+ None
+ }
+
+ fn last_digest_matched(&self, indices: &[AppendableDynamicEntry]) -> Option<u64> {
+ let digest = if let Some(first) = indices.first() {
+ first.digest()
+ } else {
+ return None;
+ };
+
+ if let Some(last) = self.chunks.last() {
+ if last.digest() == digest {
+ return Some(self.total.raw() - last.size());
+ }
+ }
+
+ None
+ }
+}
+
/// Pxar options for creating a pxar archive/stream
#[derive(Default, Clone)]
pub struct PxarCreateOptions {
@@ -145,6 +248,7 @@ struct Archiver {
hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
file_copy_buffer: Vec<u8>,
skip_e2big_xattr: bool,
+ reused_chunks: ReusedChunks,
forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
}
@@ -217,6 +321,7 @@ where
hardlinks: HashMap::new(),
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
skip_e2big_xattr: options.skip_e2big_xattr,
+ reused_chunks: ReusedChunks::new(),
forced_boundaries,
};
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 29/36] client: pxar: add previous reference to archiver
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (27 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 28/36] client: pxar: implement store to insert chunks on caching Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 30/36] client: pxar: add method for metadata comparison Christian Ebner
` (6 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Read the previous snapshot's manifest and check whether a split
archive with the same name is present. If so, create the accessor
instance to read the previous archive entries, so their metadata can
be looked up and compared, making it possible to decide whether an
entry is reusable or not.
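The reference then travels to the archiver via the create options; a
condensed sketch of the hunk below:

    let pxar_options = pbs_client::pxar::PxarCreateOptions {
        device_set: devices.clone(),
        patterns: pattern_list.clone(),
        entries_max: entries_max as usize,
        skip_lost_and_found,
        skip_e2big_xattr,
        // `None` keeps the previous behaviour of re-encoding everything
        previous_ref,
    };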
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 45 ++++++++++++---
proxmox-backup-client/src/main.rs | 57 +++++++++++++++++--
.../src/proxmox_restore_daemon/api.rs | 1 +
pxar-bin/src/main.rs | 1 +
4 files changed, 92 insertions(+), 12 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 981bde42..6713daf3 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -138,7 +138,7 @@ impl ReusedChunks {
}
/// Pxar options for creating a pxar archive/stream
-#[derive(Default, Clone)]
+#[derive(Default)]
pub struct PxarCreateOptions {
/// Device/mountpoint st_dev numbers that should be included. None for no limitation.
pub device_set: Option<HashSet<u64>>,
@@ -150,6 +150,8 @@ pub struct PxarCreateOptions {
pub skip_lost_and_found: bool,
/// Skip xattrs of files that return E2BIG error
pub skip_e2big_xattr: bool,
+ /// Reference state for partial backups
+ pub previous_ref: Option<PxarPrevRef>,
}
/// Statefull information of previous backups snapshots for partial backups
@@ -249,6 +251,7 @@ struct Archiver {
file_copy_buffer: Vec<u8>,
skip_e2big_xattr: bool,
reused_chunks: ReusedChunks,
+ previous_payload_index: Option<DynamicIndexReader>,
forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
}
@@ -305,6 +308,14 @@ where
MatchType::Exclude,
)?);
}
+ let (previous_payload_index, accessor) = if let Some(refs) = options.previous_ref {
+ (
+ Some(refs.payload_index),
+ refs.accessor.open_root().await.ok(),
+ )
+ } else {
+ (None, None)
+ };
let mut archiver = Archiver {
feature_flags,
@@ -322,11 +333,12 @@ where
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
skip_e2big_xattr: options.skip_e2big_xattr,
reused_chunks: ReusedChunks::new(),
+ previous_payload_index,
forced_boundaries,
};
archiver
- .archive_dir_contents(&mut encoder, source_dir, true)
+ .archive_dir_contents(&mut encoder, accessor, source_dir, true)
.await?;
encoder.finish().await?;
Ok(())
@@ -356,6 +368,7 @@ impl Archiver {
fn archive_dir_contents<'a, T: SeqWrite + Send>(
&'a mut self,
encoder: &'a mut Encoder<'_, T>,
+ mut accessor: Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
mut dir: Dir,
is_root: bool,
) -> BoxFuture<'a, Result<(), Error>> {
@@ -390,9 +403,15 @@ impl Archiver {
(self.callback)(&file_entry.path)?;
self.path = file_entry.path;
- self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
- .await
- .map_err(|err| self.wrap_err(err))?;
+ self.add_entry(
+ encoder,
+ &mut accessor,
+ dir_fd,
+ &file_entry.name,
+ &file_entry.stat,
+ )
+ .await
+ .map_err(|err| self.wrap_err(err))?;
}
self.path = old_path;
self.entry_counter = entry_counter;
@@ -640,6 +659,7 @@ impl Archiver {
async fn add_entry<T: SeqWrite + Send>(
&mut self,
encoder: &mut Encoder<'_, T>,
+ accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
parent: RawFd,
c_file_name: &CStr,
stat: &FileStat,
@@ -729,7 +749,7 @@ impl Archiver {
catalog.lock().unwrap().start_directory(c_file_name)?;
}
let result = self
- .add_directory(encoder, dir, c_file_name, &metadata, stat)
+ .add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
.await;
if let Some(ref catalog) = self.catalog {
catalog.lock().unwrap().end_directory()?;
@@ -782,6 +802,7 @@ impl Archiver {
async fn add_directory<T: SeqWrite + Send>(
&mut self,
encoder: &mut Encoder<'_, T>,
+ accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
dir: Dir,
dir_name: &CStr,
metadata: &Metadata,
@@ -812,7 +833,17 @@ impl Archiver {
log::info!("skipping mount point: {:?}", self.path);
Ok(())
} else {
- self.archive_dir_contents(encoder, dir, false).await
+ let mut dir_accessor = None;
+ if let Some(accessor) = accessor.as_mut() {
+ if let Some(file_entry) = accessor.lookup(dir_name).await? {
+ if file_entry.entry().is_dir() {
+ let dir = file_entry.enter_directory().await?;
+ dir_accessor = Some(dir);
+ }
+ }
+ }
+ self.archive_dir_contents(encoder, dir_accessor, dir, false)
+ .await
};
self.fs_magic = old_fs_magic;
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 2d48ca85..00608eee 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -44,10 +44,10 @@ use pbs_client::tools::{
CHUNK_SIZE_SCHEMA, REPO_URL_SCHEMA,
};
use pbs_client::{
- delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
- BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
- FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
- BACKUP_SOURCE_SCHEMA,
+ delete_ticket_info, parse_backup_detection_mode_specification, parse_backup_specification,
+ view_task_result, BackupReader, BackupRepository, BackupSpecificationType, BackupStats,
+ BackupWriter, ChunkStream, FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader,
+ UploadOptions, BACKUP_DETECTION_MODE_SPEC, BACKUP_SOURCE_SCHEMA,
};
use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
use pbs_datastore::chunk_store::verify_chunk_size;
@@ -702,6 +702,10 @@ fn spawn_catalog_upload(
schema: TRAFFIC_CONTROL_BURST_SCHEMA,
optional: true,
},
+ "change-detection-mode": {
+ schema: BACKUP_DETECTION_MODE_SPEC,
+ optional: true,
+ },
"exclude": {
type: Array,
description: "List of paths or patterns for matching files to exclude.",
@@ -896,6 +900,9 @@ async fn create_backup(
let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
+ let detection_mode = param["change-detection-mode"].as_str().unwrap_or("data");
+ let detection_mode = parse_backup_detection_mode_specification(detection_mode)?;
+
let client = connect_rate_limited(&repo, rate_limit)?;
record_repository(&repo);
@@ -947,6 +954,28 @@ async fn create_backup(
}
};
+ let backup_reader = if detection_mode.is_metadata() {
+ if let Ok(backup_dir) =
+ api_datastore_latest_snapshot(&client, repo.store(), &backup_ns, snapshot.group.clone())
+ .await
+ {
+ BackupReader::start(
+ &client,
+ crypt_config.clone(),
+ repo.store(),
+ &backup_ns,
+ &backup_dir,
+ true,
+ )
+ .await
+ .ok()
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
let client = BackupWriter::start(
client,
crypt_config.clone(),
@@ -1043,7 +1072,10 @@ async fn create_backup(
manifest.add_file(target, stats.size, stats.csum, crypto.mode)?;
}
(BackupSpecificationType::PXAR, false) => {
- let metadata_mode = false; // Until enabled via param
+ let archives = detection_mode.metadata_archive_names();
+ let metadata_mode = detection_mode.is_metadata()
+ && (archives.contains(&target_base) || archives.is_empty());
+
let target = if metadata_mode {
format!("{target_base}.meta.didx")
} else {
@@ -1065,12 +1097,27 @@ async fn create_backup(
.unwrap()
.start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
+ let previous_ref = if metadata_mode {
+ prepare_reference(
+ &target_base,
+ extension,
+ previous_manifest.clone(),
+ &client,
+ backup_reader.clone(),
+ crypt_config.clone(),
+ )
+ .await?
+ } else {
+ None
+ };
+
let pxar_options = pbs_client::pxar::PxarCreateOptions {
device_set: devices.clone(),
patterns: pattern_list.clone(),
entries_max: entries_max as usize,
skip_lost_and_found,
skip_e2big_xattr,
+ previous_ref,
};
let upload_options = UploadOptions {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index d912734c..449a7e4c 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -355,6 +355,7 @@ fn extract(
patterns,
skip_lost_and_found: false,
skip_e2big_xattr: false,
+ previous_ref: None,
};
let pxar_writer = TokioWriter::new(writer);
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 74ee04f7..f3945801 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -336,6 +336,7 @@ async fn create_archive(
patterns,
skip_lost_and_found: false,
skip_e2big_xattr: false,
+ previous_ref: None,
};
let source = PathBuf::from(source);
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 30/36] client: pxar: add method for metadata comparison
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (28 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 29/36] client: pxar: add previous reference to archiver Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 31/36] specs: add backup detection mode specification Christian Ebner
` (5 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Adds a method to compare the metadata of the current file entry
against the metadata of the entry looked up in the previous backup
snapshot.
If the metadata matches, the start offset for the payload stream is
returned.
This is in preparation for reusing payload chunks for unchanged files.
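A hedged sketch of how a caller is expected to use the lookup (the actual
call site follows in a later patch of this series):

    if let Some(accessor) = accessor.as_mut() {
        if let Some(start_offset) = self
            .is_reusable_entry(accessor, file_name, stat, metadata)
            .await?
        {
            // metadata unchanged: the payload chunks covering the range
            // starting at `start_offset` in the previous payload archive
            // can be re-used instead of re-reading and re-chunking the file
        }
    }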
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 33 +++++++++++++++++++++++++++++++--
1 file changed, 31 insertions(+), 2 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 6713daf3..39864483 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -17,9 +17,9 @@ use nix::sys::stat::{FileStat, Mode};
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
use proxmox_sys::error::SysError;
-use pxar::accessor::aio::Accessor;
+use pxar::accessor::aio::{Accessor, Directory};
use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
-use pxar::Metadata;
+use pxar::{EntryKind, Metadata};
use proxmox_io::vec;
use proxmox_lang::c_str;
@@ -422,6 +422,35 @@ impl Archiver {
.boxed()
}
+ async fn is_reusable_entry(
+ &mut self,
+ accessor: &mut Directory<LocalDynamicReadAt<RemoteChunkReader>>,
+ file_name: &Path,
+ stat: &FileStat,
+ metadata: &Metadata,
+ ) -> Result<Option<u64>, Error> {
+ if stat.st_nlink > 1 {
+ log::debug!("re-encode: {file_name:?} has hardlinks.");
+ return Ok(None);
+ }
+
+ if let Some(file_entry) = accessor.lookup(file_name).await? {
+ if metadata == file_entry.metadata() {
+ if let EntryKind::File { payload_offset, .. } = file_entry.entry().kind() {
+ log::debug!("re-use: {file_name:?} has unchanged metadata.");
+ return Ok(payload_offset.clone());
+ }
+ log::debug!("re-encode: {file_name:?} not a regular file.");
+ return Ok(None);
+ }
+ log::debug!("re-encode: {file_name:?} metadata did not match.");
+ return Ok(None);
+ }
+
+ log::debug!("re-encode: {file_name:?} not found in previous archive.");
+ Ok(None)
+ }
+
/// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
///
/// The `existed` flag is set when iterating through a directory to note that we know the file
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 31/36] specs: add backup detection mode specification
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (29 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 30/36] client: pxar: add method for metadata comparison Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 32/36] pxar: caching: add look-ahead cache types Christian Ebner
` (4 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Adds the specification for switching the detection mode used to
identify regular files which changed since a reference backup run.
Rather than being a simple flag selecting the mode, it is possible to
pass a list of archives for which to use metadata-based file change
detection, allowing mixed modes to be used within the same client
invocation.
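A short usage sketch, assuming the re-exports from the pbs_client crate
root as used later in this series:

    use anyhow::Error;
    use pbs_client::parse_backup_detection_mode_specification;

    fn example() -> Result<(), Error> {
        // "data" keeps the current behaviour, "metadata" enables the new
        // detection mode for all pxar archives, "metadata:root.pxar" only
        // for the listed archives
        let mode = parse_backup_detection_mode_specification("metadata:root.pxar")?;
        assert!(mode.is_metadata());
        assert_eq!(
            mode.metadata_archive_names(),
            ["root.pxar".to_string()].as_slice()
        );
        Ok(())
    }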
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_specification.rs | 53 ++++++++++++++++++++++++++
1 file changed, 53 insertions(+)
diff --git a/pbs-client/src/backup_specification.rs b/pbs-client/src/backup_specification.rs
index 619a3a9d..5b33b451 100644
--- a/pbs-client/src/backup_specification.rs
+++ b/pbs-client/src/backup_specification.rs
@@ -4,6 +4,7 @@ use proxmox_schema::*;
const_regex! {
BACKUPSPEC_REGEX = r"^([a-zA-Z0-9_-]+\.(pxar|img|conf|log)):(.+)$";
+ DETECTION_MODE_REGEX = r"^(data|metadata(:[a-zA-Z0-9_-]+\.pxar)*)$";
}
pub const BACKUP_SOURCE_SCHEMA: Schema =
@@ -11,6 +12,11 @@ pub const BACKUP_SOURCE_SCHEMA: Schema =
.format(&ApiStringFormat::Pattern(&BACKUPSPEC_REGEX))
.schema();
+pub const BACKUP_DETECTION_MODE_SPEC: Schema =
+ StringSchema::new("Backup source specification ([data|metadata(:<label>,...)]).")
+ .format(&ApiStringFormat::Pattern(&DETECTION_MODE_REGEX))
+ .schema();
+
pub enum BackupSpecificationType {
PXAR,
IMAGE,
@@ -45,3 +51,50 @@ pub fn parse_backup_specification(value: &str) -> Result<BackupSpecification, Er
bail!("unable to parse backup source specification '{}'", value);
}
+
+/// Mode to detect file changes since last backup run
+pub enum BackupDetectionMode {
+ /// Regular mode, re-encode payload data
+ Data,
+ /// Compare metadata, reuse payload chunks if metadata unchanged
+ ///
+ /// Stores archive names for which to use the metadata mode
+ Metadata(Vec<String>),
+}
+
+impl BackupDetectionMode {
+ /// Check if the selected mode is metadata based file change detection
+ pub fn is_metadata(&self) -> bool {
+ match self {
+ Self::Data => false,
+ Self::Metadata(_) => true,
+ }
+ }
+
+ /// List of archive names, for which metadata based file change detection should be used
+ pub fn metadata_archive_names(&self) -> &[String] {
+ match self {
+ Self::Data => &[],
+ Self::Metadata(ref archives) => archives,
+ }
+ }
+}
+
+pub fn parse_backup_detection_mode_specification(
+ value: &str,
+) -> Result<BackupDetectionMode, Error> {
+ match (DETECTION_MODE_REGEX.regex_obj)().captures(value) {
+ Some(caps) => {
+ let mode = match caps.get(1).unwrap().as_str() {
+ "data" => BackupDetectionMode::Data,
+ ty if ty.starts_with("metadata") => {
+ let archives = ty.split(':').skip(1).map(|s| s.to_string()).collect();
+ BackupDetectionMode::Metadata(archives)
+ }
+ _ => bail!("invalid backup detection mode"),
+ };
+ Ok(mode)
+ }
+ None => bail!("unable to parse backup detection mode specification '{value}'"),
+ }
+}
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 32/36] pxar: caching: add look-ahead cache types
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (30 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 31/36] specs: add backup detection mode specification Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching Christian Ebner
` (3 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
The look-ahead cache is used to hold entries during pxar archive
creation before encoding them, in order to decide whether regular file
payloads can be re-used rather than re-encoded.
These types allow storing the needed data and keeping track of
directory boundaries while traversing the filesystem tree.
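A hedged usage sketch (the real call sites follow in the next patches): a
regular file is queued together with its open file descriptor so the
re-use/re-encode decision can be deferred.

    let entry = CacheEntry::RegEntry(CacheEntryData::new(
        fd,                 // OwnedFd, kept open until the cache is flushed
        c_file_name.into(), // CString
        stat.clone(),       // FileStat
        metadata.clone(),   // pxar::Metadata
        payload_offset,     // offset into the previous payload archive, if any
    ));
    cached_entries.push(entry);
    // ... and when leaving a cached directory:
    cached_entries.push(CacheEntry::DirEnd);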
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/lookahead_cache.rs | 38 ++++++++++++++++++++++++++
pbs-client/src/pxar/mod.rs | 1 +
2 files changed, 39 insertions(+)
create mode 100644 pbs-client/src/pxar/lookahead_cache.rs
diff --git a/pbs-client/src/pxar/lookahead_cache.rs b/pbs-client/src/pxar/lookahead_cache.rs
new file mode 100644
index 00000000..68f3fd1f
--- /dev/null
+++ b/pbs-client/src/pxar/lookahead_cache.rs
@@ -0,0 +1,38 @@
+use nix::sys::stat::FileStat;
+use pxar::encoder::PayloadOffset;
+use std::ffi::CString;
+use std::os::unix::io::OwnedFd;
+
+use pxar::Metadata;
+
+pub(crate) struct CacheEntryData {
+ pub(crate) fd: OwnedFd,
+ pub(crate) c_file_name: CString,
+ pub(crate) stat: FileStat,
+ pub(crate) metadata: Metadata,
+ pub(crate) payload_offset: PayloadOffset,
+}
+
+impl CacheEntryData {
+ pub(crate) fn new(
+ fd: OwnedFd,
+ c_file_name: CString,
+ stat: FileStat,
+ metadata: Metadata,
+ payload_offset: PayloadOffset,
+ ) -> Self {
+ Self {
+ fd,
+ c_file_name,
+ stat,
+ metadata,
+ payload_offset,
+ }
+ }
+}
+
+pub(crate) enum CacheEntry {
+ RegEntry(CacheEntryData),
+ DirEntry(CacheEntryData),
+ DirEnd,
+}
diff --git a/pbs-client/src/pxar/mod.rs b/pbs-client/src/pxar/mod.rs
index 24315f5f..3729ab10 100644
--- a/pbs-client/src/pxar/mod.rs
+++ b/pbs-client/src/pxar/mod.rs
@@ -50,6 +50,7 @@
pub(crate) mod create;
pub(crate) mod dir_stack;
pub(crate) mod extract;
+pub(crate) mod lookahead_cache;
pub(crate) mod metadata;
pub(crate) mod tools;
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (31 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 32/36] pxar: caching: add look-ahead cache types Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
` (2 subsequent siblings)
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Implements the methods to cache entries in the look-ahead cache and
flush them to the archive, either by re-using and injecting the
payload chunks from the previous backup snapshot and storing a
reference to them, or by re-encoding the chunks.
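Condensed, the reusable path in cache_or_flush_entries() below works
roughly like this (the cache size thresholds are introduced in a follow-up
patch, catalog handling omitted); entries that are not reusable instead
trigger a flush with re-encoding and are then encoded the regular way:

    // `indices` are the chunks covering the file in the previous payload
    // archive; queue them and get the payload offset to reference
    let offset = self.reused_chunks.insert(indices, boundary, start_padding);

    if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
        // enough payload queued up: flush the cache, injecting the reused
        // chunks, then encode this entry as a payload reference
        self.flush_cached_to_archive(encoder, true).await?;
        encoder
            .add_payload_ref(metadata, file_name, file_size, offset)
            .await?;
    } else {
        // defer the decision, keeping the open fd with the cached entry
        self.caching_enabled = true;
        self.cached_payload_size += file_size;
        self.cached_entries.push(CacheEntry::RegEntry(CacheEntryData::new(
            fd, c_file_name.into(), stat.clone(), metadata.clone(), offset,
        )));
    }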
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 284 ++++++++++++++++++++++++++++++++++
1 file changed, 284 insertions(+)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 39864483..fbcccb2e 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -828,6 +828,290 @@ impl Archiver {
}
}
+ async fn cache_or_flush_entries<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+ c_file_name: &CStr,
+ stat: &FileStat,
+ fd: OwnedFd,
+ metadata: &Metadata,
+ ) -> Result<(), Error> {
+ let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+ let reusable = if let Some(accessor) = accessor {
+ self.is_reusable_entry(accessor, file_name, stat, metadata)
+ .await?
+ } else {
+ None
+ };
+
+ let file_size = stat.st_size as u64;
+ if let Some(start_offset) = reusable {
+ if let Some(ref ref_payload_index) = self.previous_payload_index {
+ let end_offset = start_offset + file_size;
+ let (indices, start_padding, _end_padding) =
+ ref_payload_index.indices(start_offset, end_offset)?;
+
+ //Insert chunks into reused_chunks, getting the correct payload offset
+ let boundary = encoder.payload_position()?;
+ let offset = self.reused_chunks.insert(indices, boundary, start_padding);
+
+ if self.cached_payload_size + file_size >= CACHED_PAYLOAD_THRESHOLD {
+ self.flush_cached_to_archive(encoder, true).await?;
+
+ encoder
+ .add_payload_ref(metadata, file_name, file_size, offset)
+ .await?;
+
+ if let Some(ref catalog) = self.catalog {
+ catalog
+ .lock()
+ .unwrap()
+ .add_file(&c_file_name, file_size, stat.st_mtime)?;
+ }
+ } else {
+ log::debug!("lookahead-cache: {file_name:?}");
+ self.caching_enabled = true;
+ self.cached_payload_size += file_size;
+ let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ stat.clone(),
+ metadata.clone(),
+ offset,
+ ));
+ self.cached_entries.push(cache_entry);
+ }
+
+ return Ok(());
+ }
+ log::debug!("re-encode: {file_name:?} no previous payload index.");
+ }
+
+ self.flush_cached_to_archive(encoder, false).await?;
+ self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await
+ }
+
+ async fn flush_cached_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ reuse_chunks: bool,
+ ) -> Result<(), Error> {
+ if reuse_chunks {
+ self.flush_reused_chunks(encoder)?;
+ } else {
+ self.clear_cached_chunks(encoder)?;
+ }
+ let entries = std::mem::take(&mut self.cached_entries);
+
+ self.caching_enabled = false;
+ self.cached_payload_size = 0;
+
+ for entry in entries {
+ match entry {
+ CacheEntry::RegEntry(data) => {
+ self.flush_entry_to_archive(encoder, data, reuse_chunks)
+ .await?
+ }
+ CacheEntry::DirEntry(data) => {
+ self.flush_directory_to_archive(encoder, data).await?
+ }
+ CacheEntry::DirEnd => {
+ let result = encoder.finish().await?;
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().end_directory()?;
+ }
+ result
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ fn flush_reused_chunks<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ ) -> Result<(), Error> {
+ let mut reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+ let last_chunk = reused_chunks.chunks.pop();
+
+ // Do not inject directly, but keep around for possible followups
+ // Needs to be flushed in any case!
+ let mut injection_boundary = reused_chunks.start_boundary();
+ for chunks in reused_chunks.chunks.chunks(128) {
+ let size = chunks.iter().fold(0u64, |sum, chunk| sum + chunk.size());
+ let inject_chunks = InjectChunks {
+ boundary: injection_boundary.raw(),
+ chunks: chunks.to_vec(),
+ size: size as usize,
+ };
+ let mut boundary = self.forced_boundaries.lock().unwrap();
+ boundary.push_back(inject_chunks);
+ injection_boundary = injection_boundary.add(size);
+ encoder.advance(size)?;
+ }
+
+ if let Some(chunk) = last_chunk {
+ let _offset = self.reused_chunks.insert(vec![chunk], injection_boundary, 0);
+ // Make sure that we flush this chunk even on clear calls
+ self.reused_chunks.must_flush_first = true;
+ }
+
+ Ok(())
+ }
+
+ fn clear_cached_chunks<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ ) -> Result<(), Error> {
+ let reused_chunks = std::mem::take(&mut self.reused_chunks);
+
+ if !reused_chunks.must_flush_first {
+ return Ok(());
+ }
+
+ // First chunk was kept back to avoid duplication but needs to be injected
+ let injection_boundary = reused_chunks.start_boundary();
+ if let Some(chunk) = reused_chunks.chunks.first() {
+ let size = chunk.size();
+ let inject_chunks = InjectChunks {
+ boundary: injection_boundary.raw(),
+ chunks: vec![chunk.clone()],
+ size: size as usize,
+ };
+ let mut boundary = self.forced_boundaries.lock().unwrap();
+ boundary.push_back(inject_chunks);
+ encoder.advance(size)?;
+ } else {
+ bail!("missing first chunk");
+ }
+
+ Ok(())
+ }
+
+ async fn flush_directory_to_archive<'a, 'b, T: SeqWrite + Send>(
+ &'a mut self,
+ encoder: &'a mut Encoder<'b, T>,
+ entry_data: CacheEntryData,
+ ) -> Result<(), Error> {
+ let CacheEntryData {
+ c_file_name,
+ metadata,
+ ..
+ } = entry_data;
+ let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
+
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().start_directory(&c_file_name)?;
+ }
+
+ encoder.create_directory(dir_name, &metadata).await?;
+
+ Ok(())
+ }
+
+ async fn flush_entry_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ entry_data: CacheEntryData,
+ reuse_chunks: bool,
+ ) -> Result<(), Error> {
+ use pxar::format::mode;
+
+ let CacheEntryData {
+ fd,
+ c_file_name,
+ stat,
+ metadata,
+ payload_offset,
+ } = entry_data;
+ let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
+
+ match metadata.file_type() {
+ mode::IFREG => {
+ let link_info = HardLinkInfo {
+ st_dev: stat.st_dev,
+ st_ino: stat.st_ino,
+ };
+
+ if stat.st_nlink > 1 {
+ if let Some((path, offset)) = self.hardlinks.get(&link_info) {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_hardlink(&c_file_name)?;
+ }
+ encoder.add_hardlink(file_name, path, *offset).await?;
+ return Ok(());
+ }
+ }
+
+ let file_size = stat.st_size as u64;
+ if let Some(ref catalog) = self.catalog {
+ catalog
+ .lock()
+ .unwrap()
+ .add_file(&c_file_name, file_size, stat.st_mtime)?;
+ }
+
+ if reuse_chunks {
+ encoder
+ .add_payload_ref(&metadata, file_name, file_size, payload_offset)
+ .await?;
+ } else {
+ let offset: LinkOffset = self
+ .add_regular_file(encoder, fd, file_name, &metadata, file_size)
+ .await?;
+
+ if stat.st_nlink > 1 {
+ self.hardlinks
+ .insert(link_info, (self.path.clone(), offset));
+ }
+ }
+ }
+ mode::IFSOCK => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_socket(&c_file_name)?;
+ }
+ encoder.add_socket(&metadata, file_name).await?;
+ }
+ mode::IFIFO => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_fifo(&c_file_name)?;
+ }
+ encoder.add_fifo(&metadata, file_name).await?;
+ }
+ mode::IFLNK => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_symlink(&c_file_name)?;
+ }
+ self.add_symlink(encoder, fd, file_name, &metadata).await?;
+ }
+ mode::IFBLK => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_block_device(&c_file_name)?;
+ }
+ self.add_device(encoder, file_name, &metadata, &stat)
+ .await?;
+ }
+ mode::IFCHR => {
+ if let Some(ref catalog) = self.catalog {
+ catalog.lock().unwrap().add_char_device(&c_file_name)?;
+ }
+ self.add_device(encoder, file_name, &metadata, &stat)
+ .await?;
+ }
+ other => bail!(
+ "encountered unknown file type: 0x{:x} (0o{:o})",
+ other,
+ other
+ ),
+ }
+
+ Ok(())
+ }
+
async fn add_directory<T: SeqWrite + Send>(
&mut self,
encoder: &mut Encoder<'_, T>,
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (32 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 33/36] client: pxar: add look-ahead caching Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 35/36] test-suite: add detection mode change benchmark Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions Christian Ebner
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Add the final glue logic to enable the look-ahead caching and
metadata comparison introduced in the preparatory patches.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/pxar/create.rs | 107 ++++++++++++++++++++++++++++++++--
1 file changed, 102 insertions(+), 5 deletions(-)
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index fbcccb2e..ad76890c 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -32,10 +32,14 @@ use pbs_datastore::dynamic_index::{
};
use crate::inject_reused_chunks::InjectChunks;
+use crate::pxar::lookahead_cache::{CacheEntry, CacheEntryData};
use crate::pxar::metadata::errno_is_unsupported;
use crate::pxar::tools::assert_single_path_component;
use crate::pxar::Flags;
+const MAX_CACHE_SIZE: usize = 1024;
+const CACHED_PAYLOAD_THRESHOLD: u64 = 2 * 1024 * 1024;
+
#[derive(Default)]
struct ReusedChunks {
start_boundary: PayloadOffset,
@@ -253,6 +257,9 @@ struct Archiver {
reused_chunks: ReusedChunks,
previous_payload_index: Option<DynamicIndexReader>,
forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+ cached_entries: Vec<CacheEntry>,
+ caching_enabled: bool,
+ cached_payload_size: u64,
}
type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -335,11 +342,26 @@ where
reused_chunks: ReusedChunks::new(),
previous_payload_index,
forced_boundaries,
+ cached_entries: Vec::new(),
+ caching_enabled: false,
+ cached_payload_size: 0,
};
archiver
.archive_dir_contents(&mut encoder, accessor, source_dir, true)
.await?;
+
+ if let Some(last) = archiver.cached_entries.pop() {
+ match last {
+ CacheEntry::DirEnd => {}
+ _ => archiver.cached_entries.push(last),
+ }
+ }
+
+ archiver
+ .flush_cached_to_archive(&mut encoder, false)
+ .await?;
+
encoder.finish().await?;
Ok(())
}
@@ -413,6 +435,11 @@ impl Archiver {
.await
.map_err(|err| self.wrap_err(err))?;
}
+
+ if self.caching_enabled {
+ self.cached_entries.push(CacheEntry::DirEnd);
+ }
+
self.path = old_path;
self.entry_counter = entry_counter;
self.patterns.truncate(old_patterns_count);
@@ -693,8 +720,6 @@ impl Archiver {
c_file_name: &CStr,
stat: &FileStat,
) -> Result<(), Error> {
- use pxar::format::mode;
-
let file_mode = stat.st_mode & libc::S_IFMT;
let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
OFlag::empty()
@@ -732,6 +757,71 @@ impl Archiver {
self.skip_e2big_xattr,
)?;
+ if self.previous_payload_index.is_none() {
+ return self
+ .add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await;
+ }
+
+ // Avoid having to many open file handles in cached entries
+ if self.cached_entries.len() > MAX_CACHE_SIZE {
+ self.flush_cached_to_archive(encoder, false).await?;
+ }
+
+ if metadata.is_regular_file() {
+ self.cache_or_flush_entries(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await
+ } else {
+ if self.caching_enabled {
+ if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
+ let fd_clone = fd.try_clone()?;
+ let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ stat.clone(),
+ metadata.clone(),
+ PayloadOffset::default(),
+ ));
+ self.cached_entries.push(cache_entry);
+
+ let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
+ self.add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
+ .await?;
+
+ if let Some(ref catalog) = self.catalog {
+ if !self.caching_enabled {
+ catalog.lock().unwrap().end_directory()?;
+ }
+ }
+ } else {
+ let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
+ fd,
+ c_file_name.into(),
+ stat.clone(),
+ metadata,
+ PayloadOffset::default(),
+ ));
+ self.cached_entries.push(cache_entry);
+ }
+ Ok(())
+ } else {
+ self.add_entry_to_archive(encoder, accessor, c_file_name, stat, fd, &metadata)
+ .await
+ }
+ }
+ }
+
+ async fn add_entry_to_archive<T: SeqWrite + Send>(
+ &mut self,
+ encoder: &mut Encoder<'_, T>,
+ accessor: &mut Option<Directory<LocalDynamicReadAt<RemoteChunkReader>>>,
+ c_file_name: &CStr,
+ stat: &FileStat,
+ fd: OwnedFd,
+ metadata: &Metadata,
+ ) -> Result<(), Error> {
+ use pxar::format::mode;
+
let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
match metadata.file_type() {
mode::IFREG => {
@@ -781,7 +871,9 @@ impl Archiver {
.add_directory(encoder, accessor, dir, c_file_name, &metadata, stat)
.await;
if let Some(ref catalog) = self.catalog {
- catalog.lock().unwrap().end_directory()?;
+ if !self.caching_enabled {
+ catalog.lock().unwrap().end_directory()?;
+ }
}
result
}
@@ -1123,7 +1215,9 @@ impl Archiver {
) -> Result<(), Error> {
let dir_name = OsStr::from_bytes(dir_name.to_bytes());
- encoder.create_directory(dir_name, metadata).await?;
+ if !self.caching_enabled {
+ encoder.create_directory(dir_name, metadata).await?;
+ }
let old_fs_magic = self.fs_magic;
let old_fs_feature_flags = self.fs_feature_flags;
@@ -1163,7 +1257,10 @@ impl Archiver {
self.fs_feature_flags = old_fs_feature_flags;
self.current_st_dev = old_st_dev;
- encoder.finish().await?;
+ if !self.caching_enabled {
+ encoder.finish().await?;
+ }
+
result
}
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 35/36] test-suite: add detection mode change benchmark
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (33 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 34/36] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions Christian Ebner
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Introduces the proxmox-backup-test-suite crate, intended for
benchmarking and high-level, user-facing testing.
The initial code includes a benchmark intended for regression testing of
the proxmox-backup-client when using the different file change detection
modes during backup.
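For illustration, preparing the test data and running the benchmark could
look roughly like the following (the target path, backupspec label and
repository are placeholders, not part of this patch):

  proxmox-backup-test-suite detection-mode-bench prepare --target /path/to/testdata
  proxmox-backup-test-suite detection-mode-bench run testdata.pxar:/path/to/testdata \
      --repository <user>@<server>:<datastore> --number-of-runs 5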
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Cargo.toml | 1 +
proxmox-backup-test-suite/Cargo.toml | 18 ++
.../src/detection_mode_bench.rs | 294 ++++++++++++++++++
proxmox-backup-test-suite/src/main.rs | 17 +
4 files changed, 330 insertions(+)
create mode 100644 proxmox-backup-test-suite/Cargo.toml
create mode 100644 proxmox-backup-test-suite/src/detection_mode_bench.rs
create mode 100644 proxmox-backup-test-suite/src/main.rs
diff --git a/Cargo.toml b/Cargo.toml
index 00dc4d86..76635b4e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,7 @@ members = [
"proxmox-restore-daemon",
"pxar-bin",
+ "proxmox-backup-test-suite",
]
[lib]
diff --git a/proxmox-backup-test-suite/Cargo.toml b/proxmox-backup-test-suite/Cargo.toml
new file mode 100644
index 00000000..3f899e9b
--- /dev/null
+++ b/proxmox-backup-test-suite/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "proxmox-backup-test-suite"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+
+[dependencies]
+anyhow.workspace = true
+futures.workspace = true
+serde.workspace = true
+serde_json.workspace = true
+
+pbs-client.workspace = true
+pbs-key-config.workspace = true
+pbs-tools.workspace = true
+proxmox-async.workspace = true
+proxmox-router = { workspace = true, features = ["cli"] }
+proxmox-schema = { workspace = true, features = [ "api-macro" ] }
diff --git a/proxmox-backup-test-suite/src/detection_mode_bench.rs b/proxmox-backup-test-suite/src/detection_mode_bench.rs
new file mode 100644
index 00000000..9a3c7680
--- /dev/null
+++ b/proxmox-backup-test-suite/src/detection_mode_bench.rs
@@ -0,0 +1,294 @@
+use std::path::Path;
+use std::process::Command;
+use std::{thread, time};
+
+use anyhow::{bail, format_err, Error};
+use serde_json::Value;
+
+use pbs_client::{
+ tools::{complete_repository, key_source::KEYFILE_SCHEMA, REPO_URL_SCHEMA},
+ BACKUP_SOURCE_SCHEMA,
+};
+use pbs_tools::json;
+use proxmox_router::cli::*;
+use proxmox_schema::api;
+
+const DEFAULT_NUMBER_OF_RUNS: u64 = 5;
+// Homepage https://cocodataset.org/
+const COCO_DATASET_SRC_URL: &str = "http://images.cocodataset.org/zips/unlabeled2017.zip";
+// Homepage https://kernel.org/
+const LINUX_GIT_REPOSITORY: &str =
+    "git://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git";
+const LINUX_GIT_TAG: &str = "v6.5.5";
+
+pub(crate) fn detection_mode_bench_mgmt_cli() -> CliCommandMap {
+ let run_cmd_def = CliCommand::new(&API_METHOD_DETECTION_MODE_BENCH_RUN)
+ .arg_param(&["backupspec"])
+ .completion_cb("repository", complete_repository)
+ .completion_cb("keyfile", complete_file_name);
+
+ let prepare_cmd_def = CliCommand::new(&API_METHOD_DETECTION_MODE_BENCH_PREPARE);
+ CliCommandMap::new()
+ .insert("prepare", prepare_cmd_def)
+ .insert("run", run_cmd_def)
+}
+
+#[api(
+ input: {
+ properties: {
+ backupspec: {
+ type: Array,
+ description: "List of backup source specifications ([<label.ext>:<path>] ...)",
+ items: {
+ schema: BACKUP_SOURCE_SCHEMA,
+ }
+ },
+ repository: {
+ schema: REPO_URL_SCHEMA,
+ optional: true,
+ },
+ keyfile: {
+ schema: KEYFILE_SCHEMA,
+ optional: true,
+ },
+ "number-of-runs": {
+ description: "Number of times to repeat the run",
+ type: Integer,
+ optional: true,
+ },
+ }
+ }
+)]
+/// Run benchmark to compare performance for backups using different change detection modes.
+fn detection_mode_bench_run(param: Value) -> Result<(), Error> {
+ let mut pbc = Command::new("proxmox-backup-client");
+ pbc.arg("backup");
+
+    let backupspec_list = json::required_array_param(&param, "backupspec")?;
+ for backupspec in backupspec_list {
+ let arg = backupspec
+ .as_str()
+ .ok_or_else(|| format_err!("failed to parse backupspec"))?;
+ pbc.arg(arg);
+ }
+
+ if let Some(repo) = param["repository"].as_str() {
+ pbc.arg("--repository");
+ pbc.arg::<&str>(repo);
+ }
+
+ if let Some(keyfile) = param["keyfile"].as_str() {
+ pbc.arg("--keyfile");
+ pbc.arg::<&str>(keyfile);
+ }
+
+    let number_of_runs = param["number-of-runs"]
+        .as_u64()
+        .unwrap_or(DEFAULT_NUMBER_OF_RUNS);
+    if number_of_runs < 2 {
+        bail!("Number of runs must be greater than 1, aborting.");
+    }
+
+    // The first run is an initial warm-up to make sure all chunks are already present on the
+    // server and to reduce side effects from filesystem caches etc.
+ let _stats_initial = do_run(&mut pbc, 1)?;
+
+ println!("\nStarting benchmarking backups with regular detection mode...\n");
+ let stats_reg = do_run(&mut pbc, number_of_runs)?;
+
+    // Make sure to have a valid reference with catalog format version 2
+ pbc.arg("--change-detection-mode=metadata");
+ let _stats_initial = do_run(&mut pbc, 1)?;
+
+ println!("\nStarting benchmarking backups with metadata detection mode...\n");
+ let stats_meta = do_run(&mut pbc, number_of_runs)?;
+
+ println!("\nCompleted benchmark with {number_of_runs} runs for each tested mode.");
+ println!("\nCompleted regular backup with:");
+ println!("Total runtime: {:.2} s", stats_reg.total);
+ println!("Average: {:.2} ± {:.2} s", stats_reg.avg, stats_reg.stddev);
+ println!("Min: {:.2} s", stats_reg.min);
+ println!("Max: {:.2} s", stats_reg.max);
+
+ println!("\nCompleted metadata detection mode backup with:");
+ println!("Total runtime: {:.2} s", stats_meta.total);
+ println!(
+ "Average: {:.2} ± {:.2} s",
+ stats_meta.avg, stats_meta.stddev
+ );
+ println!("Min: {:.2} s", stats_meta.min);
+ println!("Max: {:.2} s", stats_meta.max);
+
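+    // Combine the standard deviations of the two independent run series in quadrature
+    // to estimate the uncertainty of the difference of their averages.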
+ let diff_stddev =
+ ((stats_meta.stddev * stats_meta.stddev) + (stats_reg.stddev * stats_reg.stddev)).sqrt();
+ println!("\nDifferences (metadata based - regular):");
+ println!(
+ "Delta total runtime: {:.2} s ({:.2} %)",
+ stats_meta.total - stats_reg.total,
+ 100.0 * (stats_meta.total / stats_reg.total - 1.0),
+ );
+ println!(
+ "Delta average: {:.2} ± {:.2} s ({:.2} %)",
+ stats_meta.avg - stats_reg.avg,
+ diff_stddev,
+ 100.0 * (stats_meta.avg / stats_reg.avg - 1.0),
+ );
+ println!(
+ "Delta min: {:.2} s ({:.2} %)",
+ stats_meta.min - stats_reg.min,
+ 100.0 * (stats_meta.min / stats_reg.min - 1.0),
+ );
+ println!(
+ "Delta max: {:.2} s ({:.2} %)",
+ stats_meta.max - stats_reg.max,
+ 100.0 * (stats_meta.max / stats_reg.max - 1.0),
+ );
+
+ Ok(())
+}
+
+fn do_run(cmd: &mut Command, n_runs: u64) -> Result<Statistics, Error> {
+    // Avoid collisions of consecutive snapshot timestamps
+ thread::sleep(time::Duration::from_millis(1000));
+ let mut timings = Vec::with_capacity(n_runs as usize);
+    for iteration in 1..=n_runs {
+ let start = std::time::SystemTime::now();
+ let mut child = cmd.spawn()?;
+ let exit_code = child.wait()?;
+ let elapsed = start.elapsed()?;
+ timings.push(elapsed);
+ if !exit_code.success() {
+ bail!("Run number {iteration} of {n_runs} failed, aborting.");
+ }
+ }
+
+ Ok(statistics(timings))
+}
+
+struct Statistics {
+ total: f64,
+ avg: f64,
+ stddev: f64,
+ min: f64,
+ max: f64,
+}
+
+fn statistics(timings: Vec<std::time::Duration>) -> Statistics {
+ let total = timings
+ .iter()
+ .fold(0f64, |sum, time| sum + time.as_secs_f64());
+ let avg = total / timings.len() as f64;
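+    // Sample variance with Bessel's correction (divide by n - 1); a single timing
+    // yields zero variance.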
+    let var = if timings.len() > 1 {
+        timings.iter().fold(0f64, |sq_sum, time| {
+            let diff = time.as_secs_f64() - avg;
+            sq_sum + diff * diff
+        }) / (timings.len() - 1) as f64
+    } else {
+        0f64
+    };
+ let stddev = var.sqrt();
+ let min = timings.iter().min().unwrap().as_secs_f64();
+ let max = timings.iter().max().unwrap().as_secs_f64();
+
+ Statistics {
+ total,
+ avg,
+ stddev,
+ min,
+ max,
+ }
+}
+
+#[api(
+ input: {
+ properties: {
+ target: {
+            description: "Target path to prepare the test data in.",
+ },
+ },
+ },
+)]
+/// Prepare files required for detection mode backup benchmarks.
+fn detection_mode_bench_prepare(target: String) -> Result<(), Error> {
+ let linux_repo_target = format!("{target}/linux");
+ let coco_dataset_target = format!("{target}/coco");
+ git_clone(LINUX_GIT_REPOSITORY, linux_repo_target.as_str())?;
+ git_checkout(LINUX_GIT_TAG, linux_repo_target.as_str())?;
+ wget_download(COCO_DATASET_SRC_URL, coco_dataset_target.as_str())?;
+
+ Ok(())
+}
+
+fn git_clone(repo: &str, target: &str) -> Result<(), Error> {
+ println!("Calling git clone for '{repo}'.");
+ let target_git = format!("{target}/.git");
+ let path = Path::new(&target_git);
+ if let Ok(true) = path.try_exists() {
+ println!("Target '{target}' already contains a git repository, skip.");
+ return Ok(());
+ }
+
+ let mut git = Command::new("git");
+ git.args(["clone", repo, target]);
+
+ let mut child = git.spawn()?;
+ let exit_code = child.wait()?;
+ if exit_code.success() {
+ println!("git clone finished with success.");
+ } else {
+ bail!("git clone failed for '{target}'.");
+ }
+
+ Ok(())
+}
+
+fn git_checkout(checkout_target: &str, target: &str) -> Result<(), Error> {
+ println!("Calling git checkout '{checkout_target}'.");
+ let mut git = Command::new("git");
+ git.args(["-C", target, "checkout", checkout_target]);
+
+ let mut child = git.spawn()?;
+ let exit_code = child.wait()?;
+ if exit_code.success() {
+ println!("git checkout finished with success.");
+ } else {
+ bail!("git checkout '{checkout_target}' failed for '{target}'.");
+ }
+ Ok(())
+}
+
+fn wget_download(source_url: &str, target: &str) -> Result<(), Error> {
+ let path = Path::new(&target);
+ if let Ok(true) = path.try_exists() {
+ println!("Target '{target}' already exists, skip.");
+ return Ok(());
+ }
+ let zip = format!("{}/unlabeled2017.zip", target);
+ let path = Path::new(&zip);
+ if !path.try_exists()? {
+ println!("Download archive using wget from '{source_url}' to '{target}'.");
+ let mut wget = Command::new("wget");
+ wget.args(["-P", target, source_url]);
+
+ let mut child = wget.spawn()?;
+ let exit_code = child.wait()?;
+ if exit_code.success() {
+ println!("Download finished with success.");
+ } else {
+ bail!("Failed to download '{source_url}' to '{target}'.");
+ }
+ } else {
+ println!("Target '{target}' already contains download, skip download.");
+ }
+
+ let mut unzip = Command::new("unzip");
+ unzip.args([&zip, "-d", target]);
+
+ let mut child = unzip.spawn()?;
+ let exit_code = child.wait()?;
+ if exit_code.success() {
+ println!("Extracting zip archive finished with success.");
+ } else {
+ bail!("Failed to extract zip archive '{zip}' to '{target}'.");
+ }
+ Ok(())
+}
diff --git a/proxmox-backup-test-suite/src/main.rs b/proxmox-backup-test-suite/src/main.rs
new file mode 100644
index 00000000..0a5b436a
--- /dev/null
+++ b/proxmox-backup-test-suite/src/main.rs
@@ -0,0 +1,17 @@
+use proxmox_router::cli::*;
+
+mod detection_mode_bench;
+
+fn main() {
+ let cmd_def = CliCommandMap::new().insert(
+ "detection-mode-bench",
+        detection_mode_bench::detection_mode_bench_mgmt_cli(),
+ );
+
+ let rpcenv = CliEnvironment::new();
+ run_cli_command(
+ cmd_def,
+ rpcenv,
+ Some(|future| proxmox_async::runtime::main(future)),
+ );
+}
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread
* [pbs-devel] [RFC proxmox-backup 36/36] test-suite: Add bin to deb, add shell completions
2024-02-28 14:01 [pbs-devel] [RFC pxar proxmox-backup 00/36] fix #3174: improve file-level backup Christian Ebner
` (34 preceding siblings ...)
2024-02-28 14:02 ` [pbs-devel] [RFC proxmox-backup 35/36] test-suite: add detection mode change benchmark Christian Ebner
@ 2024-02-28 14:02 ` Christian Ebner
35 siblings, 0 replies; 39+ messages in thread
From: Christian Ebner @ 2024-02-28 14:02 UTC (permalink / raw)
To: pbs-devel
Adds the required files for bash and zsh completion and packages the
binary to be included in the proxmox-backup-client Debian package.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Makefile | 13 ++++++++-----
debian/proxmox-backup-client.bash-completion | 1 +
debian/proxmox-backup-client.install | 2 ++
debian/proxmox-backup-test-suite.bc | 8 ++++++++
zsh-completions/_proxmox-backup-test-suite | 13 +++++++++++++
5 files changed, 32 insertions(+), 5 deletions(-)
create mode 100644 debian/proxmox-backup-test-suite.bc
create mode 100644 zsh-completions/_proxmox-backup-test-suite
diff --git a/Makefile b/Makefile
index 0317dd5e..acaac3f7 100644
--- a/Makefile
+++ b/Makefile
@@ -8,11 +8,12 @@ SUBDIRS := etc www docs
# Binaries usable by users
USR_BIN := \
- proxmox-backup-client \
- proxmox-file-restore \
- pxar \
- proxmox-tape \
- pmtx \
+ proxmox-backup-client \
+ proxmox-backup-test-suite \
+ proxmox-file-restore \
+ pxar \
+ proxmox-tape \
+ pmtx \
pmt
# Binaries usable by admins
@@ -165,6 +166,8 @@ $(COMPILED_BINS) $(COMPILEDIR)/dump-catalog-shell-cli $(COMPILEDIR)/docgen: .do-
--bin proxmox-backup-client \
--bin dump-catalog-shell-cli \
--bin proxmox-backup-debug \
+ --package proxmox-backup-test-suite \
+ --bin proxmox-backup-test-suite \
--package proxmox-file-restore \
--bin proxmox-file-restore \
--package pxar-bin \
diff --git a/debian/proxmox-backup-client.bash-completion b/debian/proxmox-backup-client.bash-completion
index 43736017..c4ff02ae 100644
--- a/debian/proxmox-backup-client.bash-completion
+++ b/debian/proxmox-backup-client.bash-completion
@@ -1,2 +1,3 @@
debian/proxmox-backup-client.bc proxmox-backup-client
+debian/proxmox-backup-test-suite.bc proxmox-backup-test-suite
debian/pxar.bc pxar
diff --git a/debian/proxmox-backup-client.install b/debian/proxmox-backup-client.install
index 74b568f1..0eb85975 100644
--- a/debian/proxmox-backup-client.install
+++ b/debian/proxmox-backup-client.install
@@ -1,6 +1,8 @@
usr/bin/proxmox-backup-client
+usr/bin/proxmox-backup-test-suite
usr/bin/pxar
usr/share/man/man1/proxmox-backup-client.1
usr/share/man/man1/pxar.1
usr/share/zsh/vendor-completions/_proxmox-backup-client
+usr/share/zsh/vendor-completions/_proxmox-backup-test-suite
usr/share/zsh/vendor-completions/_pxar
diff --git a/debian/proxmox-backup-test-suite.bc b/debian/proxmox-backup-test-suite.bc
new file mode 100644
index 00000000..2686d7ea
--- /dev/null
+++ b/debian/proxmox-backup-test-suite.bc
@@ -0,0 +1,8 @@
+# proxmox-backup-test-suite bash completion
+
+# see http://tiswww.case.edu/php/chet/bash/FAQ
+# and __ltrim_colon_completions() in /usr/share/bash-completion/bash_completion
+# this modifies global var, but I found no better way
+COMP_WORDBREAKS=${COMP_WORDBREAKS//:}
+
+complete -C 'proxmox-backup-test-suite bashcomplete' proxmox-backup-test-suite
diff --git a/zsh-completions/_proxmox-backup-test-suite b/zsh-completions/_proxmox-backup-test-suite
new file mode 100644
index 00000000..72ebcea5
--- /dev/null
+++ b/zsh-completions/_proxmox-backup-test-suite
@@ -0,0 +1,13 @@
+#compdef _proxmox-backup-test-suite() proxmox-backup-test-suite
+
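+# Forward the completion request to the binary's "bashcomplete" subcommand via the
+# bash completion environment variables, analogous to the other completions shipped
+# with this package.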
+function _proxmox-backup-test-suite() {
+ local cwords line point cmd curr prev
+ cwords=${#words[@]}
+ line=$words
+ point=${#line}
+ cmd=${words[1]}
+ curr=${words[cwords]}
+ prev=${words[cwords-1]}
+ compadd -- $(COMP_CWORD="$cwords" COMP_LINE="$line" COMP_POINT="$point" \
+ proxmox-backup-test-suite bashcomplete "$cmd" "$curr" "$prev")
+}
--
2.39.2
^ permalink raw reply [flat|nested] 39+ messages in thread