public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v6 proxmox-backup 18/29] fix #3174: chunker: add forced boundaries
Date: Thu, 25 Jan 2024 14:25:57 +0100	[thread overview]
Message-ID: <20240125132608.1172472-19-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240125132608.1172472-1-c.ebner@proxmox.com>

Allow forcing a boundary while chunking and injecting reused chunks into
the stream. Double ended queues are used to control the boundaries and
chunk injection between archiver, chunker and uploader.

The archiver gets an interface to request a boundary and push a list of
chunks to inject following that boundary. The chunker reads this queue,
creating the boundary and passing the list of chunks to inject to the
uploader via a second, dedicated double ended queue.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since v5:
- fix formatting via `cargo fmt`

 examples/test_chunk_speed2.rs                 |  9 +++-
 pbs-client/src/backup_writer.rs               |  6 ++-
 pbs-client/src/chunk_stream.rs                | 42 ++++++++++++++++++-
 pbs-client/src/pxar/create.rs                 | 10 ++++-
 pbs-client/src/pxar_backup_stream.rs          |  8 +++-
 proxmox-backup-client/src/main.rs             | 36 ++++++++++++----
 .../src/proxmox_restore_daemon/api.rs         | 13 +++++-
 pxar-bin/src/main.rs                          |  5 ++-
 tests/catar.rs                                |  3 ++
 9 files changed, 115 insertions(+), 17 deletions(-)

diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index 3f69b436..e8bac726 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -1,5 +1,7 @@
 use anyhow::Error;
 use futures::*;
+use std::collections::VecDeque;
+use std::sync::{Arc, Mutex};
 
 extern crate proxmox_backup;
 
@@ -26,7 +28,12 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
 
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None);
+    let mut chunk_stream = ChunkStream::new(
+        stream,
+        None,
+        Arc::new(Mutex::new(VecDeque::new())),
+        Arc::new(Mutex::new(VecDeque::new())),
+    );
 
     let start_time = std::time::Instant::now();
 
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 8a03d8ea..cc6dd49a 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 use std::future::Future;
 use std::os::unix::fs::OpenOptionsExt;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
@@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
 
 use proxmox_human_byte::HumanByte;
 
+use super::inject_reused_chunks::InjectChunks;
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 
 use super::{H2Client, HttpClient};
@@ -265,6 +266,7 @@ impl BackupWriter {
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         options: UploadOptions,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
@@ -341,6 +343,7 @@ impl BackupWriter {
                 None
             },
             options.compress,
+            injection_queue,
         )
         .await?;
 
@@ -637,6 +640,7 @@ impl BackupWriter {
         known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
+        injection_queue: Arc<Mutex<VecDeque<InjectChunks>>>,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index 895f6eae..4a6301c3 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -1,4 +1,6 @@
+use std::collections::VecDeque;
 use std::pin::Pin;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
 use anyhow::Error;
@@ -8,21 +10,34 @@ use futures::stream::{Stream, TryStream};
 
 use pbs_datastore::Chunker;
 
+use crate::inject_reused_chunks::InjectChunks;
+
 /// Split input stream into dynamic sized chunks
 pub struct ChunkStream<S: Unpin> {
     input: S,
     chunker: Chunker,
     buffer: BytesMut,
     scan_pos: usize,
+    consumed: u64,
+    boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+    injections: Arc<Mutex<VecDeque<InjectChunks>>>,
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
+    pub fn new(
+        input: S,
+        chunk_size: Option<usize>,
+        boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+        injections: Arc<Mutex<VecDeque<InjectChunks>>>,
+    ) -> Self {
         Self {
             input,
             chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
             buffer: BytesMut::new(),
             scan_pos: 0,
+            consumed: 0,
+            boundaries,
+            injections,
         }
     }
 }
@@ -40,6 +55,29 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
         loop {
+            {
+                // Make sure to release this lock and don't hold it longer than required
+                let mut boundaries = this.boundaries.lock().unwrap();
+                if let Some(inject) = boundaries.pop_front() {
+                    let max = this.consumed + this.buffer.len() as u64;
+                    if inject.boundary <= max {
+                        let chunk_size = (inject.boundary - this.consumed) as usize;
+                        let result = this.buffer.split_to(chunk_size);
+                        this.consumed += chunk_size as u64;
+                        this.scan_pos = 0;
+
+                        // Add the size of the injected chunks to consumed, so chunk stream offsets
+                        // are in sync with the rest of the archive.
+                        this.consumed += inject.size as u64;
+
+                        this.injections.lock().unwrap().push_back(inject);
+
+                        return Poll::Ready(Some(Ok(result)));
+                    }
+                    boundaries.push_front(inject);
+                }
+            }
+
             if this.scan_pos < this.buffer.len() {
                 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
 
@@ -50,7 +88,9 @@ where
                     // continue poll
                 } else if chunk_size <= this.buffer.len() {
                     let result = this.buffer.split_to(chunk_size);
+                    this.consumed += chunk_size as u64;
                     this.scan_pos = 0;
+
                     return Poll::Ready(Some(Ok(result)));
                 } else {
                     panic!("got unexpected chunk boundary from chunker");
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 4f6ffc78..9980c393 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::ffi::{CStr, CString, OsStr};
 use std::fmt;
 use std::io::{self, Read};
@@ -25,8 +25,9 @@ use proxmox_lang::c_str;
 use proxmox_sys::fs::{self, acl, xattr};
 
 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader};
-use pbs_datastore::dynamic_index::DynamicIndexReader;
+use pbs_datastore::dynamic_index::{DynamicEntry, DynamicIndexReader};
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -142,6 +143,8 @@ struct Archiver {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
     file_copy_buffer: Vec<u8>,
     previous_ref: Option<PxarPrevRef>,
+    forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
+    inject: (usize, Vec<DynamicEntry>),
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
@@ -153,6 +156,7 @@ pub async fn create_archive<T, F>(
     callback: F,
     catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
     options: PxarCreateOptions,
+    forced_boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -211,6 +215,8 @@ where
         hardlinks: HashMap::new(),
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         previous_ref: options.previous_ref,
+        forced_boundaries,
+        inject: (0, Vec::new()),
     };
 
     archiver
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 22a6ffdc..d18ba470 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
 use std::io::Write;
 //use std::os::unix::io::FromRawFd;
 use std::path::Path;
@@ -17,6 +18,8 @@ use proxmox_io::StdChannelWriter;
 
 use pbs_datastore::catalog::CatalogWriter;
 
+use crate::inject_reused_chunks::InjectChunks;
+
 /// Stream implementation to encode and upload .pxar archives.
 ///
 /// The hyper client needs an async Stream for file upload, so we
@@ -40,6 +43,7 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
     ) -> Result<Self, Error> {
         let (tx, rx) = std::sync::mpsc::sync_channel(10);
 
@@ -64,6 +68,7 @@ impl PxarBackupStream {
                 },
                 Some(catalog),
                 options,
+                boundaries,
             )
             .await
             {
@@ -87,10 +92,11 @@ impl PxarBackupStream {
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Arc<Mutex<VecDeque<InjectChunks>>>,
     ) -> Result<Self, Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options)
+        Self::new(dir, catalog, options, boundaries)
     }
 }
 
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index f575ccd0..442616b7 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1,4 +1,4 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
@@ -192,8 +192,17 @@ async fn backup_directory<P: AsRef<Path>>(
     pxar_create_options: pbs_client::pxar::PxarCreateOptions,
     upload_options: UploadOptions,
 ) -> Result<BackupStats, Error> {
-    let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?;
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+    let boundaries = Arc::new(Mutex::new(VecDeque::new()));
+    let pxar_stream = PxarBackupStream::open(
+        dir_path.as_ref(),
+        catalog,
+        pxar_create_options,
+        boundaries.clone(),
+    )?;
+
+    let injections = Arc::new(Mutex::new(VecDeque::new()));
+    let mut chunk_stream =
+        ChunkStream::new(pxar_stream, chunk_size, boundaries, injections.clone());
 
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
@@ -211,7 +220,7 @@ async fn backup_directory<P: AsRef<Path>>(
     }
 
     let stats = client
-        .upload_stream(archive_name, stream, upload_options)
+        .upload_stream(archive_name, stream, upload_options, injections)
         .await?;
 
     Ok(stats)
@@ -237,8 +246,9 @@ async fn backup_image<P: AsRef<Path>>(
         bail!("cannot backup image with dynamic chunk size!");
     }
 
+    let injection_queue = Arc::new(Mutex::new(VecDeque::new()));
     let stats = client
-        .upload_stream(archive_name, stream, upload_options)
+        .upload_stream(archive_name, stream, upload_options, injection_queue)
         .await?;
 
     Ok(stats)
@@ -529,7 +539,14 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+    let boundaries = Arc::new(Mutex::new(VecDeque::new()));
+    let injections = Arc::new(Mutex::new(VecDeque::new()));
+    let catalog_chunk_stream = ChunkStream::new(
+        catalog_stream,
+        Some(catalog_chunk_size),
+        boundaries,
+        injections.clone(),
+    );
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
@@ -545,7 +562,12 @@ fn spawn_catalog_upload(
 
     tokio::spawn(async move {
         let catalog_upload_result = client
-            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
+            .upload_stream(
+                CATALOG_NAME,
+                catalog_chunk_stream,
+                upload_options,
+                injections,
+            )
             .await;
 
         if let Err(ref err) = catalog_upload_result {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index f89b0ab4..5eff673e 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -1,8 +1,10 @@
 ///! File-restore API running inside the restore VM
+use std::collections::VecDeque;
 use std::ffi::OsStr;
 use std::fs;
 use std::os::unix::ffi::OsStrExt;
 use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, Error};
 use futures::FutureExt;
@@ -360,8 +362,15 @@ fn extract(
                     };
 
                     let pxar_writer = TokioWriter::new(writer);
-                    create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options)
-                        .await
+                    create_archive(
+                        dir,
+                        pxar_writer,
+                        Flags::DEFAULT,
+                        |_| Ok(()),
+                        None,
+                        options,
+                        Arc::new(Mutex::new(VecDeque::new())),
+                    ).await
                 }
                 .await;
                 if let Err(err) = result {
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 9376a2c1..c019f3e4 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -1,10 +1,10 @@
-use std::collections::HashSet;
+use std::collections::{HashSet, VecDeque};
 use std::ffi::OsStr;
 use std::fs::OpenOptions;
 use std::os::unix::fs::OpenOptionsExt;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use anyhow::{bail, format_err, Error};
 use futures::future::FutureExt;
@@ -384,6 +384,7 @@ async fn create_archive(
         },
         None,
         options,
+        Arc::new(Mutex::new(VecDeque::new())),
     )
     .await?;
 
diff --git a/tests/catar.rs b/tests/catar.rs
index 36bb4f3b..bb9da2fa 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -1,4 +1,6 @@
+use std::collections::VecDeque;
 use std::process::Command;
+use std::sync::{Arc, Mutex};
 
 use anyhow::Error;
 
@@ -40,6 +42,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         |_| Ok(()),
         None,
         options,
+        Arc::new(Mutex::new(VecDeque::new())),
     ))?;
 
     Command::new("cmp")
-- 
2.39.2





  parent reply	other threads:[~2024-01-25 13:34 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-01-25 13:25 [pbs-devel] [PATCH-SERIES v6 pxar proxmox-backup proxmox-widget-toolkit 0/29] fix #3174: improve file-level backup Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 1/29] fix #3174: decoder: factor out skip_bytes from skip_entry Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 2/29] fix #3174: decoder: impl skip_bytes for sync dec Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 3/29] fix #3174: encoder: calc filename + metadata byte size Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 4/29] fix #3174: enc/dec: impl PXAR_APPENDIX_REF entrytype Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 5/29] fix #3174: enc/dec: impl PXAR_APPENDIX entrytype Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 6/29] fix #3174: encoder: helper to add to encoder position Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 7/29] fix #3174: enc/dec: impl PXAR_APPENDIX_TAIL entrytype Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 8/29] fix #3174: add pxar format version entry Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 pxar 9/29] fix #3174: enc: move from instance per dir to state stack Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 10/29] fix #3174: index: add fn index list from start/end-offsets Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 11/29] fix #3174: api: double catalog upload size Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 12/29] fix #3174: catalog: introduce extended format v2 Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 13/29] fix #3174: archiver/extractor: impl appendix ref Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 14/29] fix #3174: catalog: add specialized Archive entry Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 15/29] fix #3174: extractor: impl seq restore from appendix Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 16/29] fix #3174: archiver: store ref to previous backup Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 17/29] fix #3174: upload stream: impl reused chunk injector Christian Ebner
2024-01-25 13:25 ` Christian Ebner [this message]
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 19/29] fix #3174: backup writer: inject queued chunk in upload steam Christian Ebner
2024-01-25 13:25 ` [pbs-devel] [PATCH v6 proxmox-backup 20/29] fix #3174: archiver: reuse files with unchanged metadata Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 21/29] fix #3174: specs: add backup detection mode specification Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 22/29] fix #3174: client: Add detection mode to backup creation Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 23/29] catalog: use format version 2 conditionally Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 24/29] tools: add optional path prefix to line based output Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 25/29] pxar-bin: implement listing for appendix entries Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 26/29] test-suite: add detection mode change benchmark Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 27/29] test-suite: Add bin to deb, add shell completions Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-backup 28/29] pxar: add lookahead caching to reduce chunk fragmentation Christian Ebner
2024-01-25 13:26 ` [pbs-devel] [PATCH v6 proxmox-widget-toolkit 29/29] file-browser: support pxar archive and fileref types Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240125132608.1172472-19-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal