From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v3 proxmox-backup 38/58] upload stream: impl reused chunk injector
Date: Thu, 28 Mar 2024 13:36:47 +0100 [thread overview]
Message-ID: <20240328123707.336951-39-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240328123707.336951-1-c.ebner@proxmox.com>
In order to be included in the backups index file, reused payload
chunks have to be injected into the payload upload stream.
The chunker forces a chunk boundary and queues the list of chunks to
be uploaded thereafter.
This implements the logic to inject the chunks into the chunk upload
stream after such a boundary is requested, by looping over the queued
chunks and inserting them into the stream.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- no changes
pbs-client/src/inject_reused_chunks.rs | 152 +++++++++++++++++++++++++
pbs-client/src/lib.rs | 1 +
2 files changed, 153 insertions(+)
create mode 100644 pbs-client/src/inject_reused_chunks.rs
diff --git a/pbs-client/src/inject_reused_chunks.rs b/pbs-client/src/inject_reused_chunks.rs
new file mode 100644
index 000000000..5cc19ce5d
--- /dev/null
+++ b/pbs-client/src/inject_reused_chunks.rs
@@ -0,0 +1,152 @@
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll};
+
+use anyhow::{anyhow, Error};
+use futures::{ready, Stream};
+use pin_project_lite::pin_project;
+
+use crate::pxar::create::ReusableDynamicEntry;
+
+pin_project! {
+ pub struct InjectReusedChunksQueue<S> {
+ #[pin]
+ input: S,
+ current: Option<InjectChunks>,
+ buffer: Option<bytes::BytesMut>,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ }
+}
+
+#[derive(Debug)]
+pub struct InjectChunks {
+ pub boundary: u64,
+ pub chunks: Vec<ReusableDynamicEntry>,
+ pub size: usize,
+}
+
+pub enum InjectedChunksInfo {
+ Known(Vec<(u64, [u8; 32])>),
+ Raw((u64, bytes::BytesMut)),
+}
+
+pub trait InjectReusedChunks: Sized {
+ fn inject_reused_chunks(
+ self,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ ) -> InjectReusedChunksQueue<Self>;
+}
+
+impl<S> InjectReusedChunks for S
+where
+ S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+ fn inject_reused_chunks(
+ self,
+ injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
+ stream_len: Arc<AtomicUsize>,
+ reused_len: Arc<AtomicUsize>,
+ index_csum: Arc<Mutex<Option<openssl::sha::Sha256>>>,
+ ) -> InjectReusedChunksQueue<Self> {
+ InjectReusedChunksQueue {
+ input: self,
+ current: None,
+ injection_queue,
+ buffer: None,
+ stream_len,
+ reused_len,
+ index_csum,
+ }
+ }
+}
+
+impl<S> Stream for InjectReusedChunksQueue<S>
+where
+ S: Stream<Item = Result<bytes::BytesMut, Error>>,
+{
+ type Item = Result<InjectedChunksInfo, Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ loop {
+ let current = this.current.take();
+ if let Some(current) = current {
+ let mut chunks = Vec::new();
+ let mut guard = this.index_csum.lock().unwrap();
+ let csum = guard.as_mut().unwrap();
+
+ for chunk in current.chunks {
+ let offset = this
+ .stream_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst)
+ as u64;
+ this.reused_len
+ .fetch_add(chunk.size() as usize, Ordering::SeqCst);
+ let digest = chunk.digest();
+ chunks.push((offset, digest));
+ let end_offset = offset + chunk.size();
+ csum.update(&end_offset.to_le_bytes());
+ csum.update(&digest);
+ }
+ let chunk_info = InjectedChunksInfo::Known(chunks);
+ return Poll::Ready(Some(Ok(chunk_info)));
+ }
+
+ let buffer = this.buffer.take();
+ if let Some(buffer) = buffer {
+ let offset = this.stream_len.fetch_add(buffer.len(), Ordering::SeqCst) as u64;
+ let data = InjectedChunksInfo::Raw((offset, buffer));
+ return Poll::Ready(Some(Ok(data)));
+ }
+
+ match ready!(this.input.as_mut().poll_next(cx)) {
+ None => return Poll::Ready(None),
+ Some(Err(err)) => return Poll::Ready(Some(Err(err))),
+ Some(Ok(raw)) => {
+ let chunk_size = raw.len();
+ let offset = this.stream_len.load(Ordering::SeqCst) as u64;
+ let mut injections = this.injection_queue.lock().unwrap();
+
+ if let Some(inject) = injections.pop_front() {
+ if inject.boundary == offset {
+ if this.current.replace(inject).is_some() {
+ return Poll::Ready(Some(Err(anyhow!(
+ "replaced injection queue not empty"
+ ))));
+ }
+ if chunk_size > 0 && this.buffer.replace(raw).is_some() {
+ return Poll::Ready(Some(Err(anyhow!(
+ "replaced buffer not empty"
+ ))));
+ }
+ continue;
+ } else if inject.boundary == offset + chunk_size as u64 {
+ let _ = this.current.insert(inject);
+ } else if inject.boundary < offset + chunk_size as u64 {
+ return Poll::Ready(Some(Err(anyhow!("invalid injection boundary"))));
+ } else {
+ injections.push_front(inject);
+ }
+ }
+
+ if chunk_size == 0 {
+ return Poll::Ready(Some(Err(anyhow!("unexpected empty raw data"))));
+ }
+
+ let offset = this.stream_len.fetch_add(chunk_size, Ordering::SeqCst) as u64;
+ let data = InjectedChunksInfo::Raw((offset, raw));
+
+ return Poll::Ready(Some(Ok(data)));
+ }
+ }
+ }
+ }
+}
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index 21cf8556b..3e7bd2a8b 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -7,6 +7,7 @@ pub mod catalog_shell;
pub mod pxar;
pub mod tools;
+mod inject_reused_chunks;
mod merge_known_chunks;
pub mod pipe_to_stream;
--
2.39.2
next prev parent reply other threads:[~2024-03-28 12:38 UTC|newest]
Thread overview: 122+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-03-28 12:36 [pbs-devel] [PATCH v3 pxar proxmox-backup 00/58] fix #3174: improve file-level backup Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 01/58] encoder: fix two typos in comments Christian Ebner
2024-04-03 9:12 ` [pbs-devel] applied: " Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 02/58] format/examples: add PXAR_PAYLOAD_REF entry header Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 03/58] decoder: add method to read payload references Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 04/58] decoder: factor out skip part from skip_entry Christian Ebner
2024-04-03 9:18 ` Fabian Grünbichler
2024-04-03 11:02 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 05/58] encoder: add optional output writer for file payloads Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 06/58] encoder: move to stack based state tracking Christian Ebner
2024-04-03 9:54 ` Fabian Grünbichler
2024-04-03 11:01 ` Christian Ebner
2024-04-04 8:48 ` Fabian Grünbichler
2024-04-04 9:04 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 07/58] decoder/accessor: add optional payload input stream Christian Ebner
2024-04-03 10:38 ` Fabian Grünbichler
2024-04-03 11:47 ` Christian Ebner
2024-04-03 12:18 ` Christian Ebner
2024-04-04 8:46 ` Fabian Grünbichler
2024-04-04 9:49 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 08/58] encoder: add payload reference capability Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 09/58] encoder: add payload position capability Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 10/58] encoder: add payload advance capability Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 11/58] encoder/format: finish payload stream with marker Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 12/58] format: add payload stream start marker Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 13/58] format: add pxar format version entry Christian Ebner
2024-04-03 11:41 ` Fabian Grünbichler
2024-04-03 13:31 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 pxar 14/58] format/encoder/decoder: add entry type cli params Christian Ebner
2024-04-03 12:01 ` Fabian Grünbichler
2024-04-03 14:41 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 15/58] client: pxar: switch to stack based encoder state Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 16/58] client: backup writer: only borrow http client Christian Ebner
2024-04-08 9:04 ` [pbs-devel] applied: " Fabian Grünbichler
2024-04-08 9:17 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 17/58] client: backup: factor out extension from backup target Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 18/58] client: backup: early check for fixed index type Christian Ebner
2024-04-08 9:05 ` [pbs-devel] applied: " Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 19/58] client: pxar: combine writer params into struct Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 20/58] client: backup: split payload to dedicated stream Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 21/58] client: helper: add helpers for creating reader instances Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 22/58] client: helper: add method for split archive name mapping Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 23/58] client: restore: read payload from dedicated index Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 24/58] tools: cover meta extension for pxar archives Christian Ebner
2024-04-04 9:01 ` Fabian Grünbichler
2024-04-04 9:06 ` Christian Ebner
2024-04-04 9:10 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 25/58] restore: " Christian Ebner
2024-04-04 9:02 ` Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 26/58] client: mount: make split pxar archives mountable Christian Ebner
2024-04-04 9:43 ` Fabian Grünbichler
2024-04-04 13:29 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 27/58] api: datastore: refactor getting local chunk reader Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 28/58] api: datastore: attach optional payload " Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 29/58] catalog: shell: factor out pxar fuse reader instantiation Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 30/58] catalog: shell: redirect payload reader for split streams Christian Ebner
2024-04-04 9:49 ` Fabian Grünbichler
2024-04-04 15:52 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 31/58] www: cover meta extension for pxar archives Christian Ebner
2024-04-04 10:01 ` Fabian Grünbichler
2024-04-04 14:51 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 32/58] pxar: add optional payload input for achive restore Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 33/58] pxar: add more context to extraction error Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 34/58] client: pxar: include payload offset in output Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 35/58] pxar: show padding in debug output on archive list Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 36/58] datastore: dynamic index: add method to get digest Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 37/58] client: pxar: helper for lookup of reusable dynamic entries Christian Ebner
2024-04-04 12:54 ` Fabian Grünbichler
2024-04-04 17:13 ` Christian Ebner
2024-04-05 7:22 ` Christian Ebner
2024-04-05 11:28 ` Fabian Grünbichler
2024-03-28 12:36 ` Christian Ebner [this message]
2024-04-04 14:24 ` [pbs-devel] [PATCH v3 proxmox-backup 38/58] upload stream: impl reused chunk injector Fabian Grünbichler
2024-04-05 10:26 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 39/58] client: chunk stream: add struct to hold injection state Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 40/58] client: chunk stream: add dynamic entries injection queues Christian Ebner
2024-04-04 14:52 ` Fabian Grünbichler
2024-04-08 13:54 ` Christian Ebner
2024-04-09 7:19 ` Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 41/58] specs: add backup detection mode specification Christian Ebner
2024-04-04 14:54 ` Fabian Grünbichler
2024-04-08 13:36 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 42/58] client: implement prepare reference method Christian Ebner
2024-04-05 8:01 ` Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 43/58] client: pxar: implement store to insert chunks on caching Christian Ebner
2024-04-05 7:52 ` Fabian Grünbichler
2024-04-09 9:12 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 44/58] client: pxar: add previous reference to archiver Christian Ebner
2024-04-04 15:04 ` Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 45/58] client: pxar: add method for metadata comparison Christian Ebner
2024-04-05 8:08 ` Fabian Grünbichler
2024-04-05 8:14 ` Christian Ebner
2024-04-09 12:52 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 46/58] pxar: caching: add look-ahead cache types Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 47/58] client: pxar: add look-ahead caching Christian Ebner
2024-04-05 8:33 ` Fabian Grünbichler
2024-04-09 14:53 ` Christian Ebner
[not found] ` <<dce38c53-f3e7-47ac-b1fd-a63daaabbcec@proxmox.com>
2024-04-10 7:03 ` Fabian Grünbichler
2024-04-10 7:11 ` Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 48/58] fix #3174: client: pxar: enable caching and meta comparison Christian Ebner
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 49/58] client: backup: increase average chunk size for metadata Christian Ebner
2024-04-05 9:42 ` Fabian Grünbichler
2024-04-05 10:49 ` Dietmar Maurer
2024-04-08 8:28 ` Fabian Grünbichler
2024-03-28 12:36 ` [pbs-devel] [PATCH v3 proxmox-backup 50/58] client: backup writer: add injected chunk count to stats Christian Ebner
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 51/58] pxar: create: show chunk injection stats debug output Christian Ebner
2024-04-05 9:47 ` Fabian Grünbichler
2024-04-10 10:00 ` Christian Ebner
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 52/58] client: pxar: add entry kind format version Christian Ebner
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 53/58] client: pxar: opt encode cli exclude patterns as CliParams Christian Ebner
2024-04-05 9:49 ` Fabian Grünbichler
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 54/58] client: pxar: add flow chart for metadata change detection Christian Ebner
2024-04-05 10:16 ` Fabian Grünbichler
2024-04-10 10:04 ` Christian Ebner
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 55/58] docs: describe file format for split payload files Christian Ebner
2024-04-05 10:26 ` Fabian Grünbichler
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 56/58] docs: add section describing change detection mode Christian Ebner
2024-04-05 11:22 ` Fabian Grünbichler
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 57/58] test-suite: add detection mode change benchmark Christian Ebner
2024-03-28 12:37 ` [pbs-devel] [PATCH v3 proxmox-backup 58/58] test-suite: add bin to deb, add shell completions Christian Ebner
2024-04-05 11:39 ` [pbs-devel] [PATCH v3 pxar proxmox-backup 00/58] fix #3174: improve file-level backup Fabian Grünbichler
2024-04-29 12:13 ` Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240328123707.336951-39-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox