public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v9 proxmox-backup 48/58] client: chunk stream: switch payload stream chunker
Date: Wed,  5 Jun 2024 12:54:06 +0200	[thread overview]
Message-ID: <20240605105416.278748-49-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240605105416.278748-1-c.ebner@proxmox.com>

Switch the payload stream to the dedicated chunker with boundary
suggestions by attaching the channel sender to the archiver and the
channel receiver to the payload stream chunker.

The archiver sends the file boundaries (payload stream offsets) over the
channel for the chunker to consume.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 8:
- no changes

 examples/test_chunk_speed2.rs                 |  2 +-
 pbs-client/src/chunk_stream.rs                | 15 +++++--
 pbs-client/src/pxar/create.rs                 |  8 ++++
 pbs-client/src/pxar_backup_stream.rs          | 40 +++++++++++--------
 proxmox-backup-client/src/main.rs             | 16 +++++---
 .../src/proxmox_restore_daemon/api.rs         | 12 +++++-
 pxar-bin/src/main.rs                          |  1 +
 tests/catar.rs                                |  1 +
 8 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index 22dd14ce2..f2963746a 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
 
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None, None);
+    let mut chunk_stream = ChunkStream::new(stream, None, None, None);
 
     let start_time = std::time::Instant::now();
 
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index 070a10c17..e3f0980c6 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -7,7 +7,7 @@ use bytes::BytesMut;
 use futures::ready;
 use futures::stream::{Stream, TryStream};
 
-use pbs_datastore::{Chunker, ChunkerImpl};
+use pbs_datastore::{Chunker, ChunkerImpl, PayloadChunker};
 
 use crate::inject_reused_chunks::InjectChunks;
 
@@ -42,11 +42,20 @@ pub struct ChunkStream<S: Unpin> {
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
+    pub fn new(
+        input: S,
+        chunk_size: Option<usize>,
+        injection_data: Option<InjectionData>,
+        suggested_boundaries: Option<mpsc::Receiver<u64>>,
+    ) -> Self {
         let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024);
         Self {
             input,
-            chunker: Box::new(ChunkerImpl::new(chunk_size)),
+            chunker: if let Some(suggested) = suggested_boundaries {
+                Box::new(PayloadChunker::new(chunk_size, suggested))
+            } else {
+                Box::new(ChunkerImpl::new(chunk_size))
+            },
             buffer: BytesMut::new(),
             scan_pos: 0,
             consumed: 0,
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index eadd670df..03a6a1448 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -169,6 +169,7 @@ struct Archiver {
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
     forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
+    suggested_boundaries: Option<mpsc::Sender<u64>>,
     previous_payload_index: Option<DynamicIndexReader>,
     cache: PxarLookaheadCache,
     reuse_stats: ReuseStats,
@@ -197,6 +198,7 @@ pub async fn create_archive<T, F>(
     callback: F,
     options: PxarCreateOptions,
     forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
+    suggested_boundaries: Option<mpsc::Sender<u64>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -271,6 +273,7 @@ where
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
         forced_boundaries,
+        suggested_boundaries,
         previous_payload_index,
         cache: PxarLookaheadCache::new(None),
         reuse_stats: ReuseStats::default(),
@@ -862,6 +865,11 @@ impl Archiver {
                         .add_file(c_file_name, file_size, stat.st_mtime)?;
                 }
 
+                if let Some(sender) = self.suggested_boundaries.as_mut() {
+                    let offset = encoder.payload_position()?.raw();
+                    sender.send(offset)?;
+                }
+
                 let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
                     self.reuse_stats.total_reused_payload_size +=
                         file_size + size_of::<pxar::format::Header>() as u64;
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index fb6d063f2..f322566f0 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -27,6 +27,7 @@ use crate::pxar::create::PxarWriters;
 /// consumer.
 pub struct PxarBackupStream {
     rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
+    pub suggested_boundaries: Option<std::sync::mpsc::Receiver<u64>>,
     handle: Option<AbortHandle>,
     error: Arc<Mutex<Option<String>>>,
 }
@@ -55,22 +56,26 @@ impl PxarBackupStream {
         ));
         let writer = pxar::encoder::sync::StandardWriter::new(writer);
 
-        let (writer, payload_rx) = if separate_payload_stream {
-            let (tx, rx) = std::sync::mpsc::sync_channel(10);
-            let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
-                buffer_size,
-                StdChannelWriter::new(tx),
-            ));
-            (
-                pxar::PxarVariant::Split(
-                    writer,
-                    pxar::encoder::sync::StandardWriter::new(payload_writer),
-                ),
-                Some(rx),
-            )
-        } else {
-            (pxar::PxarVariant::Unified(writer), None)
-        };
+        let (writer, payload_rx, suggested_boundaries_tx, suggested_boundaries_rx) =
+            if separate_payload_stream {
+                let (tx, rx) = std::sync::mpsc::sync_channel(10);
+                let (suggested_boundaries_tx, suggested_boundaries_rx) = std::sync::mpsc::channel();
+                let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity(
+                    buffer_size,
+                    StdChannelWriter::new(tx),
+                ));
+                (
+                    pxar::PxarVariant::Split(
+                        writer,
+                        pxar::encoder::sync::StandardWriter::new(payload_writer),
+                    ),
+                    Some(rx),
+                    Some(suggested_boundaries_tx),
+                    Some(suggested_boundaries_rx),
+                )
+            } else {
+                (pxar::PxarVariant::Unified(writer), None, None, None)
+            };
 
         let error = Arc::new(Mutex::new(None));
         let error2 = Arc::clone(&error);
