From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 4AE7CB8CA2 for ; Mon, 11 Mar 2024 15:58:01 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 33580B5F4 for ; Mon, 11 Mar 2024 15:58:01 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Mon, 11 Mar 2024 15:57:59 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 71C7B48906 for ; Mon, 11 Mar 2024 15:57:59 +0100 (CET) Date: Mon, 11 Mar 2024 15:57:52 +0100 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Proxmox Backup Server development discussion References: <20240305092703.126906-1-c.ebner@proxmox.com> <20240305092703.126906-15-c.ebner@proxmox.com> In-Reply-To: <20240305092703.126906-15-c.ebner@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.16.0 (https://github.com/astroidmail/astroid) Message-Id: <1710164125.yzk79dmpim.astroid@yuna.none> Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-SPAM-LEVEL: Spam detection results: 0 AWL 0.065 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - 
Subject: Re: [pbs-devel] [RFC v2 proxmox-backup 14/36] client: backup: split payload to dedicated stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 11 Mar 2024 14:58:01 -0000 On March 5, 2024 10:26 am, Christian Ebner wrote: > This patch is in preparation for being able to quickly lookup > metadata for previous snapshots, by splitting the upload of > a pxar archive into two dedicated streams, one for metadata, > being assigned a .pxar.meta.didx suffix and one for payload > data, being assigned a .pxar.pld.didx suffix. >=20 > The patch constructs all the required duplicate chunk stream, > backup writer and upload stream instances required for the > split archive uploads. >=20 > This not only makes it possible reuse the payload chunks for > further backup runs but keeps the metadata archive small, > with the outlook of even making the currently used catalog > obsolete. 
>=20 > Signed-off-by: Christian Ebner > --- > changes since version 1: > - refactor pxar backup stream geneartion for split stream case > - refactor archive name generation for split archive case >=20 > pbs-client/src/pxar/create.rs | 4 + > pbs-client/src/pxar_backup_stream.rs | 48 +++++++++--- > proxmox-backup-client/src/main.rs | 75 +++++++++++++++++-- > .../src/proxmox_restore_daemon/api.rs | 12 ++- > pxar-bin/src/main.rs | 1 + > tests/catar.rs | 1 + > 6 files changed, 119 insertions(+), 22 deletions(-) >=20 > diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.r= s > index de8c0696..59aa4450 100644 > --- a/pbs-client/src/pxar/create.rs > +++ b/pbs-client/src/pxar/create.rs > @@ -141,6 +141,7 @@ pub async fn create_archive( > feature_flags: Flags, > callback: F, > catalog: Option>>, > + mut payload_writer: Option, this parameter position is a bit arbitrary - and the later additions in this series don't really make it better. Maybe we could use this as an opportunity for some housekeeping, thinking about what should go into the `options`, and whether some of the rest could be meaningfully grouped? 
> options: PxarCreateOptions, > ) -> Result<(), Error> > where > @@ -171,6 +172,9 @@ where > } > =20 > let mut encoder =3D Encoder::new(&mut writer, &metadata).await?; > + if let Some(writer) =3D payload_writer.as_mut() { > + encoder =3D encoder.attach_payload_output(writer); > + } > =20 > let mut patterns =3D options.patterns; > =20 > diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_b= ackup_stream.rs > index 22a6ffdc..9a600cc1 100644 > --- a/pbs-client/src/pxar_backup_stream.rs > +++ b/pbs-client/src/pxar_backup_stream.rs > @@ -40,20 +40,34 @@ impl PxarBackupStream { > dir: Dir, > catalog: Arc>>, > options: crate::pxar::PxarCreateOptions, > - ) -> Result { > - let (tx, rx) =3D std::sync::mpsc::sync_channel(10); > - > + separate_payload_stream: bool, > + ) -> Result<(Self, Option), Error> { > let buffer_size =3D 256 * 1024; > =20 > - let error =3D Arc::new(Mutex::new(None)); > - let error2 =3D Arc::clone(&error); > - let handler =3D async move { > - let writer =3D TokioWriterAdapter::new(std::io::BufWriter::w= ith_capacity( > + let (tx, rx) =3D std::sync::mpsc::sync_channel(10); > + let writer =3D TokioWriterAdapter::new(std::io::BufWriter::with_= capacity( > + buffer_size, > + StdChannelWriter::new(tx), > + )); > + let writer =3D pxar::encoder::sync::StandardWriter::new(writer); > + > + let (payload_writer, payload_rx) =3D if separate_payload_stream = { > + let (tx, rx) =3D std::sync::mpsc::sync_channel(10); > + let payload_writer =3D TokioWriterAdapter::new(std::io::BufW= riter::with_capacity( > buffer_size, > StdChannelWriter::new(tx), > )); > + ( > + Some(pxar::encoder::sync::StandardWriter::new(payload_wr= iter)), > + Some(rx), > + ) > + } else { > + (None, None) > + }; > =20 > - let writer =3D pxar::encoder::sync::StandardWriter::new(writ= er); > + let error =3D Arc::new(Mutex::new(None)); > + let error2 =3D Arc::clone(&error); > + let handler =3D async move { > if let Err(err) =3D crate::pxar::create_archive( > dir, > writer, > @@ 
-63,6 +77,7 @@ impl PxarBackupStream { > Ok(()) > }, > Some(catalog), > + payload_writer, > options, > ) > .await > @@ -76,21 +91,30 @@ impl PxarBackupStream { > let future =3D Abortable::new(handler, registration); > tokio::spawn(future); > =20 > - Ok(Self { > + let backup_stream =3D Self { > + rx: Some(rx), > + handle: Some(handle.clone()), > + error: error.clone(), > + }; > + > + let backup_stream_payload =3D payload_rx.map(|rx| Self { nit: IMHO this is a `backup_payload_stream` (a stream of payload(s)), not a `backup_stream_payload` (the payload of a backup stream) > rx: Some(rx), > handle: Some(handle), > error, > - }) > + }); > + > + Ok((backup_stream, backup_stream_payload)) > } > =20 > pub fn open( > dirname: &Path, > catalog: Arc>>, > options: crate::pxar::PxarCreateOptions, > - ) -> Result { > + separate_payload_stream: bool, > + ) -> Result<(Self, Option), Error> { > let dir =3D nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mod= e::empty())?; > =20 > - Self::new(dir, catalog, options) > + Self::new(dir, catalog, options, separate_payload_stream) > } > } > =20 > diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/sr= c/main.rs > index 256080be..fd9a4b97 100644 > --- a/proxmox-backup-client/src/main.rs > +++ b/proxmox-backup-client/src/main.rs > @@ -187,17 +187,24 @@ async fn backup_directory>( > client: &BackupWriter, > dir_path: P, > archive_name: &str, > + payload_target: Option<&str>, > chunk_size: Option, > catalog: Arc>>>>, > pxar_create_options: pbs_client::pxar::PxarCreateOptions, > upload_options: UploadOptions, > -) -> Result { > - let pxar_stream =3D PxarBackupStream::open(dir_path.as_ref(), catalo= g, pxar_create_options)?; > - let mut chunk_stream =3D ChunkStream::new(pxar_stream, chunk_size); > +) -> Result<(BackupStats, Option), Error> { > if upload_options.fixed_size.is_some() { > bail!("cannot backup directory with fixed chunk size!"); > } > =20 > + let (pxar_stream, payload_stream) =3D PxarBackupStream::open( > + 
dir_path.as_ref(), > + catalog, > + pxar_create_options, > + payload_target.is_some(), > + )?; > + > + let mut chunk_stream =3D ChunkStream::new(pxar_stream, chunk_size); > let (tx, rx) =3D mpsc::channel(10); // allow to buffer 10 chunks > =20 > let stream =3D ReceiverStream::new(rx).map_err(Error::from); > @@ -209,12 +216,43 @@ async fn backup_directory>( > } > }); > =20 > + let stats =3D client.upload_stream(archive_name, stream, upload_opti= ons.clone()); > =20 > - let stats =3D client > - .upload_stream(archive_name, stream, upload_options) > - .await?; > + if let Some(payload_stream) =3D payload_stream { > + let payload_target =3D payload_target > + .ok_or_else(|| format_err!("got payload stream, but no targe= t archive name"))?; > =20 > - Ok(stats) > + let mut payload_chunk_stream =3D ChunkStream::new( > + payload_stream, > + chunk_size, > + ); > + let (payload_tx, payload_rx) =3D mpsc::channel(10); // allow to = buffer 10 chunks > + let stream =3D ReceiverStream::new(payload_rx).map_err(Error::fr= om); > + > + // spawn payload chunker inside a separate task so that it can r= un parallel > + tokio::spawn(async move { > + while let Some(v) =3D payload_chunk_stream.next().await { > + let _ =3D payload_tx.send(v).await; > + } > + }); > + > + let payload_stats =3D client.upload_stream( > + &payload_target, > + stream, > + upload_options, > + ); > + > + match futures::join!(stats, payload_stats) { > + (Ok(stats), Ok(payload_stats)) =3D> Ok((stats, Some(payload_= stats))), > + (Err(err), Ok(_)) =3D> Err(format_err!("upload failed: {err}= ")), > + (Ok(_), Err(err)) =3D> Err(format_err!("upload failed: {err}= ")), > + (Err(err), Err(payload_err)) =3D> { > + Err(format_err!("upload failed: {err} - {payload_err}")) > + } > + } > + } else { > + Ok((stats.await?, None)) > + } > } > =20 > async fn backup_image>( > @@ -985,6 +1023,16 @@ async fn create_backup( > manifest.add_file(target, stats.size, stats.csum, crypto= .mode)?; > } > (BackupSpecificationType::PXAR, false) 
=3D> { > + let metadata_mode =3D false; // Until enabled via param > + let (target, payload_target) =3D if metadata_mode { > + ( > + format!("{target_base}.meta.{extension}"), > + Some(format!("{target_base}.pld.{extension}")), *bikeshed mode on* - .pld is rather opaque from a user's perspective, maybe .data would be a more human readable counterpart to .meta ? > + ) > + } else { > + (target, None) > + }; > + > // start catalog upload on first use > if catalog.is_none() { > let catalog_upload_res =3D > @@ -1015,16 +1063,27 @@ async fn create_backup( > ..UploadOptions::default() > }; > =20 > - let stats =3D backup_directory( > + let (stats, payload_stats) =3D backup_directory( > &client, > &filename, > &target, > + payload_target.as_deref(), > chunk_size_opt, > catalog.clone(), > pxar_options, > upload_options, > ) > .await?; > + > + if let Some(payload_stats) =3D payload_stats { > + manifest.add_file( > + payload_target > + .ok_or_else(|| format_err!("missing payload = target archive"))?, > + payload_stats.size, > + payload_stats.csum, > + crypto.mode, > + )?; > + } > manifest.add_file(target, stats.size, stats.csum, crypto= .mode)?; > catalog.lock().unwrap().end_directory()?; > } > diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/p= roxmox-restore-daemon/src/proxmox_restore_daemon/api.rs > index c2055222..bd8ddb20 100644 > --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs > +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs > @@ -356,8 +356,16 @@ fn extract( > }; > =20 > let pxar_writer =3D TokioWriter::new(writer); > - create_archive(dir, pxar_writer, Flags::DEFAULT, |_|= Ok(()), None, options) > - .await > + create_archive( > + dir, > + pxar_writer, > + Flags::DEFAULT, > + |_| Ok(()), > + None, > + None, > + options, > + ) > + .await > } > .await; > if let Err(err) =3D result { > diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs > index 2bbe90e3..e3b0faac 100644 > --- a/pxar-bin/src/main.rs > +++ 
b/pxar-bin/src/main.rs > @@ -383,6 +383,7 @@ async fn create_archive( > Ok(()) > }, > None, > + None, > options, > ) > .await?; > diff --git a/tests/catar.rs b/tests/catar.rs > index 36bb4f3b..04af4ffd 100644 > --- a/tests/catar.rs > +++ b/tests/catar.rs > @@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { > Flags::DEFAULT, > |_| Ok(()), > None, > + None, > options, > ))?; > =20 > --=20 > 2.39.2 >=20 >=20 >=20 > _______________________________________________ > pbs-devel mailing list > pbs-devel@lists.proxmox.com > https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel >=20 >=20 >=20