From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 25/36] upload stream: impl reused chunk injector
Date: Wed, 28 Feb 2024 15:02:15 +0100
Message-ID: <20240228140226.1251979-26-c.ebner@proxmox.com>
In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com>

To be included in the backup's index file, reused payload chunks
have to be injected into the payload upload stream.

The chunker forces a chunk boundary and queues the list of chunks to
be uploaded thereafter.

This implements the logic to inject the chunks into the chunk upload
stream after such a boundary is requested, by looping over the queued
chunks and inserting them into the stream.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
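Note for reviewers: the boundary handling described above can be
modelled without any async machinery. The following is only a simplified,
synchronous sketch and not the code from this patch. `Injection`,
`Emitted`, `inject` and `flush_at_boundary` are hypothetical names made up
for illustration, digests and the index checksum are left out, and empty
raw buffers are not treated specially.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `InjectChunks`: sizes of the reused chunks
/// to splice into the stream once it reaches `boundary`.
#[derive(Debug)]
struct Injection {
    boundary: u64,
    chunk_sizes: Vec<u64>,
}

/// Hypothetical stand-in for `InjectedChunksInfo` (digests omitted).
#[derive(Debug, PartialEq)]
enum Emitted {
    /// Start offsets of the injected (already known) chunks.
    Known(Vec<u64>),
    /// Start offset and length of a freshly chunked buffer.
    Raw(u64, usize),
}

/// Emit every queued injection whose boundary equals the current offset,
/// advancing the stream length by the size of each reused chunk.
fn flush_at_boundary(
    queue: &mut VecDeque<Injection>,
    stream_len: &mut u64,
    out: &mut Vec<Emitted>,
) {
    while queue.front().map_or(false, |i| i.boundary == *stream_len) {
        let injection = queue.pop_front().unwrap();
        let mut offsets = Vec::new();
        for size in injection.chunk_sizes {
            offsets.push(*stream_len);
            *stream_len += size;
        }
        out.push(Emitted::Known(offsets));
    }
}

/// Walk over the raw buffer lengths and splice in the queued injections.
fn inject(
    raw_lens: &[usize],
    mut queue: VecDeque<Injection>,
) -> Result<Vec<Emitted>, String> {
    let mut stream_len: u64 = 0;
    let mut out = Vec::new();

    for &len in raw_lens {
        flush_at_boundary(&mut queue, &mut stream_len, &mut out);
        if let Some(next) = queue.front() {
            // A boundary before the end of this buffer means the chunker
            // did not cut where the injection was requested.
            if next.boundary < stream_len + len as u64 {
                return Err("invalid injection boundary".to_string());
            }
        }
        out.push(Emitted::Raw(stream_len, len));
        stream_len += len as u64;
    }
    // An injection may also sit exactly at the end of the raw stream.
    flush_at_boundary(&mut queue, &mut stream_len, &mut out);
    Ok(out)
}
```

With raw buffers of 4 and 6 bytes and one injection of two reused chunks
(10 and 20 bytes) queued at boundary 4, the sketch emits the first raw
buffer at offset 0, the reused chunks at offsets 4 and 14, and the second
raw buffer at offset 34. A boundary of 5 would fall inside the second
buffer and is rejected.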
 pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++
 pbs-client/src/lib.rs                  |   1 +
 2 files changed, 153 insertions(+)
 create mode 100644 pbs-client/src/inject_reused_chunks.rs

diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 00000000..7c0f7780
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,152 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use pbs_datastore::dynamic_index::AppendableDynamicEntry;
+
+pin_project! {
+    pub struct InjectReusedChunksQueue<S> {
+        #[pin]
+        input: S,
+        current: Option<InjectChunks>,
+        buffer: Option<bytes::BytesMut>,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+    pub boundary: u64,
+    pub chunks: Vec<AppendableDynamicEntry>,
+    pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+    Known(Vec<(u64, [u8; 32])>),
+    Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self> {
+        InjectReusedChunksQueue {
+            input: self,
+            current: None,
+            injection_queue,
+            buffer: None,
+            stream_len,
+            reused_len,
+            index_csum,
+        }
+    }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    type Item = Result<InjectedChunksInfo, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            let current = this.current.take();
+            if let Some(current) = current {
+                let mut chunks = Vec::new();
+                let mut guard = this.index_csum.lock().unwrap();
+                let csum = guard.as_mut().unwrap();
+
+                for chunk in current.chunks {
+                    let offset = this
+                        .stream_len
+                        .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+                        as u64;
+                    this.reused_len
+                        .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+                    let digest = chunk.digest();
+                    chunks.push((offset, digest));
+                    let end_offset = offset + chunk.size();
+                    csum.update(&end_offset.to_le_bytes());
+                    csum.update(&digest);
+                }
+                let chunk_info = InjectedChunksInfo::Known(chunks);
+                return Poll::Ready(Some(Ok(chunk_info)));
+            }
+
+            let buffer = this.buffer.take();
+            if let Some(buffer) = buffer {
+                let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+                let data = InjectedChunksInfo::Raw((offset, buffer));
+                return Poll::Ready(Some(Ok(data)));
+            }
+
+            match ready!(this.input.as_mut().poll_next(cx)) {
+                None => return Poll::Ready(None),
+                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+                Some(Ok(raw)) => {
+                    let chunk_size = raw.len();
+                    let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+                    let mut injections = this.injection_queue.lock().unwrap();
+
+                    if let Some(inject) = injections.pop_front() {
+                        if inject.boundary == offset {
+                            if this.current.replace(inject).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced injection queue not empty"
+                                ))));
+                            }
+                            if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced buffer not empty"
+                                ))));
+                            }
+                            continue;
+                        } else if inject.boundary == offset + chunk_size as u64 {
+                            let _ = this.current.insert(inject);
+                        } else if inject.boundary < offset + chunk_size as u64 {
+                            return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+                        } else {
+                            injections.push_front(inject);
+                        }
+                    }
+
+                    if chunk_size == 0 {
+                        return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+                    }
+
+                    let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+                    let data = InjectedChunksInfo::Raw((offset, raw));
+
+                    return Poll::Ready(Some(Ok(data)));
+                }
+            }
+        }
+    }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556..3e7bd2a8 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
 pub mod pxar;
 pub mod tools;
 
+mod inject_reused_chunks;
 mod merge_known_chunks;
 pub mod pipe_to_stream;
 
-- 
2.39.2