@@ -85,6 +90,7 @@ impl PxarBackupStream {
                 },
                 options,
                 boundaries,
+                suggested_boundaries_tx,
             )
             .await
             {
@@ -99,12 +105,14 @@ impl PxarBackupStream {
 
         let backup_stream = Self {
             rx: Some(rx),
+            suggested_boundaries: None,
             handle: Some(handle.clone()),
             error: Arc::clone(&error),
         };
 
         let backup_payload_stream = payload_rx.map(|rx| Self {
             rx: Some(rx),
+            suggested_boundaries: suggested_boundaries_rx,
             handle: Some(handle),
             error,
         });
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index b4d01ed3f..a17588edf 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -209,7 +209,7 @@ async fn backup_directory<P: AsRef<Path>>(
         payload_target.is_some(),
     )?;
 
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None, None);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -223,14 +223,19 @@ async fn backup_directory<P: AsRef<Path>>(
 
     let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
 
-    if let Some(payload_stream) = payload_stream {
+    if let Some(mut payload_stream) = payload_stream {
         let payload_target = payload_target
             .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
 
         let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
         let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
-        let mut payload_chunk_stream =
-            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
+        let suggested_boundaries = payload_stream.suggested_boundaries.take();
+        let mut payload_chunk_stream = ChunkStream::new(
+            payload_stream,
+            chunk_size,
+            Some(injection_data),
+            suggested_boundaries,
+        );
         let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
         let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
 
@@ -573,7 +578,8 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
+    let catalog_chunk_stream =
+        ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None, None);
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index 681fa6db9..80af5011e 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -364,8 +364,16 @@ fn extract(
                     };
 
                     let pxar_writer = pxar::PxarVariant::Unified(TokioWriter::new(writer));
-                    create_archive(dir, PxarWriters::new(pxar_writer, None), Flags::DEFAULT, |_| Ok(()), options, None)
-                        .await
+                    create_archive(
+                        dir,
+                        PxarWriters::new(pxar_writer, None),
+                        Flags::DEFAULT,
+                        |_| Ok(()),
+                        options,
+                        None,
+                        None,
+                    )
+                    .await
                 }
                 .await;
                 if let Err(err) = result {
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 85887a8ed..fa584b4e8 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -442,6 +442,7 @@ async fn create_archive(
         },
         options,
         None,
+        None,
     )
     .await?;
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 9f83b4cc2..94c565012 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -40,6 +40,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         |_| Ok(()),
         options,
         None,
+        None,
     ))?;
 
     Command::new("cmp")
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2024-06-05 10:55 UTC|newest]

