From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id AC5791FF38F for ; Tue, 4 Jun 2024 10:50:37 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4D0F5D1E5; Tue, 4 Jun 2024 10:51:06 +0200 (CEST) Date: Tue, 04 Jun 2024 10:50:59 +0200 From: Fabian =?iso-8859-1?q?Gr=FCnbichler?= To: Proxmox Backup Server development discussion References: <20240528094303.309806-1-c.ebner@proxmox.com> <20240528094303.309806-42-c.ebner@proxmox.com> In-Reply-To: <20240528094303.309806-42-c.ebner@proxmox.com> MIME-Version: 1.0 User-Agent: astroid/0.16.0 (https://github.com/astroidmail/astroid) Message-Id: <1717490739.l0jtdxh70e.astroid@yuna.none> X-SPAM-LEVEL: Spam detection results: 0 AWL 0.058 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs, proxmox.com] Subject: Re: [pbs-devel] [PATCH v8 proxmox-backup 41/69] upload stream: implement reused chunk injector X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" On May 28, 2024 11:42 am, Christian Ebner wrote: > In order to be included in the backup's index file, reused payload > chunks have to be injected into the payload upload stream at a > forced boundary. The chunker forces a chunk boundary and sends the > list of reusable dynamic entries to be uploaded. > > This implements the logic to receive these dynamic entries via the > corresponding communication channel from the chunker and inject the > entries into the backup upload stream by looking for the matching > chunk boundary, already forced by the chunker. > > Signed-off-by: Christian Ebner > --- > changes since version 7: > - no changes > > changes since version 6: > - no changes > > pbs-client/src/inject_reused_chunks.rs | 129 +++++++++++++++++++++++++ > pbs-client/src/lib.rs | 1 + > 2 files changed, 130 insertions(+) > create mode 100644 pbs-client/src/inject_reused_chunks.rs > > diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs > new file mode 100644 > index 000000000..ed147f5fb > --- /dev/null > +++ b/pbs-client/src/inject_reused_chunks.rs > @@ -0,0 +1,129 @@ > +use std::cmp; > +use std::pin::Pin; > +use std::sync::atomic::{AtomicUsize, Ordering}; > +use std::sync::{mpsc, Arc}; > +use std::task::{Context, Poll}; > + > +use anyhow::{anyhow, Error}; > +use futures::{ready, Stream}; > +use pin_project_lite::pin_project; > + > +use crate::pxar::create::ReusableDynamicEntry; > + > +pin_project! 
{ > + pub struct InjectReusedChunksQueue { > + #[pin] > + input: S, > + next_injection: Option, > + buffer: Option, we successfully eliminated this buffer I think ;) > + injections: Option>, > + stream_len: Arc, > + } > +} > + > +type StreamOffset = u64; > +#[derive(Debug)] > +/// Holds a list of chunks to inject at the given boundary by forcing a chunk boundary. > +pub struct InjectChunks { > + /// Offset at which to force the boundary > + pub boundary: StreamOffset, > + /// List of chunks to inject > + pub chunks: Vec, > + /// Cumulative size of the chunks in the list > + pub size: usize, > +} > + > +/// Variants for stream consumer to distinguish between raw data chunks and injected ones. > +pub enum InjectedChunksInfo { > + Known(Vec), > + Raw(bytes::BytesMut), > +} > + > +pub trait InjectReusedChunks: Sized { > + fn inject_reused_chunks( > + self, > + injections: Option>, > + stream_len: Arc, > + ) -> InjectReusedChunksQueue; > +} > + > +impl InjectReusedChunks for S > +where > + S: Stream>, > +{ > + fn inject_reused_chunks( > + self, > + injections: Option>, > + stream_len: Arc, > + ) -> InjectReusedChunksQueue { > + InjectReusedChunksQueue { > + input: self, > + next_injection: None, > + injections, > + buffer: None, > + stream_len, > + } > + } > +} > + > +impl Stream for InjectReusedChunksQueue > +where > + S: Stream>, > +{ > + type Item = Result; > + > + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { > + let mut this = self.project(); > + > + // loop to skip over possible empty chunks > + loop { > + if this.next_injection.is_none() { > + if let Some(injections) = this.injections.as_mut() { > + if let Ok(injection) = injections.try_recv() { > + *this.next_injection = Some(injection); > + } > + } > + } > + > + if let Some(inject) = this.next_injection.take() { > + // got reusable dynamic entries to inject > + let offset = this.stream_len.load(Ordering::SeqCst) as u64; > + > + match inject.boundary.cmp(&offset) { > + // inject now > + 
cmp::Ordering::Equal => { > + let chunk_info = InjectedChunksInfo::Known(inject.chunks); > + return Poll::Ready(Some(Ok(chunk_info))); > + } > + // inject later > + cmp::Ordering::Greater => *this.next_injection = Some(inject), > + // incoming new chunks and injections didn't line up? > + cmp::Ordering::Less => { > + return Poll::Ready(Some(Err(anyhow!("invalid injection boundary")))) > + } > + } > + } > + > + // nothing to inject now, await further input > + match ready!(this.input.as_mut().poll_next(cx)) { > + None => { > + if let Some(injections) = this.injections.as_mut() { > + if this.next_injection.is_some() || injections.try_recv().is_ok() { > + // stream finished, but remaining dynamic entries to inject > + return Poll::Ready(Some(Err(anyhow!( > + "injection queue not fully consumed" > + )))); > + } > + } > + // stream finished and all dynamic entries already injected > + return Poll::Ready(None); > + } > + Some(Err(err)) => return Poll::Ready(Some(Err(err))), > + // ignore empty chunks, injected chunks from queue at forced boundary, but boundary > + // did not require splitting of the raw stream buffer to force the boundary > + Some(Ok(raw)) if raw.is_empty() => continue, > + Some(Ok(raw)) => return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw(raw)))), > + } > + } > + } > +} > diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs > index 21cf8556b..3e7bd2a8b 100644 > --- a/pbs-client/src/lib.rs > +++ b/pbs-client/src/lib.rs > @@ -7,6 +7,7 @@ pub mod catalog_shell; > pub mod pxar; > pub mod tools; > > +mod inject_reused_chunks; > mod merge_known_chunks; > pub mod pipe_to_stream; > > -- > 2.39.2 > > > > _______________________________________________ > pbs-devel mailing list > pbs-devel@lists.proxmox.com > https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel > > > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel