From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id EE88998B48 for ; Wed, 15 Nov 2023 16:49:28 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1F0AD965F for ; Wed, 15 Nov 2023 16:48:35 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 15 Nov 2023 16:48:33 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 6ECD4432DA for ; Wed, 15 Nov 2023 16:48:33 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 15 Nov 2023 16:48:02 +0100 Message-Id: <20231115154813.281564-18-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20231115154813.281564-1-c.ebner@proxmox.com> References: <20231115154813.281564-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.057 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH v5 proxmox-backup 17/28] fix #3174: upload stream: impl reused chunk injector X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 
Precedence: list
List-Id: Proxmox Backup Server development discussion
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
X-List-Received-Date: Wed, 15 Nov 2023 15:49:29 -0000

In order to be included in the backups index file, the reused chunks
which store the payload of skipped files during pxar encoding have to be
inserted after the encoder has written the pxar appendix entry type.

The chunker forces a chunk boundary after this marker and queues the
list of chunks to be uploaded thereafter.

This implements the logic to inject the chunks into the chunk upload
stream after such a boundary is requested, by looping over the queued
chunks and inserting them into the stream.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since version 4:
- fix premature chunk injection by incorrect initialization

Changes since version 3:
- count appendix chunks as reused chunks
- fix issue with stream being corrupted by missing buffering of data
  when injecting

Changes since version 2:
- no changes

Changes since version 1:
- no changes

 pbs-client/src/inject_reused_chunks.rs | 153 +++++++++++++++++++++++++
 pbs-client/src/lib.rs                  |   1 +
 2 files changed, 154 insertions(+)
 create mode 100644 pbs-client/src/inject_reused_chunks.rs

diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 00000000..5811648c
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,153 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use pbs_datastore::dynamic_index::DynamicEntry;
+
+pin_project! {
+    pub struct InjectReusedChunksQueue<S> {
+        #[pin]
+        input: S,
+        current: Option<InjectChunks>,
+        buffer: Option<bytes::BytesMut>,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+    pub boundary: u64,
+    pub chunks: Vec<DynamicEntry>,
+    pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+    Known(Vec<(u64, [u8; 32])>),
+    Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<S> {
+        InjectReusedChunksQueue {
+            input: self,
+            current: None,
+            injection_queue,
+            buffer: None,
+            stream_len,
+            reused_len,
+            index_csum,
+        }
+    }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    type Item = Result<InjectedChunksInfo, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            let current = this.current.take();
+            if let Some(current) = current {
+                let mut chunks = Vec::new();
+                let mut guard = this.index_csum.lock().unwrap();
+                let csum = guard.as_mut().unwrap();
+
+                for chunk in current.chunks {
+                    let offset = this
+                        .stream_len
+                        .fetch_add(chunk.end() as usize, Ordering::SeqCst)
+                        as u64;
+                    this.reused_len
+                        .fetch_add(chunk.end() as usize, Ordering::SeqCst);
+                    let digest = chunk.digest();
+                    chunks.push((offset, digest));
+                    // Chunk end is assumed to be normalized to chunk size here
+                    let end_offset = offset + chunk.end();
+                    csum.update(&end_offset.to_le_bytes());
+                    csum.update(&digest);
+                }
+                let chunk_info = InjectedChunksInfo::Known(chunks);
+                return Poll::Ready(Some(Ok(chunk_info)));
+            }
+
+            let buffer = this.buffer.take();
+            if let Some(buffer) = buffer {
+                let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+                let data = InjectedChunksInfo::Raw((offset, buffer));
+                return Poll::Ready(Some(Ok(data)));
+            }
+
+            match ready!(this.input.as_mut().poll_next(cx)) {
+                None => return Poll::Ready(None),
+                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+                Some(Ok(raw)) => {
+                    let chunk_size = raw.len();
+                    let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+                    let mut injections = this.injection_queue.lock().unwrap();
+
+                    if let Some(inject) = injections.pop_front() {
+                        if inject.boundary == offset {
+                            if this.current.replace(inject).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced injection queue not empty"
+                                ))));
+                            }
+                            if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced buffer not empty"
+                                ))));
+                            }
+                            continue;
+                        } else if inject.boundary == offset + chunk_size as u64 {
+                            let _ = this.current.insert(inject);
+                        } else if inject.boundary < offset + chunk_size as u64 {
+                            return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+                        } else {
+                            injections.push_front(inject);
+                        }
+                    }
+
+                    if chunk_size == 0 {
+                        return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+                    }
+
+                    let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+                    let data = InjectedChunksInfo::Raw((offset, raw));
+
+                    return Poll::Ready(Some(Ok(data)));
+                }
+            }
+        }
+    }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556..8bf26381 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -8,6 +8,7 @@ pub mod pxar;
 pub mod tools;
 
 mod merge_known_chunks;
+mod inject_reused_chunks;
 pub mod pipe_to_stream;
 
 mod http_client;
-- 
2.39.2