From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue, 7 May 2024 17:52:23 +0200
Message-Id: <20240507155244.793819-42-c.ebner@proxmox.com>
In-Reply-To: <20240507155244.793819-1-c.ebner@proxmox.com>
References: <20240507155244.793819-1-c.ebner@proxmox.com>
Subject: [pbs-devel] [PATCH v5 proxmox-backup 41/62] client: streams: add
 channels for dynamic entry injection

To reuse dynamic entries of a previous backup run and index them for
the new snapshot, add a non-blocking channel between the pxar archiver
and the chunk stream, as well as between the chunk stream and the
backup writer.

The archiver sends forced boundary positions and the dynamic entries to
inject into the chunk stream following those boundaries. The chunk
stream consumes the inputs of this channel whenever a new chunk is
requested by the upload stream, forcing a non-regular chunk boundary in
the pxar stream at the requested positions. The dynamic entries and the
boundary are then sent via the second, asynchronous channel to the
backup writer's upload stream, which indexes them by inserting the
dynamic entries as known chunks.
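As an aside, the two-channel wiring described above can be sketched with
plain std::sync::mpsc channels. This is a minimal, self-contained
illustration only: the InjectChunks stand-in below is simplified (the
real type, defined in pbs-client's inject_reused_chunks module, carries
dynamic index entries rather than bare digests), and all offsets and
sizes are made-up placeholder values.

    use std::sync::mpsc;

    // Simplified stand-in for pbs-client's `InjectChunks` payload: a forced
    // boundary offset plus the reused dynamic index entries to insert there.
    struct InjectChunks {
        boundary: u64,          // offset at which to force the chunk boundary
        chunks: Vec<[u8; 32]>,  // digests of the reused chunks (placeholder)
        size: usize,            // summed size of the injected entries
    }

    fn main() {
        // Channel 1: archiver -> chunk stream (non-blocking, polled via try_recv()).
        let (boundaries_tx, boundaries_rx) = mpsc::channel::<InjectChunks>();
        // Channel 2: chunk stream -> backup writer's upload stream.
        let (injections_tx, injections_rx) = mpsc::channel::<InjectChunks>();

        // Archiver side: request a forced boundary with one reused entry.
        boundaries_tx
            .send(InjectChunks { boundary: 4096, chunks: vec![[0u8; 32]], size: 1024 })
            .unwrap();

        // Chunk stream side: when the upload stream requests the next chunk,
        // pop a pending boundary, split the buffer there, forward the entries.
        while let Ok(inject) = boundaries_rx.try_recv() {
            // ...force a non-regular chunk boundary at `inject.boundary`...
            injections_tx.send(inject).unwrap();
        }

        // Upload stream side: index the forwarded entries as known chunks.
        while let Ok(inject) = injections_rx.try_recv() {
            println!(
                "injecting {} known chunk(s), {} bytes, at offset {}",
                inject.chunks.len(),
                inject.size,
                inject.boundary
            );
        }
    }

In the patch itself, the boundaries receiver is owned by InjectionData
inside the chunk stream and the injections receiver is handed to
upload_stream; try_recv() ensures an empty queue never blocks the
chunker.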
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 4:
- no changes

 examples/test_chunk_speed2.rs                 |   2 +-
 pbs-client/src/backup_writer.rs               | 110 ++++++++++++------
 pbs-client/src/chunk_stream.rs                |  79 ++++++++++++-
 pbs-client/src/pxar/create.rs                 |   6 +-
 pbs-client/src/pxar_backup_stream.rs          |   8 +-
 proxmox-backup-client/src/main.rs             |  28 +++--
 .../src/proxmox_restore_daemon/api.rs         |   1 +
 pxar-bin/src/main.rs                          |   1 +
 tests/catar.rs                                |   1 +
 9 files changed, 181 insertions(+), 55 deletions(-)

diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
index 3f69b436d..22dd14ce2 100644
--- a/examples/test_chunk_speed2.rs
+++ b/examples/test_chunk_speed2.rs
@@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
         .map_err(Error::from);
 
     //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
-    let mut chunk_stream = ChunkStream::new(stream, None);
+    let mut chunk_stream = ChunkStream::new(stream, None, None);
 
     let start_time = std::time::Instant::now();
 
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index dc9aa569f..66f209fed 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
 
 use proxmox_human_byte::HumanByte;
 
+use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
 use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
 use super::{H2Client, HttpClient};
 
@@ -265,6 +266,7 @@ impl BackupWriter {
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         options: UploadOptions,
+        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
     ) -> Result<BackupStats, Error> {
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
 
@@ -341,6 +343,7 @@ impl BackupWriter {
                 None
             },
             options.compress,
+            injections,
         )
         .await?;
 
@@ -636,6 +639,7 @@ impl BackupWriter {
        known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
        crypt_config: Option<Arc<CryptConfig>>,
        compress: bool,
+       injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
    ) -> impl Future<Output = Result<UploadStats, Error>> {
        let total_chunks = Arc::new(AtomicUsize::new(0));
        let total_chunks2 = total_chunks.clone();
@@ -643,10 +647,12 @@ impl BackupWriter {
        let known_chunk_count2 = known_chunk_count.clone();
 
        let stream_len = Arc::new(AtomicUsize::new(0));
+       let stream_len1 = stream_len.clone();
        let stream_len2 = stream_len.clone();
        let compressed_stream_len = Arc::new(AtomicU64::new(0));
        let compressed_stream_len2 = compressed_stream_len.clone();
        let reused_len = Arc::new(AtomicUsize::new(0));
+       let reused_len1 = reused_len.clone();
        let reused_len2 = reused_len.clone();
 
        let append_chunk_path = format!("{}_index", prefix);
@@ -658,52 +664,79 @@ impl BackupWriter {
        let start_time = std::time::Instant::now();
 
-       let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
+       let index_csum = Arc::new(Mutex::new(openssl::sha::Sha256::new()));
+       let index_csum_1 = index_csum.clone();
        let index_csum_2 = index_csum.clone();
 
        stream
-           .and_then(move |data| {
-               let chunk_len = data.len();
+           .inject_reused_chunks(injections, stream_len)
+           .and_then(move |chunk_info| match chunk_info {
+               InjectedChunksInfo::Known(chunks) => {
+                   // account for injected chunks
+                   let count = chunks.len();
+                   total_chunks.fetch_add(count, Ordering::SeqCst);
+
+                   let mut known = Vec::new();
+                   let mut csum = index_csum_1.lock().unwrap();
+                   for chunk in chunks {
+                       let offset =
+                           stream_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
+                       reused_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst);
+                       let digest = chunk.digest();
+                       known.push((offset, digest));
+                       let end_offset = offset + chunk.size();
+                       csum.update(&end_offset.to_le_bytes());
+                       csum.update(&digest);
+                   }
+
+                   future::ok(MergedChunkInfo::Known(known))
+               }
+               InjectedChunksInfo::Raw(raw) => {
+                   // account for not injected chunks (new and known)
+                   let offset = stream_len1.fetch_add(raw.len(), Ordering::SeqCst) as u64;
+                   let chunk_len = raw.len() as u64;
 
-               total_chunks.fetch_add(1, Ordering::SeqCst);
-               let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
+                   total_chunks.fetch_add(1, Ordering::SeqCst);
 
-               let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
+                   let mut chunk_builder = DataChunkBuilder::new(raw.as_ref()).compress(compress);
 
-               if let Some(ref crypt_config) = crypt_config {
-                   chunk_builder = chunk_builder.crypt_config(crypt_config);
-               }
+                   if let Some(ref crypt_config) = crypt_config {
+                       chunk_builder = chunk_builder.crypt_config(crypt_config);
+                   }
 
-               let mut known_chunks = known_chunks.lock().unwrap();
-               let digest = chunk_builder.digest();
+                   let mut known_chunks = known_chunks.lock().unwrap();
 
-               let mut guard = index_csum.lock().unwrap();
-               let csum = guard.as_mut().unwrap();
+                   let digest = chunk_builder.digest();
 
-               let chunk_end = offset + chunk_len as u64;
+                   let mut csum = index_csum.lock().unwrap();
 
-               if !is_fixed_chunk_size {
-                   csum.update(&chunk_end.to_le_bytes());
-               }
-               csum.update(digest);
+                   let chunk_end = offset + chunk_len;
 
-               let chunk_is_known = known_chunks.contains(digest);
-               if chunk_is_known {
-                   known_chunk_count.fetch_add(1, Ordering::SeqCst);
-                   reused_len.fetch_add(chunk_len, Ordering::SeqCst);
-                   future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
-               } else {
-                   let compressed_stream_len2 = compressed_stream_len.clone();
-                   known_chunks.insert(*digest);
-                   future::ready(chunk_builder.build().map(move |(chunk, digest)| {
-                       compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
-                       MergedChunkInfo::New(ChunkInfo {
-                           chunk,
-                           digest,
-                           chunk_len: chunk_len as u64,
-                           offset,
-                       })
-                   }))
+                   if !is_fixed_chunk_size {
+                       csum.update(&chunk_end.to_le_bytes());
+                   }
+                   csum.update(digest);
+
+                   let chunk_is_known = known_chunks.contains(digest);
+                   if chunk_is_known {
+                       known_chunk_count.fetch_add(1, Ordering::SeqCst);
+                       reused_len.fetch_add(chunk_len as usize, Ordering::SeqCst);
+
+                       future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
+                   } else {
+                       let compressed_stream_len2 = compressed_stream_len.clone();
+                       known_chunks.insert(*digest);
+
+                       future::ready(chunk_builder.build().map(move |(chunk, digest)| {
+                           compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
+
+                           MergedChunkInfo::New(ChunkInfo {
+                               chunk,
+                               digest,
+                               chunk_len,
+                               offset,
+                           })
+                       }))
+                   }
                }
            })
            .merge_known_chunks()
@@ -771,8 +804,11 @@ impl BackupWriter {
            let size_reused = reused_len2.load(Ordering::SeqCst);
            let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
 
-           let mut guard = index_csum_2.lock().unwrap();
-           let csum = guard.take().unwrap().finish();
+           let csum = Arc::into_inner(index_csum_2)
+               .unwrap()
+               .into_inner()
+               .unwrap()
+               .finish();
 
            futures::future::ok(UploadStats {
                chunk_count,
diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
index 83c75ba28..728c0a88d 100644
--- a/pbs-client/src/chunk_stream.rs
+++ b/pbs-client/src/chunk_stream.rs
@@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks;
 /// Holds the queues for optional injection of reused dynamic index entries
 pub struct InjectionData {
     boundaries: mpsc::Receiver<InjectChunks>,
+    next_boundary: Option<InjectChunks>,
     injections: mpsc::Sender<InjectChunks>,
     consumed: u64,
 }
 
@@ -25,6 +26,7 @@ impl InjectionData {
     ) -> Self {
         Self {
             boundaries,
+            next_boundary: None,
             injections,
             consumed: 0,
         }
     }
 }
 
@@ -37,15 +39,17 @@ pub struct ChunkStream<S: Unpin> {
     chunker: Chunker,
     buffer: BytesMut,
     scan_pos: usize,
+    injection_data: Option<InjectionData>,
 }
 
 impl<S: Unpin> ChunkStream<S> {
-    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
+    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
         Self {
             input,
             chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
             buffer: BytesMut::new(),
             scan_pos: 0,
+            injection_data,
         }
     }
 }
 
@@ -62,19 +66,82 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
+
         loop {
+            if let Some(InjectionData {
+                boundaries,
+                next_boundary,
+                injections,
+                consumed,
+            }) = this.injection_data.as_mut()
+            {
+                if next_boundary.is_none() {
+                    if let Ok(boundary) = boundaries.try_recv() {
+                        *next_boundary = Some(boundary);
+                    }
+                }
+
+                if let Some(inject) = next_boundary.take() {
+                    // require forced boundary, lookup next regular boundary
+                    let pos = this.chunker.scan(&this.buffer[this.scan_pos..]);
+
+                    let chunk_boundary = if pos == 0 {
+                        *consumed + this.buffer.len() as u64
+                    } else {
+                        *consumed + (this.scan_pos + pos) as u64
+                    };
+
+                    if inject.boundary <= chunk_boundary {
+                        // forced boundary is before next boundary, force within current buffer
+                        let chunk_size = (inject.boundary - *consumed) as usize;
+                        let raw_chunk = this.buffer.split_to(chunk_size);
+                        *consumed += chunk_size as u64;
+                        this.scan_pos = 0;
+
+                        // add the size of the injected chunks to consumed, so chunk stream offsets
+                        // are in sync with the rest of the archive.
+                        *consumed += inject.size as u64;
+
+                        injections.send(inject).unwrap();
+
+                        // the chunk can be empty, return nevertheless to allow the caller to
+                        // make progress by consuming from the injection queue
+                        return Poll::Ready(Some(Ok(raw_chunk)));
+                    } else if pos != 0 {
+                        *next_boundary = Some(inject);
+                        // forced boundary is after next boundary, split off chunk from buffer
+                        let chunk_size = this.scan_pos + pos;
+                        let raw_chunk = this.buffer.split_to(chunk_size);
+                        *consumed += chunk_size as u64;
+                        this.scan_pos = 0;
+
+                        return Poll::Ready(Some(Ok(raw_chunk)));
+                    } else {
+                        // forced boundary is after current buffer length, continue reading
+                        *next_boundary = Some(inject);
+                        this.scan_pos = this.buffer.len();
+                    }
+                }
+            }
+
             if this.scan_pos < this.buffer.len() {
+                // look for next chunk boundary, starting from scan_pos
                 let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
 
                 let chunk_size = this.scan_pos + boundary;
 
                 if boundary == 0 {
+                    // no new chunk boundary, update position for next boundary lookup
                     this.scan_pos = this.buffer.len();
-                    // continue poll
                 } else if chunk_size <= this.buffer.len() {
-                    let result = this.buffer.split_to(chunk_size);
+                    // found new chunk boundary inside buffer, split off chunk from buffer
+                    let raw_chunk = this.buffer.split_to(chunk_size);
+                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
+                        *consumed += chunk_size as u64;
+                    }
                     this.scan_pos = 0;
-                    return Poll::Ready(Some(Ok(result)));
+
+                    return Poll::Ready(Some(Ok(raw_chunk)));
                 } else {
                     panic!("got unexpected chunk boundary from chunker");
                 }
 
@@ -82,10 +149,11 @@ where
             match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
                 Some(Err(err)) => {
+                    // got error in byte stream, pass to consumer
                     return Poll::Ready(Some(Err(err.into())));
                 }
                 None => {
-                    this.scan_pos = 0;
+                    // end of stream reached, flush remaining bytes in buffer
                     if !this.buffer.is_empty() {
                         return Poll::Ready(Some(Ok(this.buffer.split())));
                     } else {
@@ -93,6 +161,7 @@ where
                     }
                 }
                 Some(Ok(data)) => {
+                    // got new data, add to buffer
                     this.buffer.extend_from_slice(data.as_ref());
                 }
             }
diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
index 0f32efcce..dd3c64525 100644
--- a/pbs-client/src/pxar/create.rs
+++ b/pbs-client/src/pxar/create.rs
@@ -6,7 +6,7 @@ use std::ops::Range;
 use std::os::unix::ffi::OsStrExt;
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 
 use anyhow::{bail, Context, Error};
 use futures::future::BoxFuture;
@@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::index::IndexFile;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::metadata::errno_is_unsupported;
 use crate::pxar::tools::assert_single_path_component;
 use crate::pxar::Flags;
@@ -134,6 +135,7 @@ struct Archiver {
     hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
     file_copy_buffer: Vec<u8>,
     skip_e2big_xattr: bool,
+    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
 }
 
 type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
 
@@ -164,6 +166,7 @@ pub async fn create_archive<T, F>(
     feature_flags: Flags,
     callback: F,
     options: PxarCreateOptions,
+    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
 ) -> Result<(), Error>
 where
     T: SeqWrite + Send,
@@ -224,6 +227,7 @@ where
         hardlinks: HashMap::new(),
         file_copy_buffer: vec::undefined(4 * 1024 * 1024),
         skip_e2big_xattr: options.skip_e2big_xattr,
+        forced_boundaries,
     };
 
     archiver
diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
index 95145cb0d..9d2cb41d6 100644
--- a/pbs-client/src/pxar_backup_stream.rs
+++ b/pbs-client/src/pxar_backup_stream.rs
@@ -2,7 +2,7 @@ use std::io::Write;
 //use std::os::unix::io::FromRawFd;
 use std::path::Path;
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::{mpsc, Arc, Mutex};
 use std::task::{Context, Poll};
 
 use anyhow::{format_err, Error};
@@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter;
 
 use pbs_datastore::catalog::CatalogWriter;
 
+use crate::inject_reused_chunks::InjectChunks;
 use crate::pxar::create::PxarWriters;
 
 /// Stream implementation to encode and upload .pxar archives.
@@ -42,6 +43,7 @@ impl PxarBackupStream {
         dir: Dir,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<mpsc::Sender<InjectChunks>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let buffer_size = 256 * 1024;
 
@@ -79,6 +81,7 @@ impl PxarBackupStream {
                     Ok(())
                 },
                 options,
+                boundaries,
             )
             .await
             {
@@ -110,11 +113,12 @@ impl PxarBackupStream {
         dirname: &Path,
         catalog: Arc<Mutex<CatalogWriter<W>>>,
         options: crate::pxar::PxarCreateOptions,
+        boundaries: Option<mpsc::Sender<InjectChunks>>,
         separate_payload_stream: bool,
     ) -> Result<(Self, Option<Self>), Error> {
         let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
 
-        Self::new(dir, catalog, options, separate_payload_stream)
+        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
     }
 }
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 821777d66..5e93f9542 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -45,8 +45,8 @@ use pbs_client::tools::{
 use pbs_client::{
     delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
     BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
-    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
-    BACKUP_SOURCE_SCHEMA,
+    FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
+    UploadOptions, BACKUP_SOURCE_SCHEMA,
 };
 use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
 use pbs_datastore::chunk_store::verify_chunk_size;
@@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
         bail!("cannot backup directory with fixed chunk size!");
     }
 
+    let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel();
     let (pxar_stream, payload_stream) = PxarBackupStream::open(
         dir_path.as_ref(),
         catalog,
         pxar_create_options,
+        Some(payload_boundaries_tx),
         payload_target.is_some(),
     )?;
 
-    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
+    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
     let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
 
     let stream = ReceiverStream::new(rx).map_err(Error::from);
@@ -218,13 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
         }
     });
 
-    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
+    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
 
     if let Some(payload_stream) = payload_stream {
         let payload_target = payload_target
             .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
 
-        let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
+        let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
+        let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
+        let mut payload_chunk_stream =
+            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
         let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
         let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
 
@@ -235,7 +240,12 @@ async fn backup_directory<P: AsRef<Path>>(
             }
         });
 
-        let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
+        let payload_stats = client.upload_stream(
+            &payload_target,
+            stream,
+            upload_options,
+            Some(payload_injections_rx),
+        );
 
         match futures::join!(stats, payload_stats) {
             (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
@@ -271,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
     }
 
     let stats = client
-        .upload_stream(archive_name, stream, upload_options)
+        .upload_stream(archive_name, stream, upload_options, None)
         .await?;
 
     Ok(stats)
@@ -562,7 +572,7 @@ fn spawn_catalog_upload(
     let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
     let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
     let catalog_chunk_size = 512 * 1024;
-    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
+    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
 
     let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
         StdChannelWriter::new(catalog_tx),
@@ -578,7 +588,7 @@ fn spawn_catalog_upload(
     tokio::spawn(async move {
         let catalog_upload_result = client
-            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
+            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
             .await;
 
         if let Err(ref err) = catalog_upload_result {
diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
index ea97976e6..0883d6cda 100644
--- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
+++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
@@ -364,6 +364,7 @@ fn extract(
             Flags::DEFAULT,
             |_| Ok(()),
             options,
+            None,
         )
         .await
     }
diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
index 58c9d2cfd..d46c98d2b 100644
--- a/pxar-bin/src/main.rs
+++ b/pxar-bin/src/main.rs
@@ -405,6 +405,7 @@ async fn create_archive(
             Ok(())
         },
         options,
+        None,
     )
     .await?;
diff --git a/tests/catar.rs b/tests/catar.rs
index 9e96a8610..d5ef85ffe 100644
--- a/tests/catar.rs
+++ b/tests/catar.rs
@@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
         Flags::DEFAULT,
         |_| Ok(()),
         options,
+        None,
     ))?;
 
     Command::new("cmp")
-- 
2.39.2


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel