From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id E32E995E07 for ; Wed, 28 Feb 2024 15:03:22 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 5BDA3D094 for ; Wed, 28 Feb 2024 15:02:59 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 28 Feb 2024 15:02:55 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 3E66B47BFB for ; Wed, 28 Feb 2024 15:02:54 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 28 Feb 2024 15:02:16 +0100 Message-Id: <20240228140226.1251979-27-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240228140226.1251979-1-c.ebner@proxmox.com> References: <20240228140226.1251979-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.045 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [RFC proxmox-backup 26/36] client: chunk stream: add chunk injection queues X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 28 Feb 2024 14:03:23 -0000 Adds a queue to the chunk stream to request forced boundaries at a given offset within the stream and inject reused chunks after this boundary. The chunks are then passed along to the uploader stream using the injection queue, which inserts them during upload. Signed-off-by: Christian Ebner --- examples/test_chunk_speed2.rs | 10 ++- pbs-client/src/backup_writer.rs | 89 +++++++++++-------- pbs-client/src/chunk_stream.rs | 42 ++++++++- pbs-client/src/pxar/create.rs | 6 +- pbs-client/src/pxar_backup_stream.rs | 8 +- proxmox-backup-client/src/main.rs | 28 ++++-- .../src/proxmox_restore_daemon/api.rs | 3 + pxar-bin/src/main.rs | 5 +- tests/catar.rs | 3 + 9 files changed, 147 insertions(+), 47 deletions(-) diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs index 3f69b436..b20a5b59 100644 --- a/examples/test_chunk_speed2.rs +++ b/examples/test_chunk_speed2.rs @@ -1,3 +1,6 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + use anyhow::Error; use futures::*; @@ -26,7 +29,12 @@ async fn run() -> Result<(), Error> { .map_err(Error::from); //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); - let mut chunk_stream = ChunkStream::new(stream, None); + let mut chunk_stream = ChunkStream::new( + stream, + None, + Arc::new(Mutex::new(VecDeque::new())), + Arc::new(Mutex::new(VecDeque::new())), + ); let start_time = std::time::Instant::now(); diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index 8a03d8ea..e66b93df 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::future::Future; use std::os::unix::fs::OpenOptionsExt; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig; use proxmox_human_byte::HumanByte; +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; use super::{H2Client, HttpClient}; @@ -265,6 +266,7 @@ impl BackupWriter { archive_name: &str, stream: impl Stream>, options: UploadOptions, + injection_queue: Option>>>, ) -> Result { let known_chunks = Arc::new(Mutex::new(HashSet::new())); @@ -341,6 +343,7 @@ impl BackupWriter { None }, options.compress, + injection_queue, ) .await?; @@ -637,6 +640,7 @@ impl BackupWriter { known_chunks: Arc>>, crypt_config: Option>, compress: bool, + injection_queue: Option>>>, ) -> impl Future> { let total_chunks = Arc::new(AtomicUsize::new(0)); let total_chunks2 = total_chunks.clone(); @@ -663,48 +667,63 @@ impl BackupWriter { let index_csum_2 = index_csum.clone(); stream - .and_then(move |data| { - let chunk_len = data.len(); + .inject_reused_chunks( + injection_queue.unwrap_or_default(), + stream_len, + reused_len.clone(), + index_csum.clone(), + ) + .and_then(move |chunk_info| match chunk_info { + InjectedChunksInfo::Known(chunks) => { + total_chunks.fetch_add(chunks.len(), Ordering::SeqCst); + future::ok(MergedChunkInfo::Known(chunks)) + } + InjectedChunksInfo::Raw((offset, data)) => { + let chunk_len = data.len(); - total_chunks.fetch_add(1, Ordering::SeqCst); - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64; + total_chunks.fetch_add(1, Ordering::SeqCst); - let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress); + let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress); - if let Some(ref crypt_config) = crypt_config { - chunk_builder = chunk_builder.crypt_config(crypt_config); - } + if let Some(ref crypt_config) = crypt_config { + chunk_builder = chunk_builder.crypt_config(crypt_config); + } - let mut known_chunks = known_chunks.lock().unwrap(); - let digest = chunk_builder.digest(); + let mut known_chunks = known_chunks.lock().unwrap(); - let mut guard = index_csum.lock().unwrap(); - let csum = guard.as_mut().unwrap(); + let digest = chunk_builder.digest(); - let chunk_end = offset + chunk_len as u64; + let mut guard = index_csum.lock().unwrap(); + let csum = guard.as_mut().unwrap(); - if !is_fixed_chunk_size { - csum.update(&chunk_end.to_le_bytes()); - } - csum.update(digest); + let chunk_end = offset + chunk_len as u64; - let chunk_is_known = known_chunks.contains(digest); - if chunk_is_known { - known_chunk_count.fetch_add(1, Ordering::SeqCst); - reused_len.fetch_add(chunk_len, Ordering::SeqCst); - future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) - } else { - let compressed_stream_len2 = compressed_stream_len.clone(); - known_chunks.insert(*digest); - future::ready(chunk_builder.build().map(move |(chunk, digest)| { - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); - MergedChunkInfo::New(ChunkInfo { - chunk, - digest, - chunk_len: chunk_len as u64, - offset, - }) - })) + if !is_fixed_chunk_size { + csum.update(&chunk_end.to_le_bytes()); + } + csum.update(digest); + + let chunk_is_known = known_chunks.contains(digest); + if chunk_is_known { + known_chunk_count.fetch_add(1, Ordering::SeqCst); + reused_len.fetch_add(chunk_len, Ordering::SeqCst); + + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) + } else { + let compressed_stream_len2 = compressed_stream_len.clone(); + known_chunks.insert(*digest); + + future::ready(chunk_builder.build().map(move |(chunk, digest)| { + compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); + + MergedChunkInfo::New(ChunkInfo { + chunk, + digest, + chunk_len: chunk_len as u64, + offset, + }) + })) + } } }) .merge_known_chunks() diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 895f6eae..891d6928 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -1,4 +1,6 @@ +use std::collections::VecDeque; use std::pin::Pin; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use anyhow::Error; @@ -8,21 +10,34 @@ use futures::stream::{Stream, TryStream}; use pbs_datastore::Chunker; +use crate::inject_reused_chunks::InjectChunks; + /// Split input stream into dynamic sized chunks pub struct ChunkStream { input: S, chunker: Chunker, buffer: BytesMut, scan_pos: usize, + consumed: u64, + boundaries: Arc>>, + injections: Arc>>, } impl ChunkStream { - pub fn new(input: S, chunk_size: Option) -> Self { + pub fn new( + input: S, + chunk_size: Option, + boundaries: Arc>>, + injections: Arc>>, + ) -> Self { Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), buffer: BytesMut::new(), scan_pos: 0, + consumed: 0, + boundaries, + injections, } } } @@ -40,6 +55,29 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.get_mut(); loop { + { + // Make sure to release this lock as soon as possible + let mut boundaries = this.boundaries.lock().unwrap(); + if let Some(inject) = boundaries.pop_front() { + let max = this.consumed + this.buffer.len() as u64; + if inject.boundary <= max { + let chunk_size = (inject.boundary - this.consumed) as usize; + let result = this.buffer.split_to(chunk_size); + this.consumed += chunk_size as u64; + this.scan_pos = 0; + + // Add the size of the injected chunks to consumed, so chunk stream offsets + // are in sync with the rest of the archive. + this.consumed += inject.size as u64; + + this.injections.lock().unwrap().push_back(inject); + + return Poll::Ready(Some(Ok(result))); + } + boundaries.push_front(inject); + } + } + if this.scan_pos < this.buffer.len() { let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); @@ -50,7 +88,9 @@ where // continue poll } else if chunk_size <= this.buffer.len() { let result = this.buffer.split_to(chunk_size); + this.consumed += chunk_size as u64; this.scan_pos = 0; + return Poll::Ready(Some(Ok(result))); } else { panic!("got unexpected chunk boundary from chunker"); diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index 59aa4450..9ae84d37 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::ffi::{CStr, CString, OsStr}; use std::fmt; use std::io::{self, Read}; @@ -26,6 +26,7 @@ use proxmox_sys::fs::{self, acl, xattr}; use pbs_datastore::catalog::BackupCatalogWriter; +use crate::inject_reused_chunks::InjectChunks; use crate::pxar::metadata::errno_is_unsupported; use crate::pxar::tools::assert_single_path_component; use crate::pxar::Flags; @@ -131,6 +132,7 @@ struct Archiver { hardlinks: HashMap, file_copy_buffer: Vec, skip_e2big_xattr: bool, + forced_boundaries: Arc>>, } type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; @@ -143,6 +145,7 @@ pub async fn create_archive( catalog: Option>>, mut payload_writer: Option, options: PxarCreateOptions, + forced_boundaries: Arc>>, ) -> Result<(), Error> where T: SeqWrite + Send, @@ -201,6 +204,7 @@ where hardlinks: HashMap::new(), file_copy_buffer: vec::undefined(4 * 1024 * 1024), skip_e2big_xattr: options.skip_e2big_xattr, + forced_boundaries, }; archiver diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index c7be4a66..77017779 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::io::Write; //use std::os::unix::io::FromRawFd; use std::path::Path; @@ -17,6 +18,8 @@ use proxmox_io::StdChannelWriter; use pbs_datastore::catalog::CatalogWriter; +use crate::inject_reused_chunks::InjectChunks; + /// Stream implementation to encode and upload .pxar archives. /// /// The hyper client needs an async Stream for file upload, so we @@ -40,6 +43,7 @@ impl PxarBackupStream { dir: Dir, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, + boundaries: Arc>>, separate_payload_stream: bool, ) -> Result<(Self, Option), Error> { let buffer_size = 256 * 1024; @@ -76,6 +80,7 @@ impl PxarBackupStream { Some(catalog), payload_writer, options, + boundaries, ) .await { @@ -111,11 +116,12 @@ impl PxarBackupStream { dirname: &Path, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, + boundaries: Arc>>, separate_payload_stream: bool, ) -> Result<(Self, Option), Error> { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; - Self::new(dir, catalog, options, separate_payload_stream) + Self::new(dir, catalog, options, boundaries, separate_payload_stream) } } diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index c8ba67b4..290df4a1 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -197,14 +197,19 @@ async fn backup_directory>( bail!("cannot backup directory with fixed chunk size!"); } + let payload_boundaries = Arc::new(Mutex::new(VecDeque::new())); let (pxar_stream, payload_stream) = PxarBackupStream::open( dir_path.as_ref(), catalog, pxar_create_options, + payload_boundaries.clone(), payload_target.is_some(), )?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); + let dummy_injections = Arc::new(Mutex::new(VecDeque::new())); + let dummy_boundaries = Arc::new(Mutex::new(VecDeque::new())); + let mut chunk_stream = + ChunkStream::new(pxar_stream, chunk_size, dummy_boundaries, dummy_injections); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(rx).map_err(Error::from); @@ -216,7 +221,7 @@ async fn backup_directory>( } }); - let stats = client.upload_stream(archive_name, stream, upload_options.clone()); + let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None); if let Some(payload_stream) = payload_stream { let payload_target = if let Some(payload_target) = payload_target { @@ -225,9 +230,12 @@ async fn backup_directory>( bail!("got payload stream, but no target archive name"); }; + let payload_injections = Arc::new(Mutex::new(VecDeque::new())); let mut payload_chunk_stream = ChunkStream::new( payload_stream, chunk_size, + payload_boundaries, + payload_injections.clone(), ); let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks let stream = ReceiverStream::new(payload_rx).map_err(Error::from); @@ -243,6 +251,7 @@ async fn backup_directory>( &payload_target, stream, upload_options, + Some(payload_injections), ); match futures::join!(stats, payload_stats) { @@ -279,7 +288,7 @@ async fn backup_image>( } let stats = client - .upload_stream(archive_name, stream, upload_options) + .upload_stream(archive_name, stream, upload_options, None) .await?; Ok(stats) @@ -570,7 +579,14 @@ fn spawn_catalog_upload( let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx); let catalog_chunk_size = 512 * 1024; - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); + let boundaries = Arc::new(Mutex::new(VecDeque::new())); + let injections = Arc::new(Mutex::new(VecDeque::new())); + let catalog_chunk_stream = ChunkStream::new( + catalog_stream, + Some(catalog_chunk_size), + boundaries, + injections.clone(), + ); let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new( StdChannelWriter::new(catalog_tx), @@ -586,7 +602,7 @@ fn spawn_catalog_upload( tokio::spawn(async move { let catalog_upload_result = client - .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options) + .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None) .await; if let Err(ref err) = catalog_upload_result { diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs index bd8ddb20..d912734c 100644 --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs @@ -1,8 +1,10 @@ ///! File-restore API running inside the restore VM +use std::collections::VecDeque; use std::ffi::OsStr; use std::fs; use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{bail, Error}; use futures::FutureExt; @@ -364,6 +366,7 @@ fn extract( None, None, options, + Arc::new(Mutex::new(VecDeque::new())), ) .await } diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs index e3b0faac..74ee04f7 100644 --- a/pxar-bin/src/main.rs +++ b/pxar-bin/src/main.rs @@ -1,10 +1,10 @@ -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::ffi::OsStr; use std::fs::OpenOptions; use std::os::unix::fs::OpenOptionsExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; use futures::future::FutureExt; @@ -385,6 +385,7 @@ async fn create_archive( None, None, options, + Arc::new(Mutex::new(VecDeque::new())), ) .await?; diff --git a/tests/catar.rs b/tests/catar.rs index 04af4ffd..6edd747d 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -1,4 +1,6 @@ +use std::collections::VecDeque; use std::process::Command; +use std::sync::{Arc, Mutex}; use anyhow::Error; @@ -41,6 +43,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { None, None, options, + Arc::new(Mutex::new(VecDeque::new())), ))?; Command::new("cmp") -- 2.39.2