From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 3E18391AD3 for ; Thu, 4 Apr 2024 16:24:48 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1C21A32E3 for ; Thu, 4 Apr 2024 16:24:18 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 4 Apr 2024 16:24:16 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 52C6C45128 for ; Thu, 4 Apr 2024 16:24:16 +0200 (CEST) Date: Thu, 04 Apr 2024 16:24:08 +0200 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Proxmox Backup Server development discussion References: <20240328123707.336951-1-c.ebner@proxmox.com> <20240328123707.336951-39-c.ebner@proxmox.com> In-Reply-To: <20240328123707.336951-39-c.ebner@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.16.0 (https://github.com/astroidmail/astroid) Message-Id: <1712235368.4ka0m21w6d.astroid@yuna.none> Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-SPAM-LEVEL: Spam detection results: 0 AWL 0.059 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: Re: [pbs-devel] [PATCH v3 
proxmox-backup 38/58] upload stream: impl reused chunk injector X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 04 Apr 2024 14:24:48 -0000 On March 28, 2024 1:36 pm, Christian Ebner wrote: > In order to be included in the backups index file, reused payload > chunks have to be injected into the payload upload stream. >=20 > The chunker forces a chunk boundary and queues the list of chunks to > be uploaded thereafter. >=20 > This implements the logic to inject the chunks into the chunk upload > stream after such a boundary is requested, by looping over the queued > chunks and inserting them into the stream. >=20 > Signed-off-by: Christian Ebner > --- > changes since version 2: > - no changes >=20 > pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++ > pbs-client/src/lib.rs | 1 + > 2 files changed, 153 insertions(+) > create mode 100644 pbs-client/src/inject_reused_chunks.rs >=20 > diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inje= ct_reused_chunks.rs > new file mode 100644 > index 000000000..5cc19ce5d > --- /dev/null > +++ b/pbs-client/src/inject_reused_chunks.rs > @@ -0,0 +1,152 @@ > +use std::collections::VecDeque; > +use std::pin::Pin; > +use std::sync::atomic::{AtomicUsize, Ordering}; > +use std::sync::{Arc, Mutex}; > +use std::task::{Context, Poll}; > + > +use anyhow::{anyhow, Error}; > +use futures::{ready, Stream}; > +use pin_project_lite::pin_project; > + > +use crate::pxar::create::ReusableDynamicEntry; > + > +pin_project! 
{ > + pub struct InjectReusedChunksQueue { > + #[pin] > + input: S, > + current: Option, > + buffer: Option, > + injection_queue: Arc>>, > + stream_len: Arc, > + reused_len: Arc, > + index_csum: Arc>>, > + } > +} > + > +#[derive(Debug)] > +pub struct InjectChunks { > + pub boundary: u64, > + pub chunks: Vec, > + pub size: usize, > +} > + > +pub enum InjectedChunksInfo { > + Known(Vec<(u64, [u8; 32])>), > + Raw((u64, bytes::BytesMut)), these might benefit from a comment or type alias to explain what the u64 is/u64s are.. > +} > + > +pub trait InjectReusedChunks: Sized { > + fn inject_reused_chunks( > + self, > + injection_queue: Arc>>, > + stream_len: Arc, > + reused_len: Arc, > + index_csum: Arc>>, this doesn't actually need to be an Option I think. it's always there after all, and we just need the Arc> to ensure updates are serialized. for the final `finish` call we can just use Arc::into_inner and Mutex::into_inner to get the owned Sha256 out of it (there can't be any `update`s afterwards in any case). > + ) -> InjectReusedChunksQueue; > +} > + > +impl InjectReusedChunks for S > +where > + S: Stream>, > +{ > + fn inject_reused_chunks( > + self, > + injection_queue: Arc>>, > + stream_len: Arc, > + reused_len: Arc, > + index_csum: Arc>>, > + ) -> InjectReusedChunksQueue { > + InjectReusedChunksQueue { > + input: self, > + current: None, > + injection_queue, > + buffer: None, > + stream_len, > + reused_len, > + index_csum, > + } > + } > +} > + > +impl Stream for InjectReusedChunksQueue > +where > + S: Stream>, > +{ > + type Item =3D Result; > + > + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { and this fn here could use some comments as well IMHO ;) I hope I didn't misunderstand anything and my suggestions below are correct.. > + let mut this =3D self.project(); > + loop { > + let current =3D this.current.take(); > + if let Some(current) =3D current { the take can be inlined here..
> + let mut chunks =3D Vec::new(); > + let mut guard =3D this.index_csum.lock().unwrap(); > + let csum =3D guard.as_mut().unwrap(); > + > + for chunk in current.chunks { > + let offset =3D this > + .stream_len > + .fetch_add(chunk.size() as usize, Ordering::SeqC= st) > + as u64; > + this.reused_len > + .fetch_add(chunk.size() as usize, Ordering::SeqC= st); > + let digest =3D chunk.digest(); > + chunks.push((offset, digest)); > + let end_offset =3D offset + chunk.size(); > + csum.update(&end_offset.to_le_bytes()); > + csum.update(&digest); > + } > + let chunk_info =3D InjectedChunksInfo::Known(chunks); > + return Poll::Ready(Some(Ok(chunk_info))); okay, so this part here takes care of accounting for known chunks, updating the index digest and passing them along > + } > + > + let buffer =3D this.buffer.take(); > + if let Some(buffer) =3D buffer { take can be inlined > + let offset =3D this.stream_len.fetch_add(buffer.len(), O= rdering::SeqCst) as u64; > + let data =3D InjectedChunksInfo::Raw((offset, buffer)); > + return Poll::Ready(Some(Ok(data))); this part here takes care of accounting for and passing along a new chunk and its data, if it had to be buffered because injected chunks came first.. > + } > + > + match ready!(this.input.as_mut().poll_next(cx)) { > + None =3D> return Poll::Ready(None), that one's purpose is pretty clear - should we also check that there is nothing left in the injection queue here, as that would mean something fundamental went wrong? > + Some(Err(err)) =3D> return Poll::Ready(Some(Err(err))), > + Some(Ok(raw)) =3D> { > + let chunk_size =3D raw.len(); > + let offset =3D this.stream_len.load(Ordering::SeqCst= ) as u64; > + let mut injections =3D this.injection_queue.lock().u= nwrap(); > + > + if let Some(inject) =3D injections.pop_front() { > + if inject.boundary =3D=3D offset { if we do what I suggest below (X), then this branch here is the only one touching current and buffer.
that in turn means we can inline the handling of current (dropping it altogether from InjectReusedChunksQueue) and drop the loop and continue > + if this.current.replace(inject).is_some() { > + return Poll::Ready(Some(Err(anyhow!( > + "replaced injection queue not empty" > + )))); > + } > + if chunk_size > 0 && this.buffer.replace(raw= ).is_some() { I guess this means chunks with size 0 are used in case everything is re-used? or is the difference between here and below (where chunk_size 0 is a fatal error) accidental? > + return Poll::Ready(Some(Err(anyhow!( > + "replaced buffer not empty" > + )))); with all the other changes, this should be impossible to trigger.. then again, it probably doesn't hurt as a safeguard either.. > + } > + continue; > + } else if inject.boundary =3D=3D offset + chunk_= size as u64 { > + let _ =3D this.current.insert(inject); X: since we add the chunk size to the offset below, this means that the nex= t poll ends up in the previous branch of the if (boundary =3D=3D offset), even if we remove this whole condition and branch > + } else if inject.boundary < offset + chunk_size = as u64 { > + return Poll::Ready(Some(Err(anyhow!("invalid= injection boundary")))); > + } else { > + injections.push_front(inject); I normally dislike this kind of code (pop - check - push), but I guess here it doesn't hurt throughput too much since the rest of what we do is more expensive anyway..
> + } > + } > + > + if chunk_size =3D=3D 0 { > + return Poll::Ready(Some(Err(anyhow!("unexpected = empty raw data")))); > + } > + > + let offset =3D this.stream_len.fetch_add(chunk_size,= Ordering::SeqCst) as u64; > + let data =3D InjectedChunksInfo::Raw((offset, raw)); > + > + return Poll::Ready(Some(Ok(data))); > + } > + } > + } > + } > +} anyhow, here's a slightly simplified version of poll_next: fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this =3D self.project(); if let Some(buffer) =3D this.buffer.take() { let offset =3D this.stream_len.fetch_add(buffer.len(), Ordering= ::SeqCst) as u64; let data =3D InjectedChunksInfo::Raw((offset, buffer)); return Poll::Ready(Some(Ok(data))); } match ready!(this.input.as_mut().poll_next(cx)) { None =3D> Poll::Ready(None), Some(Err(err)) =3D> Poll::Ready(Some(Err(err))), Some(Ok(raw)) =3D> { let chunk_size =3D raw.len(); let offset =3D this.stream_len.load(Ordering::SeqCst) as u6= 4; let mut injections =3D this.injection_queue.lock().unwrap()= ; if let Some(inject) =3D injections.pop_front() { // inject chunk now, buffer incoming data for later if inject.boundary =3D=3D offset { if chunk_size > 0 && this.buffer.replace(raw).is_so= me() { return Poll::Ready(Some(Err(anyhow!("replaced b= uffer not empty")))); } let mut chunks =3D Vec::new(); let mut csum =3D this.index_csum.lock().unwrap(); for chunk in inject.chunks { let offset =3D this .stream_len .fetch_add(chunk.size() as usize, Ordering:= :SeqCst) as u64; this.reused_len .fetch_add(chunk.size() as usize, Ordering:= :SeqCst); let digest =3D chunk.digest(); chunks.push((offset, digest)); let end_offset =3D offset + chunk.size(); csum.update(&end_offset.to_le_bytes()); csum.update(&digest); } let chunk_info =3D InjectedChunksInfo::Known(chunks= ); return Poll::Ready(Some(Ok(chunk_info))); } else if inject.boundary < offset + chunk_size as u64 = { return Poll::Ready(Some(Err(anyhow!("invalid inject= ion boundary")))); } else { 
injections.push_front(inject); } } if chunk_size =3D=3D 0 { return Poll::Ready(Some(Err(anyhow!("unexpected empty r= aw data")))); } let offset =3D this.stream_len.fetch_add(chunk_size, Orderi= ng::SeqCst) as u64; let data =3D InjectedChunksInfo::Raw((offset, raw)); Poll::Ready(Some(Ok(data))) } } } this version also folds in the change that makes index_csum no longer an Option, so it requires a few adaptations in other parts as well. but I'd like the following even better, since it allows us to get rid of the buffer altogether: fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this =3D self.project(); let mut injections =3D this.injection_queue.lock().unwrap(); // check whether we have something to inject if let Some(inject) =3D injections.pop_front() { let offset =3D this.stream_len.load(Ordering::SeqCst) as u64; if inject.boundary =3D=3D offset { // inject now let mut chunks =3D Vec::new(); let mut csum =3D this.index_csum.lock().unwrap(); // account for injected chunks for chunk in inject.chunks { let offset =3D this .stream_len .fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64; this.reused_len .fetch_add(chunk.size() as usize, Ordering::SeqCst)= ; let digest =3D chunk.digest(); chunks.push((offset, digest)); let end_offset =3D offset + chunk.size(); csum.update(&end_offset.to_le_bytes()); csum.update(&digest); } let chunk_info =3D InjectedChunksInfo::Known(chunks); return Poll::Ready(Some(Ok(chunk_info))); } else if inject.boundary < offset { // incoming new chunks and injections didn't line up?
return Poll::Ready(Some(Err(anyhow!("invalid injection boun= dary")))); } else { // inject later injections.push_front(inject); } } // nothing to inject now, let's see if there's further input match ready!(this.input.as_mut().poll_next(cx)) { None =3D> Poll::Ready(None), Some(Err(err)) =3D> Poll::Ready(Some(Err(err))), Some(Ok(raw)) if raw.is_empty() =3D> { Poll::Ready(Some(Err(anyhow!("unexpected empty raw data")))= ) } Some(Ok(raw)) =3D> { let offset =3D this.stream_len.fetch_add(raw.len(), Orderin= g::SeqCst) as u64; let data =3D InjectedChunksInfo::Raw((offset, raw)); Poll::Ready(Some(Ok(data))) } } } but technically all this accounting could move back to the backup_writer as well, if the injected chunk info also contained the size.. > diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs > index 21cf8556b..3e7bd2a8b 100644 > --- a/pbs-client/src/lib.rs > +++ b/pbs-client/src/lib.rs > @@ -7,6 +7,7 @@ pub mod catalog_shell; > pub mod pxar; > pub mod tools; > =20 > +mod inject_reused_chunks; > mod merge_known_chunks; > pub mod pipe_to_stream; > =20 > --=20 > 2.39.2 >=20 >=20 >=20 > _______________________________________________ > pbs-devel mailing list > pbs-devel@lists.proxmox.com > https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel >=20 >=20 >=20