Thread overview: 59+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-06-05 10:53 [pbs-devel] [PATCH v9 proxmox-backup 00/58] fix #3174: improve file-level backup Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 01/58] client: pxar: switch to stack based encoder state Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 02/58] client: pxar: combine writers into struct Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 03/58] client: pxar: optionally split metadata and payload streams Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 04/58] client: helper: add helpers for creating reader instances Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 05/58] client: helper: add method for split archive name mapping Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 06/58] client: tools: helper to check pxar filename extensions Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 07/58] client: restore: read payload from dedicated index Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 08/58] client: tools: cover extension for split pxar archives Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 09/58] client: mount: make split pxar archives mountable Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 10/58] api: datastore: attach split archive payload chunk reader Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 11/58] catalog: shell: make split pxar archives accessible Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 12/58] www: cover metadata extension for pxar archives Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 13/58] file restore: cover extension for split " Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 14/58] file restore: factor out getting pxar reader Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 15/58] file restore: cover split metadata and payload archives Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 16/58] file restore: show more error context when extraction fails Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 17/58] pxar: bin: add optional payload input for archive restore Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 18/58] pxar: bin: cover listing for split archives Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 19/58] pxar: bin: add more context to extraction error Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 20/58] client: pxar: include payload offset in entry listing Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 21/58] client: pxar: helper for lookup of reusable dynamic entries Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 22/58] upload stream: implement reused chunk injector Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 23/58] client: chunk stream: add struct to hold injection state Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 24/58] chunker: add method to reset chunker state Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 25/58] client: streams: add channels for dynamic entry injection Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 26/58] specs: add backup detection mode specification Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 27/58] client: implement prepare reference method Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 28/58] client: pxar: add method for metadata comparison Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 29/58] pxar: caching: add look-ahead cache Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 30/58] client: pxar: refactor catalog encoding for directories Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 31/58] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 32/58] client: backup writer: add injected chunk count to stats Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 33/58] pxar: create: keep track of reused chunks and files Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 34/58] pxar: create: show chunk injection stats info output Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 35/58] client: backup writer: make backup info output more concise Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 36/58] client: pxar: add helper to handle optional preludes Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 37/58] client: pxar: opt encode cli exclude patterns as Prelude Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 38/58] client: pxar: allow to restore prelude to optional path Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 39/58] pxar: bin: show padding in debug output on archive list Christian Ebner
2024-06-05 10:53 ` [pbs-devel] [PATCH v9 proxmox-backup 40/58] pxar: bin: ignore version and prelude entries in listing Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 42/58] pxar: bin: support creation of split pxar archives via cli Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 43/58] pxar: add optional payload input to mount archive Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 44/58] datastore: chunker: add Chunker trait Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 45/58] datastore: chunker: implement chunker for payload stream Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 46/58] chunker: tests: add regression tests for payload chunker Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 47/58] chunk stream: " Christian Ebner
2024-06-05 10:54 ` Christian Ebner [this message]
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 49/58] client: pxar: add archive creation with reference test Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 50/58] client: tools: add helper to raise nofile rlimit Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 51/58] client: pxar: set cache limit based on " Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 52/58] api: datastore: add endpoint to lookup entries via pxar archive Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 53/58] api: datastore: add optional archive-name to file-restore Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 54/58] www: content: lookup via metadata archive instead of catalog Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 55/58] docs: file formats: describe split pxar archive file layout Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 56/58] docs: add section describing change detection mode Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 57/58] test-suite: add detection mode change benchmark Christian Ebner
2024-06-05 10:54 ` [pbs-devel] [PATCH v9 proxmox-backup 58/58] test-suite: Makefile: add debian package and related files Christian Ebner
2024-06-06  6:47 ` [pbs-devel] partially-applied: [PATCH v9 proxmox-backup 00/58] fix #3174: improve file-level backup Fabian Grünbichler

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240605105416.278748-49-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal