From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 3AA5D1FF38E for ; Tue, 14 May 2024 12:40:06 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id ABC104FE2; Tue, 14 May 2024 12:40:13 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Tue, 14 May 2024 12:34:15 +0200 Message-Id: <20240514103421.289431-60-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240514103421.289431-1-c.ebner@proxmox.com> References: <20240514103421.289431-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.026 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [create.rs, catar.rs, api.rs, main.rs] Subject: [pbs-devel] [PATCH v6 proxmox-backup 59/65] client: chunk stream: switch payload stream chunker X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Use the dedicated chunker with boundary suggestions for the payload stream, by attaching the channel sender to the archiver and the channel receiver to the payload stream chunker. The archiver sends the file boundaries for the chunker to consume. Signed-off-by: Christian Ebner --- examples/test_chunk_speed2.rs | 2 +- pbs-client/src/chunk_stream.rs | 15 ++++++-- pbs-client/src/pxar/create.rs | 8 +++++ pbs-client/src/pxar_backup_stream.rs | 34 ++++++++++++------- proxmox-backup-client/src/main.rs | 16 ++++++--- .../src/proxmox_restore_daemon/api.rs | 1 + pxar-bin/src/main.rs | 1 + tests/catar.rs | 1 + 8 files changed, 56 insertions(+), 22 deletions(-) diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs index 22dd14ce2..f2963746a 100644 --- a/examples/test_chunk_speed2.rs +++ b/examples/test_chunk_speed2.rs @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> { .map_err(Error::from); //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); - let mut chunk_stream = ChunkStream::new(stream, None, None); + let mut chunk_stream = ChunkStream::new(stream, None, None, None); let start_time = std::time::Instant::now(); diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index a028aab05..4d5431a2b 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -7,7 +7,7 @@ use bytes::BytesMut; use futures::ready; use futures::stream::{Stream, TryStream}; -use pbs_datastore::{Chunker, ChunkerImpl}; +use pbs_datastore::{Chunker, ChunkerImpl, PayloadChunker}; use crate::inject_reused_chunks::InjectChunks; @@ -42,11 +42,20 @@ pub struct ChunkStream { } impl ChunkStream { - pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { + pub fn new( + input: S, + chunk_size: Option, + injection_data: Option, + suggested_boundaries: Option>, + ) -> Self { let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024); Self { input, - chunker: Box::new(ChunkerImpl::new(chunk_size)), + chunker: if let Some(suggested) = suggested_boundaries { + Box::new(PayloadChunker::new(chunk_size, suggested)) + } else { + Box::new(ChunkerImpl::new(chunk_size)) + }, buffer: BytesMut::new(), scan_pos: 0, consumed: 0, diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index 19f2349fa..287e47655 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -169,6 +169,7 @@ struct Archiver { file_copy_buffer: Vec, skip_e2big_xattr: bool, forced_boundaries: Option>, + suggested_boundaries: Option>, previous_payload_index: Option, cached_entries: Vec, cached_hardlinks: HashSet, @@ -207,6 +208,7 @@ pub async fn create_archive( callback: F, options: PxarCreateOptions, forced_boundaries: Option>, + suggested_boundaries: Option>, ) -> Result<(), Error> where T: SeqWrite + Send, @@ -288,6 +290,7 @@ where skip_e2big_xattr: options.skip_e2big_xattr, forced_boundaries, previous_payload_index, + suggested_boundaries, cached_entries: Vec::new(), cached_range: Range::default(), cached_last_chunk: None, @@ -843,6 +846,11 @@ impl Archiver { .add_file(c_file_name, file_size, stat.st_mtime)?; } + if let Some(sender) = self.suggested_boundaries.as_mut() { + let offset = encoder.payload_position()?.raw(); + sender.send(offset)?; + } + let offset: LinkOffset = if let Some(payload_offset) = payload_offset { self.reuse_stats.total_reused_payload_size += file_size + size_of::() as u64; diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index 9d2cb41d6..59ccbd631 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -27,6 +27,7 @@ use crate::pxar::create::PxarWriters; /// consumer. pub struct PxarBackupStream { rx: Option, Error>>>, + pub suggested_boundaries: Option>, handle: Option, error: Arc>>, } @@ -55,19 +56,23 @@ impl PxarBackupStream { )); let writer = pxar::encoder::sync::StandardWriter::new(writer); - let (payload_writer, payload_rx) = if separate_payload_stream { - let (tx, rx) = std::sync::mpsc::sync_channel(10); - let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( - buffer_size, - StdChannelWriter::new(tx), - )); - ( - Some(pxar::encoder::sync::StandardWriter::new(payload_writer)), - Some(rx), - ) - } else { - (None, None) - }; + let (payload_writer, payload_rx, suggested_boundaries_tx, suggested_boundaries_rx) = + if separate_payload_stream { + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let (suggested_boundaries_tx, suggested_boundaries_rx) = std::sync::mpsc::channel(); + let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + ( + Some(pxar::encoder::sync::StandardWriter::new(payload_writer)), + Some(rx), + Some(suggested_boundaries_tx), + Some(suggested_boundaries_rx), + ) + } else { + (None, None, None, None) + }; let error = Arc::new(Mutex::new(None)); let error2 = Arc::clone(&error); @@ -82,6 +87,7 @@ impl PxarBackupStream { }, options, boundaries, + suggested_boundaries_tx, ) .await { @@ -96,12 +102,14 @@ impl PxarBackupStream { let backup_stream = Self { rx: Some(rx), + suggested_boundaries: None, handle: Some(handle.clone()), error: Arc::clone(&error), }; let backup_payload_stream = payload_rx.map(|rx| Self { rx: Some(rx), + suggested_boundaries: suggested_boundaries_rx, handle: Some(handle), error, }); diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index d620083e1..dccea230e 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -209,7 +209,7 @@ async fn backup_directory>( payload_target.is_some(), )?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None); + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None, None); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -223,14 +223,19 @@ async fn backup_directory>( let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None); - if let Some(payload_stream) = payload_stream { + if let Some(mut payload_stream) = payload_stream { let payload_target = payload_target .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel(); let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx); - let mut payload_chunk_stream = - ChunkStream::new(payload_stream, chunk_size, Some(injection_data)); + let suggested_boundaries = payload_stream.suggested_boundaries.take(); + let mut payload_chunk_stream = ChunkStream::new( + payload_stream, + chunk_size, + Some(injection_data), + suggested_boundaries, + ); let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(payload_rx).map_err(Error::from); @@ -573,7 +578,8 @@ fn spawn_catalog_upload( let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx); let catalog_chunk_size = 512 * 1024; - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None); + let catalog_chunk_stream = + ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None, None); let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new( StdChannelWriter::new(catalog_tx), diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs index e50cb8184..956c3246a 100644 --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs @@ -366,6 +366,7 @@ fn extract( |_| Ok(()), options, None, + None, ) .await } diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs index c6d3794bb..85f96ad2c 100644 --- a/pxar-bin/src/main.rs +++ b/pxar-bin/src/main.rs @@ -407,6 +407,7 @@ async fn create_archive( }, options, None, + None, ) .await?; diff --git a/tests/catar.rs b/tests/catar.rs index d5ef85ffe..3f5b22177 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -40,6 +40,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { |_| Ok(()), options, None, + None, ))?; Command::new("cmp") -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel