From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 3E48C20EC9A for ; Mon, 29 Apr 2024 14:11:58 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CD788FCAE; Mon, 29 Apr 2024 14:12:04 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Mon, 29 Apr 2024 14:10:23 +0200 Message-Id: <20240429121102.315059-20-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240429121102.315059-1-c.ebner@proxmox.com> References: <20240429121102.315059-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.029 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v4 proxmox-backup 19/58] client: pxar: optionally split metadata and payload streams X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" ... and attach the optional payload writer to the pxar archive creation. With this, metadata and payload data will create different dynamic indexes, which allows payload chunks to be looked up and reused without the additional overhead of the pxar archive's metadata.
For now this functionality remains disabled and will be enabled in a later patch once the logic for reusing the payload chunks is in place. Signed-off-by: Christian Ebner --- pbs-client/src/pxar_backup_stream.rs | 49 +++++++++++++----- proxmox-backup-client/src/main.rs | 75 +++++++++++++++++++++++++--- 2 files changed, 103 insertions(+), 21 deletions(-) diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index cdfb7eaa8..95145cb0d 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -42,23 +42,37 @@ impl PxarBackupStream { dir: Dir, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, - ) -> Result { - let (tx, rx) = std::sync::mpsc::sync_channel(10); - + separate_payload_stream: bool, + ) -> Result<(Self, Option), Error> { let buffer_size = 256 * 1024; - let error = Arc::new(Mutex::new(None)); - let error2 = Arc::clone(&error); - let handler = async move { - let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + let writer = pxar::encoder::sync::StandardWriter::new(writer); + + let (payload_writer, payload_rx) = if separate_payload_stream { + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( buffer_size, StdChannelWriter::new(tx), )); + ( + Some(pxar::encoder::sync::StandardWriter::new(payload_writer)), + Some(rx), + ) + } else { + (None, None) + }; - let writer = pxar::encoder::sync::StandardWriter::new(writer); + let error = Arc::new(Mutex::new(None)); + let error2 = Arc::clone(&error); + let handler = async move { if let Err(err) = crate::pxar::create_archive( dir, - PxarWriters::new(writer, None, Some(catalog)), + PxarWriters::new(writer, payload_writer, Some(catalog)), crate::pxar::Flags::DEFAULT, move 
|path| { log::debug!("{:?}", path); @@ -77,21 +91,30 @@ impl PxarBackupStream { let future = Abortable::new(handler, registration); tokio::spawn(future); - Ok(Self { + let backup_stream = Self { + rx: Some(rx), + handle: Some(handle.clone()), + error: Arc::clone(&error), + }; + + let backup_payload_stream = payload_rx.map(|rx| Self { rx: Some(rx), handle: Some(handle), error, - }) + }); + + Ok((backup_stream, backup_payload_stream)) } pub fn open( dirname: &Path, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, - ) -> Result { + separate_payload_stream: bool, + ) -> Result<(Self, Option), Error> { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; - Self::new(dir, catalog, options) + Self::new(dir, catalog, options, separate_payload_stream) } } diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index d8da36de4..ab7d316d4 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -187,18 +187,24 @@ async fn backup_directory>( client: &BackupWriter, dir_path: P, archive_name: &str, + payload_target: Option<&str>, chunk_size: Option, catalog: Arc>>>>, pxar_create_options: pbs_client::pxar::PxarCreateOptions, upload_options: UploadOptions, -) -> Result { +) -> Result<(BackupStats, Option), Error> { if upload_options.fixed_size.is_some() { bail!("cannot backup directory with fixed chunk size!"); } - let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); + let (pxar_stream, payload_stream) = PxarBackupStream::open( + dir_path.as_ref(), + catalog, + pxar_create_options, + payload_target.is_some(), + )?; + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -210,11 +216,36 @@ async fn backup_directory>( } }); - let stats = client - 
.upload_stream(archive_name, stream, upload_options) - .await?; + let stats = client.upload_stream(archive_name, stream, upload_options.clone()); - Ok(stats) + if let Some(payload_stream) = payload_stream { + let payload_target = payload_target + .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; + + let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size); + let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks + let stream = ReceiverStream::new(payload_rx).map_err(Error::from); + + // spawn payload chunker inside a separate task so that it can run parallel + tokio::spawn(async move { + while let Some(v) = payload_chunk_stream.next().await { + let _ = payload_tx.send(v).await; + } + }); + + let payload_stats = client.upload_stream(&payload_target, stream, upload_options); + + match futures::join!(stats, payload_stats) { + (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))), + (Err(err), Ok(_)) => Err(format_err!("upload failed: {err}")), + (Ok(_), Err(err)) => Err(format_err!("upload failed: {err}")), + (Err(err), Err(payload_err)) => { + Err(format_err!("upload failed: {err} - {payload_err}")) + } + } + } else { + Ok((stats.await?, None)) + } } async fn backup_image>( @@ -985,6 +1016,23 @@ async fn create_backup( manifest.add_file(target, stats.size, stats.csum, crypto.mode)?; } (BackupSpecificationType::PXAR, false) => { + let metadata_mode = false; // Until enabled via param + + let target_base = if let Some(base) = target_base.strip_suffix(".pxar") { + base.to_string() + } else { + bail!("unexpected suffix in target: {target_base}"); + }; + + let (target, payload_target) = if metadata_mode { + ( + format!("{target_base}.mpxar.{extension}"), + Some(format!("{target_base}.ppxar.{extension}")), + ) + } else { + (target, None) + }; + // start catalog upload on first use if catalog.is_none() { let catalog_upload_res = @@ -1015,16 +1063,27 @@ async fn create_backup( 
..UploadOptions::default() }; - let stats = backup_directory( + let (stats, payload_stats) = backup_directory( &client, &filename, &target, + payload_target.as_deref(), chunk_size_opt, catalog.clone(), pxar_options, upload_options, ) .await?; + + if let Some(payload_stats) = payload_stats { + manifest.add_file( + payload_target + .ok_or_else(|| format_err!("missing payload target archive"))?, + payload_stats.size, + payload_stats.csum, + crypto.mode, + )?; + } manifest.add_file(target, stats.size, stats.csum, crypto.mode)?; catalog.lock().unwrap().end_directory()?; } -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel