public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v4 proxmox-backup 16/26] fix #3174: upload stream: impl reused chunk injector
Date: Thu,  9 Nov 2023 19:46:04 +0100
Message-ID: <20231109184614.1611127-17-c.ebner@proxmox.com>
In-Reply-To: <20231109184614.1611127-1-c.ebner@proxmox.com>

To be included in the backup's index file, the reused chunks, which
store the payload of files skipped during pxar encoding, have to be
inserted after the encoder has written the pxar appendix entry type.

The chunker forces a chunk boundary after this marker and queues the
list of chunks to be uploaded thereafter.
This patch implements the logic to inject these chunks into the chunk
upload stream once such a boundary is requested, by looping over the
queued chunks and inserting them into the stream.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since version 3:
- count appendix chunks as reused chunks
- fix issue with stream being corrupted by missing buffering of data
  when injecting

Changes since version 2:
- no changes

Changes since version 1:
- no changes
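
Illustration of the boundary handling described in the commit message,
as a minimal synchronous sketch. It is not the code of this patch: the
names Action, classify, Injection, Event, emit_known and simulate are
hypothetical, there is no Stream, Arc or Mutex, and the initial
pop_front in the constructor as well as the buffering of raw data are
left out. Raw chunks are assumed to be non-empty. It only mirrors the
four cases poll_next distinguishes when comparing a queued boundary
against the current stream offset.

```rust
use std::collections::VecDeque;

/// Outcome of comparing a queued boundary with the next raw chunk (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Action {
    /// Boundary equals the current offset: inject first, raw data follows.
    InjectBefore,
    /// Boundary equals the end of the raw chunk: raw data first, then inject.
    InjectAfter,
    /// Boundary lies further ahead: put the entry back at the queue front.
    Requeue,
    /// Boundary lies before the end of the raw chunk: treated as an error.
    Invalid,
}

pub fn classify(boundary: u64, offset: u64, chunk_size: u64) -> Action {
    let end = offset + chunk_size;
    if boundary == offset {
        Action::InjectBefore
    } else if boundary == end {
        Action::InjectAfter
    } else if boundary < end {
        Action::Invalid
    } else {
        Action::Requeue
    }
}

/// Simplified stand-in for a queued injection: only sizes, no digests.
pub struct Injection {
    pub boundary: u64,
    pub chunk_sizes: Vec<u64>,
}

#[derive(Debug, PartialEq)]
pub enum Event {
    Raw { offset: u64, len: u64 },
    Known { offsets: Vec<u64> },
}

/// Record the start offset of each injected chunk and advance the stream length.
fn emit_known(inj: Injection, stream_len: &mut u64, events: &mut Vec<Event>) {
    let mut offsets = Vec::new();
    for size in inj.chunk_sizes {
        offsets.push(*stream_len);
        *stream_len += size;
    }
    events.push(Event::Known { offsets });
}

/// Walk over raw chunk sizes, checking at most one queued injection per raw chunk.
pub fn simulate(raw: &[u64], mut queue: VecDeque<Injection>) -> Result<Vec<Event>, String> {
    let mut stream_len = 0u64;
    let mut events = Vec::new();
    for &len in raw {
        let mut after = None;
        if let Some(inj) = queue.pop_front() {
            match classify(inj.boundary, stream_len, len) {
                Action::InjectBefore => emit_known(inj, &mut stream_len, &mut events),
                Action::InjectAfter => after = Some(inj),
                Action::Requeue => queue.push_front(inj),
                Action::Invalid => {
                    return Err(format!("invalid injection boundary {}", inj.boundary))
                }
            }
        }
        events.push(Event::Raw { offset: stream_len, len });
        stream_len += len;
        if let Some(inj) = after {
            emit_known(inj, &mut stream_len, &mut events);
        }
    }
    Ok(events)
}
```

Calling simulate with raw sizes [10, 20, 5] and one injection at
boundary 30 keeps the entry queued for the first raw chunk and emits it
right after the second one, so the third raw chunk starts at an offset
that already includes the injected chunk sizes.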

 pbs-client/src/inject_reused_chunks.rs | 155 +++++++++++++++++++++++++
 pbs-client/src/lib.rs                  |   1 +
 2 files changed, 156 insertions(+)
 create mode 100644 pbs-client/src/inject_reused_chunks.rs

diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 00000000..87176a15
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,155 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use pbs_datastore::dynamic_index::DynamicEntry;
+
+pin_project! {
+    pub struct InjectReusedChunksQueue<S> {
+        #[pin]
+        input: S,
+        current: Option<InjectChunks>,
+        buffer: Option<bytes::BytesMut>,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+    pub boundary: u64,
+    pub chunks: Vec<DynamicEntry>,
+    pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+    Known(Vec<(u64, [u8; 32])>),
+    Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    fn inject_reused_chunks(
+        self,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+        stream_len: Arc<AtomicUsize>,
+        reused_len: Arc<AtomicUsize>,
+        index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+    ) -> InjectReusedChunksQueue<Self> {
+        let current = injection_queue.lock().unwrap().pop_front();
+
+        InjectReusedChunksQueue {
+            input: self,
+            current,
+            injection_queue,
+            buffer: None,
+            stream_len,
+            reused_len,
+            index_csum,
+        }
+    }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+    S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+    type Item = Result<InjectedChunksInfo, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            let current = this.current.take();
+            if let Some(current) = current {
+                let mut chunks = Vec::new();
+                let mut guard = this.index_csum.lock().unwrap();
+                let csum = guard.as_mut().unwrap();
+
+                for chunk in current.chunks {
+                    let offset = this
+                        .stream_len
+                        .fetch_add(chunk.end() as usize, Ordering::SeqCst)
+                        as u64;
+                    this.reused_len
+                        .fetch_add(chunk.end() as usize, Ordering::SeqCst);
+                    let digest = chunk.digest();
+                    chunks.push((offset, digest));
+                    // Chunk end is assumed to be normalized to chunk size here
+                    let end_offset = offset + chunk.end();
+                    csum.update(&end_offset.to_le_bytes());
+                    csum.update(&digest);
+                }
+                let chunk_info = InjectedChunksInfo::Known(chunks);
+                return Poll::Ready(Some(Ok(chunk_info)));
+            }
+
+            let buffer = this.buffer.take();
+            if let Some(buffer) = buffer {
+                let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+                let data = InjectedChunksInfo::Raw((offset, buffer));
+                return Poll::Ready(Some(Ok(data)));
+            }
+
+            match ready!(this.input.as_mut().poll_next(cx)) {
+                None => return Poll::Ready(None),
+                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+                Some(Ok(raw)) => {
+                    let chunk_size = raw.len();
+                    let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+                    let mut injections = this.injection_queue.lock().unwrap();
+
+                    if let Some(inject) = injections.pop_front() {
+                        if inject.boundary == offset {
+                            if this.current.replace(inject).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced injection queue not empty"
+                                ))));
+                            }
+                            if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+                                return Poll::Ready(Some(Err(anyhow!(
+                                    "replaced buffer not empty"
+                                ))));
+                            }
+                            continue;
+                        } else if inject.boundary == offset + chunk_size as u64 {
+                            let _ = this.current.insert(inject);
+                        } else if inject.boundary < offset + chunk_size as u64 {
+                            return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+                        } else {
+                            injections.push_front(inject);
+                        }
+                    }
+
+                    if chunk_size == 0 {
+                        return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+                    }
+
+                    let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+                    let data = InjectedChunksInfo::Raw((offset, raw));
+
+                    return Poll::Ready(Some(Ok(data)));
+                }
+            }
+        }
+    }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556..8bf26381 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -8,6 +8,7 @@ pub mod pxar;
 pub mod tools;
 
 mod merge_known_chunks;
+mod inject_reused_chunks;
 pub mod pipe_to_stream;
 
 mod http_client;
-- 
2.39.2
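
Note on the offset accounting in the Known branch of poll_next above:
fetch_add returns the value before the addition, so the value it yields
is the start offset of the injected chunk, and the end offset fed to the
index checksum is that start plus the chunk size. The following sketch
shows only this accounting. The function name account_known_chunks and
the (size, digest) tuples are assumptions made for illustration, and the
bytes are collected into a Vec instead of being passed to a SHA-256
hasher.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// For each (size, digest) pair, reserve `size` bytes in the shared stream
/// length and collect what would be hashed: end offset (little endian)
/// followed by the 32 byte digest. Returns the start offsets and those bytes.
pub fn account_known_chunks(
    stream_len: &AtomicUsize,
    chunks: &[(usize, [u8; 32])],
) -> (Vec<u64>, Vec<u8>) {
    let mut offsets = Vec::new();
    let mut csum_input = Vec::new();
    for (size, digest) in chunks {
        // fetch_add returns the previous value, i.e. the chunk's start offset.
        let offset = stream_len.fetch_add(*size, Ordering::SeqCst) as u64;
        let end_offset = offset + *size as u64;
        offsets.push(offset);
        csum_input.extend_from_slice(&end_offset.to_le_bytes());
        csum_input.extend_from_slice(digest);
    }
    (offsets, csum_input)
}
```

With a stream length of 100 and two chunks of 4 and 6 bytes, the start
offsets are 100 and 104, the end offsets entering the checksum input are
104 and 110, and the shared stream length ends up at 110.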




