From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 6EFE295F9A for ; Wed, 28 Feb 2024 15:09:49 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 52240E1BD for ; Wed, 28 Feb 2024 15:09:19 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 28 Feb 2024 15:09:18 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 240D347A14 for ; Wed, 28 Feb 2024 15:02:52 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 28 Feb 2024 15:02:15 +0100 Message-Id: <20240228140226.1251979-26-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com> References: <20240228140226.1251979-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.045 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[lib.rs] Subject: [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 28 Feb 2024 14:09:49 -0000 In order to be included in the backup's index file, reused payload chunks have to be injected into the payload upload stream. The chunker forces a chunk boundary and queues the list of chunks to be uploaded thereafter. This implements the logic to inject the chunks into the chunk upload stream after such a boundary is requested, by looping over the queued chunks and inserting them into the stream. Signed-off-by: Christian Ebner --- pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++ pbs-client/src/lib.rs | 1 + 2 files changed, 153 insertions(+) create mode 100644 pbs-client/src/inject_reused_chunks.rs diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs new file mode 100644 index 00000000..7c0f7780 --- /dev/null +++ b/pbs-client/src/inject_reused_chunks.rs @@ -0,0 +1,152 @@ +use std::collections::VecDeque; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; + +use anyhow::{anyhow, Error}; +use futures::{ready, Stream}; +use pin_project_lite::pin_project; + +use pbs_datastore::dynamic_index::AppendableDynamicEntry; + +pin_project! 
{ + pub struct InjectReusedChunksQueue { + #[pin] + input: S, + current: Option, + buffer: Option, + injection_queue: Arc>>, + stream_len: Arc, + reused_len: Arc, + index_csum: Arc>>, + } +} + +#[derive(Debug)] +pub struct InjectChunks { + pub boundary: u64, + pub chunks: Vec, + pub size: usize, +} + +pub enum InjectedChunksInfo { + Known(Vec<(u64, [u8; 32])>), + Raw((u64, bytes::BytesMut)), +} + +pub trait InjectReusedChunks: Sized { + fn inject_reused_chunks( + self, + injection_queue: Arc>>, + stream_len: Arc, + reused_len: Arc, + index_csum: Arc>>, + ) -> InjectReusedChunksQueue; +} + +impl InjectReusedChunks for S +where + S: Stream>, +{ + fn inject_reused_chunks( + self, + injection_queue: Arc>>, + stream_len: Arc, + reused_len: Arc, + index_csum: Arc>>, + ) -> InjectReusedChunksQueue { + InjectReusedChunksQueue { + input: self, + current: None, + injection_queue, + buffer: None, + stream_len, + reused_len, + index_csum, + } + } +} + +impl Stream for InjectReusedChunksQueue +where + S: Stream>, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + loop { + let current = this.current.take(); + if let Some(current) = current { + let mut chunks = Vec::new(); + let mut guard = this.index_csum.lock().unwrap(); + let csum = guard.as_mut().unwrap(); + + for chunk in current.chunks { + let offset = this + .stream_len + .fetch_add(chunk.size() as usize, Ordering::SeqCst) + as u64; + this.reused_len + .fetch_add(chunk.size() as usize, Ordering::SeqCst); + let digest = chunk.digest(); + chunks.push((offset, digest)); + let end_offset = offset + chunk.size(); + csum.update(&end_offset.to_le_bytes()); + csum.update(&digest); + } + let chunk_info = InjectedChunksInfo::Known(chunks); + return Poll::Ready(Some(Ok(chunk_info))); + } + + let buffer = this.buffer.take(); + if let Some(buffer) = buffer { + let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64; + let data = 
InjectedChunksInfo::Raw((offset, buffer)); + return Poll::Ready(Some(Ok(data))); + } + + match ready!(this.input.as_mut().poll_next(cx)) { + None => return Poll::Ready(None), + Some(Err(err)) => return Poll::Ready(Some(Err(err))), + Some(Ok(raw)) => { + let chunk_size = raw.len(); + let offset = this.stream_len.load(Ordering::SeqCst) as u64; + let mut injections = this.injection_queue.lock().unwrap(); + + if let Some(inject) = injections.pop_front() { + if inject.boundary == offset { + if this.current.replace(inject).is_some() { + return Poll::Ready(Some(Err(anyhow!( + "replaced injection queue not empty" + )))); + } + if chunk_size > 0 && this.buffer.replace(raw).is_some() { + return Poll::Ready(Some(Err(anyhow!( + "replaced buffer not empty" + )))); + } + continue; + } else if inject.boundary == offset + chunk_size as u64 { + let _ = this.current.insert(inject); + } else if inject.boundary < offset + chunk_size as u64 { + return Poll::Ready(Some(Err(anyhow!("invalid injection boundary")))); + } else { + injections.push_front(inject); + } + } + + if chunk_size == 0 { + return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data")))); + } + + let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64; + let data = InjectedChunksInfo::Raw((offset, raw)); + + return Poll::Ready(Some(Ok(data))); + } + } + } + } +} diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs index 21cf8556..3e7bd2a8 100644 --- a/pbs-client/src/lib.rs +++ b/pbs-client/src/lib.rs @@ -7,6 +7,7 @@ pub mod catalog_shell; pub mod pxar; pub mod tools; +mod inject_reused_chunks; mod merge_known_chunks; pub mod pipe_to_stream; -- 2.39.2