From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 6936F1FF389
	for <inbox@lore.proxmox.com>; Tue,  7 May 2024 17:54:26 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 14D9112950;
	Tue,  7 May 2024 17:54:21 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue,  7 May 2024 17:52:21 +0200
Message-Id: <20240507155244.793819-40-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240507155244.793819-1-c.ebner@proxmox.com>
References: <20240507155244.793819-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.027 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [PATCH v5 proxmox-backup 39/62] upload stream:
 implement reused chunk injector
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

In order to be included in the backup's index file, reused payload
chunks have to be injected into the payload upload stream at a
forced boundary. The chunker forces a chunk boundary and sends the
list of reusable dynamic entries to be uploaded.

This implements the logic to receive these dynamic entries via the
corresponding communication channel from the chunker and inject the
entries into the backup upload stream by looking for the matching
chunk boundary, already forced by the chunker.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 4:
- no changes

 pbs-client/src/inject_reused_chunks.rs | 129 +++++++++++++++++++++++++
 pbs-client/src/lib.rs                  |   1 +
 2 files changed, 130 insertions(+)
 create mode 100644 pbs-client/src/inject_reused_chunks.rs

diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 000000000..ed147f5fb
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,129 @@
+use std::cmp;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{mpsc, Arc};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use crate::pxar::create::ReusableDynamicEntry;
+
+pin_project! {
+    /// Stream adapter interleaving lists of reused chunks, received over an `mpsc` channel,
+    /// with the raw chunk data produced by the wrapped `input` stream.
+    ///
+    /// Created via [`InjectReusedChunks::inject_reused_chunks`]; yields
+    /// [`InjectedChunksInfo`] items.
+    pub struct InjectReusedChunksQueue<S> {
+        // Wrapped stream of raw chunk data; structurally pinned so it can be polled.
+        #[pin]
+        input: S,
+        // Injection already taken from `injections` whose boundary was not reached yet.
+        next_injection: Option<InjectChunks>,
+        // NOTE(review): initialized to `None` and never read or written by `poll_next` in
+        // this file; looks unused — confirm before removing.
+        buffer: Option<bytes::BytesMut>,
+        // Receiving end of the injection channel; `None` means nothing is ever injected.
+        injections: Option<mpsc::Receiver<InjectChunks>>,
+        // Shared stream offset compared against `InjectChunks::boundary`. This adapter only
+        // reads it; presumably the consumer advances it — verify against the caller.
+        stream_len: Arc<AtomicUsize>,
+    }
+}
+
+/// Offset into the upload stream, as compared against the shared stream length.
+type StreamOffset = u64;
+
+/// Holds a list of chunks to inject at the given boundary by forcing a chunk boundary.
+///
+/// The chunker sends one of these over the injection channel after forcing the boundary;
+/// the entries are emitted as [`InjectedChunksInfo::Known`] once the stream reaches it.
+#[derive(Debug)]
+pub struct InjectChunks {
+    /// Stream offset of the forced boundary at which the chunks are injected.
+    pub boundary: StreamOffset,
+    /// Reusable dynamic entries to inject at `boundary`, in order.
+    pub chunks: Vec<ReusableDynamicEntry>,
+    /// Combined size of all entries in `chunks`.
+    pub size: usize,
+}
+
+/// Variants for stream consumer to distinguish between raw data chunks and injected ones.
+///
+/// Derives `Debug` like the sibling [`InjectChunks`], so items can be logged or used in
+/// assertions by consumers.
+#[derive(Debug)]
+pub enum InjectedChunksInfo {
+    /// Reused chunks injected at a forced boundary; the entries are passed on as received.
+    Known(Vec<ReusableDynamicEntry>),
+    /// Raw chunk data forwarded from the wrapped input stream (never empty when produced
+    /// by `InjectReusedChunksQueue`).
+    Raw(bytes::BytesMut),
+}
+
+/// Extension trait providing
+/// [`inject_reused_chunks`](InjectReusedChunks::inject_reused_chunks) on chunk streams.
+pub trait InjectReusedChunks
+where
+    Self: Sized,
+{
+    /// Consumes `self` and wraps it into an [`InjectReusedChunksQueue`].
+    ///
+    /// `injections` is the receiving end of the channel over which reused chunk lists are
+    /// announced, or `None` to disable injection. `stream_len` is the shared stream offset
+    /// each injection's boundary is compared against.
+    fn inject_reused_chunks(
+        self,
+        injections: Option<mpsc::Receiver<InjectChunks>>,
+        stream_len: Arc<AtomicUsize>,
+    ) -> InjectReusedChunksQueue<Self>;
+}
+
+/// Every fallible stream of raw `BytesMut` chunks can have reused chunks injected.
+impl<S> InjectReusedChunks for S
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    /// Builds the queue with no injection pending yet and an empty `buffer`.
+    fn inject_reused_chunks(
+        self,
+        injections: Option<mpsc::Receiver<InjectChunks>>,
+        stream_len: Arc<AtomicUsize>,
+    ) -> InjectReusedChunksQueue<Self> {
+        let input = self;
+        InjectReusedChunksQueue {
+            input,
+            injections,
+            stream_len,
+            next_injection: None,
+            buffer: None,
+        }
+    }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    type Item = Result<InjectedChunksInfo, Error>;
+
+    /// Yields the next list of injected reused chunks or the next raw chunk.
+    ///
+    /// A pending injection is emitted as [`InjectedChunksInfo::Known`] as soon as its
+    /// boundary equals the current value of the shared `stream_len`; until then, non-empty
+    /// raw chunks of the input are forwarded as [`InjectedChunksInfo::Raw`].
+    ///
+    /// # Errors
+    ///
+    /// - errors of the input stream are passed through;
+    /// - an injection whose boundary lies behind the current stream offset (the forced
+    ///   boundary was missed) yields an error naming both offsets;
+    /// - the input ending while an injection is still pending yields an error naming the
+    ///   pending boundary and the final stream offset.
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+
+        // loop to skip over possible empty chunks
+        loop {
+            // Only fetch a new injection once the previous one was emitted. `try_recv` never
+            // blocks; an empty or disconnected channel just means there is nothing to inject.
+            if this.next_injection.is_none() {
+                if let Some(injections) = this.injections.as_mut() {
+                    if let Ok(injection) = injections.try_recv() {
+                        *this.next_injection = Some(injection);
+                    }
+                }
+            }
+
+            if let Some(inject) = this.next_injection.take() {
+                // got reusable dynamic entries to inject
+                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+
+                match inject.boundary.cmp(&offset) {
+                    // inject now
+                    cmp::Ordering::Equal => {
+                        let chunk_info = InjectedChunksInfo::Known(inject.chunks);
+                        return Poll::Ready(Some(Ok(chunk_info)));
+                    }
+                    // inject later, keep it queued until the stream reaches the boundary
+                    cmp::Ordering::Greater => *this.next_injection = Some(inject),
+                    // incoming new chunks and injections didn't line up?
+                    cmp::Ordering::Less => {
+                        return Poll::Ready(Some(Err(anyhow!(
+                            "invalid injection boundary {} - stream already at offset {}",
+                            inject.boundary,
+                            offset,
+                        ))));
+                    }
+                }
+            }
+
+            // nothing to inject now, await further input
+            match ready!(this.input.as_mut().poll_next(cx)) {
+                None => {
+                    if let Some(injections) = this.injections.as_mut() {
+                        // Look at the queued injection without taking it, otherwise check the
+                        // channel once more, so the error can report the missed boundary.
+                        let pending_boundary = match this.next_injection.as_ref() {
+                            Some(inject) => Some(inject.boundary),
+                            None => injections.try_recv().ok().map(|inject| inject.boundary),
+                        };
+                        if let Some(boundary) = pending_boundary {
+                            // stream finished, but remaining dynamic entries to inject
+                            let offset = this.stream_len.load(Ordering::SeqCst);
+                            return Poll::Ready(Some(Err(anyhow!(
+                                "injection queue not fully consumed - pending boundary {}, stream ended at offset {}",
+                                boundary,
+                                offset,
+                            ))));
+                        }
+                    }
+                    // stream finished and all dynamic entries already injected
+                    return Poll::Ready(None);
+                }
+                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+                // Skip empty chunks: the chunker emits one when it forced a boundary for an
+                // injection but did not need to split the raw stream buffer to do so.
+                Some(Ok(raw)) if raw.is_empty() => continue,
+                Some(Ok(raw)) => return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw(raw)))),
+            }
+        }
+    }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556b..3e7bd2a8b 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
 pub mod pxar;
 pub mod tools;
 
+mod inject_reused_chunks;
 mod merge_known_chunks;
 pub mod pipe_to_stream;
 
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel