From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 6437CBC38E for ; Thu, 28 Mar 2024 13:46:50 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3E0A3AF19 for ; Thu, 28 Mar 2024 13:46:50 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 28 Mar 2024 13:46:49 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id B4C9542A0C for ; Thu, 28 Mar 2024 13:37:43 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 28 Mar 2024 13:36:29 +0100 Message-Id: <20240328123707.336951-21-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240328123707.336951-1-c.ebner@proxmox.com> References: <20240328123707.336951-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.029 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v3 proxmox-backup 20/58] client: backup: split payload to dedicated stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox 
Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 28 Mar 2024 12:46:50 -0000 This patch is in preparation for being able to quickly look up metadata for previous snapshots, by splitting the upload of a pxar archive into two dedicated streams, one for metadata, being assigned a .mpxar.didx suffix and one for payload data, being assigned a .ppxar.didx suffix. The patch constructs all the duplicate chunk stream, backup writer and upload stream instances required for the split archive uploads. This not only makes it possible to reuse the payload chunks for further backup runs but also keeps the metadata archive small, with the outlook of even making the currently used catalog obsolete. Signed-off-by: Christian Ebner --- changes since version 2: - pass optional combined writer struct - s/backup_stream_payload/backup_payload_stream/ - use new mpxar and ppxar file extensions pbs-client/src/pxar/create.rs | 20 ++++- pbs-client/src/pxar_backup_stream.rs | 49 ++++++++--- proxmox-backup-client/src/main.rs | 81 +++++++++++++++++-- .../src/proxmox_restore_daemon/api.rs | 2 +- pxar-bin/src/main.rs | 2 +- tests/catar.rs | 2 +- 6 files changed, 129 insertions(+), 27 deletions(-) diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index 82f05889b..2bb5a6253 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -137,12 +137,21 @@ type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; pub struct PxarWriters { writer: T, + payload_writer: Option, catalog: Option>>, } impl PxarWriters { - pub fn new(writer: T, catalog: Option>>) -> Self { - Self { writer, catalog } + pub fn new( + writer: T, + payload_writer: Option, + catalog: Option>>, + ) -> Self { + Self { + writer, + payload_writer, + catalog, + } } } @@ -180,7 +189,12 @@ where set.insert(stat.st_dev); } - let mut encoder = Encoder::new(&mut writers.writer, &metadata, None).await?; + let mut 
encoder = Encoder::new( + &mut writers.writer, + &metadata, + writers.payload_writer.as_mut(), + ) + .await?; let mut patterns = options.patterns; diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index bfa108a8b..95145cb0d 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -42,23 +42,37 @@ impl PxarBackupStream { dir: Dir, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, - ) -> Result { - let (tx, rx) = std::sync::mpsc::sync_channel(10); - + separate_payload_stream: bool, + ) -> Result<(Self, Option), Error> { let buffer_size = 256 * 1024; - let error = Arc::new(Mutex::new(None)); - let error2 = Arc::clone(&error); - let handler = async move { - let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( + buffer_size, + StdChannelWriter::new(tx), + )); + let writer = pxar::encoder::sync::StandardWriter::new(writer); + + let (payload_writer, payload_rx) = if separate_payload_stream { + let (tx, rx) = std::sync::mpsc::sync_channel(10); + let payload_writer = TokioWriterAdapter::new(std::io::BufWriter::with_capacity( buffer_size, StdChannelWriter::new(tx), )); + ( + Some(pxar::encoder::sync::StandardWriter::new(payload_writer)), + Some(rx), + ) + } else { + (None, None) + }; - let writer = pxar::encoder::sync::StandardWriter::new(writer); + let error = Arc::new(Mutex::new(None)); + let error2 = Arc::clone(&error); + let handler = async move { if let Err(err) = crate::pxar::create_archive( dir, - PxarWriters::new(writer, Some(catalog)), + PxarWriters::new(writer, payload_writer, Some(catalog)), crate::pxar::Flags::DEFAULT, move |path| { log::debug!("{:?}", path); @@ -77,21 +91,30 @@ impl PxarBackupStream { let future = Abortable::new(handler, registration); tokio::spawn(future); - Ok(Self { + let backup_stream = Self { + rx: Some(rx), + 
handle: Some(handle.clone()), + error: Arc::clone(&error), + }; + + let backup_payload_stream = payload_rx.map(|rx| Self { rx: Some(rx), handle: Some(handle), error, - }) + }); + + Ok((backup_stream, backup_payload_stream)) } pub fn open( dirname: &Path, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, - ) -> Result { + separate_payload_stream: bool, + ) -> Result<(Self, Option), Error> { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; - Self::new(dir, catalog, options) + Self::new(dir, catalog, options, separate_payload_stream) } } diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index 931c841c7..dc6fe0e8d 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -187,18 +187,24 @@ async fn backup_directory>( client: &BackupWriter, dir_path: P, archive_name: &str, + payload_target: Option<&str>, chunk_size: Option, catalog: Arc>>>>, pxar_create_options: pbs_client::pxar::PxarCreateOptions, upload_options: UploadOptions, -) -> Result { +) -> Result<(BackupStats, Option), Error> { if upload_options.fixed_size.is_some() { bail!("cannot backup directory with fixed chunk size!"); } - let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); + let (pxar_stream, payload_stream) = PxarBackupStream::open( + dir_path.as_ref(), + catalog, + pxar_create_options, + payload_target.is_some(), + )?; + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -210,12 +216,43 @@ async fn backup_directory>( } }); + let stats = client.upload_stream(archive_name, stream, upload_options.clone()); - let stats = client - .upload_stream(archive_name, stream, upload_options) - .await?; + if let Some(payload_stream) = payload_stream { + let payload_target = 
payload_target + .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; - Ok(stats) + let mut payload_chunk_stream = ChunkStream::new( + payload_stream, + chunk_size, + ); + let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks + let stream = ReceiverStream::new(payload_rx).map_err(Error::from); + + // spawn payload chunker inside a separate task so that it can run parallel + tokio::spawn(async move { + while let Some(v) = payload_chunk_stream.next().await { + let _ = payload_tx.send(v).await; + } + }); + + let payload_stats = client.upload_stream( + &payload_target, + stream, + upload_options, + ); + + match futures::join!(stats, payload_stats) { + (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))), + (Err(err), Ok(_)) => Err(format_err!("upload failed: {err}")), + (Ok(_), Err(err)) => Err(format_err!("upload failed: {err}")), + (Err(err), Err(payload_err)) => { + Err(format_err!("upload failed: {err} - {payload_err}")) + } + } + } else { + Ok((stats.await?, None)) + } } async fn backup_image>( @@ -986,6 +1023,23 @@ async fn create_backup( manifest.add_file(target, stats.size, stats.csum, crypto.mode)?; } (BackupSpecificationType::PXAR, false) => { + let metadata_mode = false; // Until enabled via param + + let target_base = if let Some(base) = target_base.strip_suffix(".pxar") { + base.to_string() + } else { + bail!("unexpected suffix in target: {target_base}"); + }; + + let (target, payload_target) = if metadata_mode { + ( + format!("{target_base}.mpxar.{extension}"), + Some(format!("{target_base}.ppxar.{extension}")), + ) + } else { + (target, None) + }; + // start catalog upload on first use if catalog.is_none() { let catalog_upload_res = @@ -1016,16 +1070,27 @@ async fn create_backup( ..UploadOptions::default() }; - let stats = backup_directory( + let (stats, payload_stats) = backup_directory( &client, &filename, &target, + payload_target.as_deref(), chunk_size_opt, catalog.clone(), 
pxar_options, upload_options, ) .await?; + + if let Some(payload_stats) = payload_stats { + manifest.add_file( + payload_target + .ok_or_else(|| format_err!("missing payload target archive"))?, + payload_stats.size, + payload_stats.csum, + crypto.mode, + )?; + } manifest.add_file(target, stats.size, stats.csum, crypto.mode)?; catalog.lock().unwrap().end_directory()?; } diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs index 4e63978b7..ea97976e6 100644 --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs @@ -360,7 +360,7 @@ fn extract( let pxar_writer = TokioWriter::new(writer); create_archive( dir, - PxarWriters::new(pxar_writer, None), + PxarWriters::new(pxar_writer, None, None), Flags::DEFAULT, |_| Ok(()), options, diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs index 7083a4b82..6c13c3b17 100644 --- a/pxar-bin/src/main.rs +++ b/pxar-bin/src/main.rs @@ -377,7 +377,7 @@ async fn create_archive( let writer = pxar::encoder::sync::StandardWriter::new(writer); pbs_client::pxar::create_archive( dir, - PxarWriters::new(writer, None), + PxarWriters::new(writer, None, None), feature_flags, move |path| { log::debug!("{:?}", path); diff --git a/tests/catar.rs b/tests/catar.rs index f414da8c9..9e96a8610 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -35,7 +35,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(create_archive( dir, - PxarWriters::new(writer, None), + PxarWriters::new(writer, None, None), Flags::DEFAULT, |_| Ok(()), options, -- 2.39.2