From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 883B991D78 for ; Thu, 4 Apr 2024 16:52:53 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 6ABE95219 for ; Thu, 4 Apr 2024 16:52:23 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 4 Apr 2024 16:52:22 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 6BD6045147 for ; Thu, 4 Apr 2024 16:52:22 +0200 (CEST) Date: Thu, 04 Apr 2024 16:52:15 +0200 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Proxmox Backup Server development discussion References: <20240328123707.336951-1-c.ebner@proxmox.com> <20240328123707.336951-41-c.ebner@proxmox.com> In-Reply-To: <20240328123707.336951-41-c.ebner@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.16.0 (https://github.com/astroidmail/astroid) Message-Id: <1712241225.maig1bup9p.astroid@yuna.none> Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-SPAM-LEVEL: Spam detection results: 0 AWL 0.058 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH v3 
proxmox-backup 40/58] client: chunk stream: add dynamic entries injection queues X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 04 Apr 2024 14:52:53 -0000 On March 28, 2024 1:36 pm, Christian Ebner wrote: > Adds a queue to the chunk stream to request forced boundaries at a > given offset within the stream and inject reused dynamic entries > after this boundary. > > The chunks are then passed along to the uploader stream using the > injection queue, which inserts them during upload. > > Signed-off-by: Christian Ebner > --- > changes since version 2: > - combined queues into new optional struct > - refactoring > > examples/test_chunk_speed2.rs | 2 +- > pbs-client/src/backup_writer.rs | 89 +++++++++++-------- > pbs-client/src/chunk_stream.rs | 36 +++++++- > pbs-client/src/pxar/create.rs | 6 +- > pbs-client/src/pxar_backup_stream.rs | 7 +- > proxmox-backup-client/src/main.rs | 31 ++++--- > .../src/proxmox_restore_daemon/api.rs | 1 + > pxar-bin/src/main.rs | 1 + > tests/catar.rs | 1 + > 9 files changed, 121 insertions(+), 53 deletions(-) > > diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs > index 3f69b436d..22dd14ce2 100644 > --- a/examples/test_chunk_speed2.rs > +++ b/examples/test_chunk_speed2.rs > @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> { > .map_err(Error::from); > > //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); > - let mut chunk_stream = ChunkStream::new(stream, None); > + let mut chunk_stream = ChunkStream::new(stream, None, None); > > let start_time = std::time::Instant::now(); > > diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs > index 8bd0e4f36..032d93da7 100644 > --- a/pbs-client/src/backup_writer.rs > +++ b/pbs-client/src/backup_writer.rs > @@ 
-1,4 +1,4 @@ > -use std::collections::HashSet; > +use std::collections::{HashSet, VecDeque}; > use std::future::Future; > use std::os::unix::fs::OpenOptionsExt; > use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; > @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig; > > use proxmox_human_byte::HumanByte; > > +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; > use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; > > use super::{H2Client, HttpClient}; > @@ -265,6 +266,7 @@ impl BackupWriter { > archive_name: &str, > stream: impl Stream>, > options: UploadOptions, > + injection_queue: Option>>>, > ) -> Result { > let known_chunks = Arc::new(Mutex::new(HashSet::new())); > > @@ -341,6 +343,7 @@ impl BackupWriter { > None > }, > options.compress, > + injection_queue, > ) > .await?; > > @@ -637,6 +640,7 @@ impl BackupWriter { > known_chunks: Arc>>, > crypt_config: Option>, > compress: bool, > + injection_queue: Option>>>, > ) -> impl Future> { > let total_chunks = Arc::new(AtomicUsize::new(0)); > let total_chunks2 = total_chunks.clone(); > @@ -663,48 +667,63 @@ impl BackupWriter { > let index_csum_2 = index_csum.clone(); > > stream > - .and_then(move |data| { > - let chunk_len = data.len(); > + .inject_reused_chunks( > + injection_queue.unwrap_or_default(), > + stream_len, > + reused_len.clone(), > + index_csum.clone(), > + ) > + .and_then(move |chunk_info| match chunk_info { For this part, I am still not sure whether doing all of the accounting here wouldn't be nicer. > [..]
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs > index a45420ca0..6ac0c638b 100644 > --- a/pbs-client/src/chunk_stream.rs > +++ b/pbs-client/src/chunk_stream.rs > @@ -38,15 +38,17 @@ pub struct ChunkStream { > chunker: Chunker, > buffer: BytesMut, > scan_pos: usize, > + injection_data: Option, > } > > impl ChunkStream { > - pub fn new(input: S, chunk_size: Option) -> Self { > + pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { > Self { > input, > chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), > buffer: BytesMut::new(), > scan_pos: 0, > + injection_data, > } > } > } > @@ -64,6 +66,34 @@ where > fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { > let this = self.get_mut(); > loop { > + if let Some(InjectionData { > + boundaries, > + injections, > + consumed, > + }) = this.injection_data.as_mut() > + { > + // Make sure to release this lock as soon as possible > + let mut boundaries = boundaries.lock().unwrap(); > + if let Some(inject) = boundaries.pop_front() { Here I am a bit more wary that popping the next boundary and re-pushing it whenever it has not been reached yet might hurt performance. > + let max = *consumed + this.buffer.len() as u64; > + if inject.boundary <= max { > + let chunk_size = (inject.boundary - *consumed) as usize; > + let result = this.buffer.split_to(chunk_size); A comment or better variable naming would make this easier to follow: "result" is a forced chunk that is created here because we've reached a point where we want to inject something afterwards.
Once more, I am wondering whether, for the payload stream, a vastly simplified chunker that just picks the boundaries based on re-use and payload size(s) (to avoid the "one file == one chunk" pathological case for lots of small files) wouldn't improve performance :) > + *consumed += chunk_size as u64; > + this.scan_pos = 0; > + > + // Add the size of the injected chunks to consumed, so chunk stream offsets > + // are in sync with the rest of the archive. > + *consumed += inject.size as u64; > + > + injections.lock().unwrap().push_back(inject); > + > + return Poll::Ready(Some(Ok(result))); > + } > + boundaries.push_front(inject); > + } > + } > + > if this.scan_pos < this.buffer.len() { > let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); > > @@ -74,7 +104,11 @@ where > // continue poll > } else if chunk_size <= this.buffer.len() { > let result = this.buffer.split_to(chunk_size); > + if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() { > + *consumed += chunk_size as u64; > + } > this.scan_pos = 0; > + > return Poll::Ready(Some(Ok(result))); > } else { > panic!("got unexpected chunk boundary from chunker"); > [..]