From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v7 proxmox-backup 41/69] upload stream: implement reused chunk injector
Date: Mon, 27 May 2024 16:32:55 +0200
Message-ID: <20240527143323.456002-42-c.ebner@proxmox.com>
In-Reply-To: <20240527143323.456002-1-c.ebner@proxmox.com>

In order to be included in the backup's index file, reused payload
chunks have to be injected into the payload upload stream at a
forced boundary. The chunker forces a chunk boundary and sends the
list of reusable dynamic entries to be uploaded.

This implements the logic to receive these dynamic entries via the
corresponding communication channel from the chunker and inject the
entries into the backup upload stream by looking for the matching
chunk boundary, already forced by the chunker.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 6:
- no changes
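
For readers who want to follow the ordering logic from the commit message
without the async machinery, here is a minimal synchronous sketch. It is
not the code from this patch: `Injection`, `Item` and `interleave` are
hypothetical stand-ins, chunks are plain string labels, and the function
advances the stream offset itself, whereas the patch only reads a shared
`stream_len` counter (the sketch assumes the stream consumer updates it).

```rust
use std::cmp::Ordering;
use std::sync::mpsc;

/// Hypothetical stand-in for `InjectChunks`: labels instead of `ReusableDynamicEntry`.
struct Injection {
    boundary: u64,
    chunks: Vec<&'static str>,
    size: u64,
}

/// Hypothetical stand-in for `InjectedChunksInfo`.
#[derive(Debug, PartialEq)]
enum Item {
    Known(Vec<&'static str>),
    Raw(Vec<u8>),
}

/// Synchronous model of the injector loop. Assumption: the offset is advanced
/// here by raw buffer length and by injected size; in the patch the counter is
/// shared and only read by the injector.
fn interleave(
    raw: Vec<Vec<u8>>,
    injections: mpsc::Receiver<Injection>,
) -> Result<Vec<Item>, String> {
    let mut out = Vec::new();
    let mut offset = 0u64;
    let mut next: Option<Injection> = None;
    let mut input = raw.into_iter();

    loop {
        // fetch the next pending injection, if none is held back already
        if next.is_none() {
            next = injections.try_recv().ok();
        }
        if let Some(inject) = next.take() {
            match inject.boundary.cmp(&offset) {
                // boundary reached: emit the reused chunks now
                Ordering::Equal => {
                    offset += inject.size;
                    out.push(Item::Known(inject.chunks));
                    continue;
                }
                // boundary not reached yet: keep it for a later iteration
                Ordering::Greater => next = Some(inject),
                // stream already moved past the boundary
                Ordering::Less => return Err("invalid injection boundary".into()),
            }
        }
        match input.next() {
            // input finished while injections are still pending
            None if next.is_some() || injections.try_recv().is_ok() => {
                return Err("injection queue not fully consumed".into());
            }
            None => return Ok(out),
            // skip empty buffers
            Some(buf) if buf.is_empty() => continue,
            Some(buf) => {
                offset += buf.len() as u64;
                out.push(Item::Raw(buf));
            }
        }
    }
}
```

Feeding raw buffers of 3 and 1 bytes plus one injection with boundary 3
yields the first raw buffer, then the injected entries, then the second
raw buffer. A boundary that falls inside a raw buffer is reported as an
error, as is an injection still queued when the input ends.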

 pbs-client/src/inject_reused_chunks.rs | 129 +++++++++++++++++++++++++
 pbs-client/src/lib.rs                  |   1 +
 2 files changed, 130 insertions(+)
 create mode 100644 pbs-client/src/inject_reused_chunks.rs

diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 000000000..ed147f5fb
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,129 @@
+use std::cmp;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{mpsc, Arc};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use crate::pxar::create::ReusableDynamicEntry;
+
+pin_project! {
+    pub struct InjectReusedChunksQueue<S> {
+        #[pin]
+        input: S,
+        next_injection: Option<InjectChunks>,
+        buffer: Option<bytes::BytesMut>,
+        injections: Option<mpsc::Receiver<InjectChunks>>,
+        stream_len: Arc<AtomicUsize>,
+    }
+}
+
+type StreamOffset = u64;
+/// Holds a list of chunks to inject at the given boundary by forcing a chunk boundary.
+#[derive(Debug)]
+pub struct InjectChunks {
+    /// Offset at which to force the boundary
+    pub boundary: StreamOffset,
+    /// List of chunks to inject
+    pub chunks: Vec<ReusableDynamicEntry>,
+    /// Cumulative size of the chunks in the list
+    pub size: usize,
+}
+
+/// Variants for stream consumer to distinguish between raw data chunks and injected ones.
+pub enum InjectedChunksInfo {
+    Known(Vec<ReusableDynamicEntry>),
+    Raw(bytes::BytesMut),
+}
+
+pub trait InjectReusedChunks: Sized {
+    fn inject_reused_chunks(
+        self,
+        injections: Option<mpsc::Receiver<InjectChunks>>,
+        stream_len: Arc<AtomicUsize>,
+    ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    fn inject_reused_chunks(
+        self,
+        injections: Option<mpsc::Receiver<InjectChunks>>,
+        stream_len: Arc<AtomicUsize>,
+    ) -> InjectReusedChunksQueue<Self> {
+        InjectReusedChunksQueue {
+            input: self,
+            next_injection: None,
+            injections,
+            buffer: None,
+            stream_len,
+        }
+    }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    type Item = Result<InjectedChunksInfo, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+
+        // loop to skip over possible empty chunks
+        loop {
+            if this.next_injection.is_none() {
+                if let Some(injections) = this.injections.as_mut() {
+                    if let Ok(injection) = injections.try_recv() {
+                        *this.next_injection = Some(injection);
+                    }
+                }
+            }
+
+            if let Some(inject) = this.next_injection.take() {
+                // got reusable dynamic entries to inject
+                let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+
+                match inject.boundary.cmp(&offset) {
+                    // inject now
+                    cmp::Ordering::Equal => {
+                        let chunk_info = InjectedChunksInfo::Known(inject.chunks);
+                        return Poll::Ready(Some(Ok(chunk_info)));
+                    }
+                    // inject later
+                    cmp::Ordering::Greater => *this.next_injection = Some(inject),
+                    // incoming new chunks and injections didn't line up?
+                    cmp::Ordering::Less => {
+                        return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))))
+                    }
+                }
+            }
+
+            // nothing to inject now, await further input
+            match ready!(this.input.as_mut().poll_next(cx)) {
+                None => {
+                    if let Some(injections) = this.injections.as_mut() {
+                        if this.next_injection.is_some() || injections.try_recv().is_ok() {
+                            // stream finished, but remaining dynamic entries to inject
+                            return Poll::Ready(Some(Err(anyhow!(
+                                "injection queue not fully consumed"
+                            ))));
+                        }
+                    }
+                    // stream finished and all dynamic entries already injected
+                    return Poll::Ready(None);
+                }
+                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+                // skip empty chunks: they occur when entries were injected at a forced boundary
+                // that did not require splitting the raw stream buffer
+                Some(Ok(raw)) if raw.is_empty() => continue,
+                Some(Ok(raw)) => return Poll::Ready(Some(Ok(InjectedChunksInfo::Raw(raw)))),
+            }
+        }
+    }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556b..3e7bd2a8b 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
 pub mod pxar;
 pub mod tools;
 
+mod inject_reused_chunks;
 mod merge_known_chunks;
 pub mod pipe_to_stream;
 
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel

