From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id AF9AC98A2E for ; Wed, 15 Nov 2023 16:48:59 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 8BD64970D for ; Wed, 15 Nov 2023 16:48:38 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 15 Nov 2023 16:48:35 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id A6C87432C3 for ; Wed, 15 Nov 2023 16:48:35 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 15 Nov 2023 16:48:03 +0100 Message-Id: <20231115154813.281564-19-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20231115154813.281564-1-c.ebner@proxmox.com> References: <20231115154813.281564-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.056 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH v5 proxmox-backup 18/28] fix #3174: chunker: add forced boundaries X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: 
list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 15 Nov 2023 15:48:59 -0000 Allow forcing a chunk boundary while chunking, and injecting reused chunks into the stream. Double-ended queues are used to control the boundaries and chunk injection between the archiver, chunker and uploader. The archiver gets an interface to request a boundary and push a list of chunks to inject following that boundary. The chunker reads this queue, creating the boundary and passing the list of chunks to inject to the uploader via a second, dedicated double-ended queue. Signed-off-by: Christian Ebner --- Changes since version 4: - no changes Changes since version 3: - no changes Changes since version 2: - no changes Changes since version 1: - no changes examples/test_chunk_speed2.rs | 9 +++- pbs-client/src/backup_writer.rs | 6 ++- pbs-client/src/chunk_stream.rs | 42 ++++++++++++++++++- pbs-client/src/pxar/create.rs | 10 ++++- pbs-client/src/pxar_backup_stream.rs | 8 +++- proxmox-backup-client/src/main.rs | 36 ++++++++++++---- .../src/proxmox_restore_daemon/api.rs | 13 +++++- pxar-bin/src/main.rs | 5 ++- tests/catar.rs | 3 ++ 9 files changed, 115 insertions(+), 17 deletions(-) diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs index 3f69b436..e8bac726 100644 --- a/examples/test_chunk_speed2.rs +++ b/examples/test_chunk_speed2.rs @@ -1,5 +1,7 @@ use anyhow::Error; use futures::*; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; extern crate proxmox_backup; @@ -26,7 +28,12 @@ async fn run() -> Result<(), Error> { .map_err(Error::from); //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); - let mut chunk_stream = ChunkStream::new(stream, None); + let mut chunk_stream = ChunkStream::new( + stream, + None, + Arc::new(Mutex::new(VecDeque::new())), + Arc::new(Mutex::new(VecDeque::new())), + ); let start_time = std::time::Instant::now(); diff --git 
a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index 8a03d8ea..cc6dd49a 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::future::Future; use std::os::unix::fs::OpenOptionsExt; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig; use proxmox_human_byte::HumanByte; +use super::inject_reused_chunks::InjectChunks; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; use super::{H2Client, HttpClient}; @@ -265,6 +266,7 @@ impl BackupWriter { archive_name: &str, stream: impl Stream>, options: UploadOptions, + injection_queue: Arc>>, ) -> Result { let known_chunks = Arc::new(Mutex::new(HashSet::new())); @@ -341,6 +343,7 @@ impl BackupWriter { None }, options.compress, + injection_queue, ) .await?; @@ -637,6 +640,7 @@ impl BackupWriter { known_chunks: Arc>>, crypt_config: Option>, compress: bool, + injection_queue: Arc>>, ) -> impl Future> { let total_chunks = Arc::new(AtomicUsize::new(0)); let total_chunks2 = total_chunks.clone(); diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 895f6eae..5cdf6916 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -1,5 +1,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; +use std::sync::{Arc, Mutex}; +use std::collections::VecDeque; use anyhow::Error; use bytes::BytesMut; @@ -8,21 +10,34 @@ use futures::stream::{Stream, TryStream}; use pbs_datastore::Chunker; +use crate::inject_reused_chunks::InjectChunks; + /// Split input stream into dynamic sized chunks pub struct ChunkStream { input: S, chunker: Chunker, buffer: BytesMut, scan_pos: usize, + consumed: u64, + boundaries: Arc>>, + injections: Arc>>, } impl ChunkStream { - pub fn new(input: S, chunk_size: Option) -> Self { + pub fn new( + input: S, + chunk_size: Option, + boundaries: 
Arc>>, + injections: Arc>>, + ) -> Self { Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), buffer: BytesMut::new(), scan_pos: 0, + consumed: 0, + boundaries, + injections, } } } @@ -40,6 +55,29 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.get_mut(); loop { + { + // Make sure to release this lock and don't hold it longer than required + let mut boundaries = this.boundaries.lock().unwrap(); + if let Some(inject) = boundaries.pop_front() { + let max = this.consumed + this.buffer.len() as u64; + if inject.boundary <= max { + let chunk_size = (inject.boundary - this.consumed) as usize; + let result = this.buffer.split_to(chunk_size); + this.consumed += chunk_size as u64; + this.scan_pos = 0; + + // Add the size of the injected chunks to consumed, so chunk stream offsets + // are in sync with the rest of the archive. + this.consumed += inject.size as u64; + + this.injections.lock().unwrap().push_back(inject); + + return Poll::Ready(Some(Ok(result))); + } + boundaries.push_front(inject); + } + } + if this.scan_pos < this.buffer.len() { let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); @@ -50,7 +88,9 @@ where // continue poll } else if chunk_size <= this.buffer.len() { let result = this.buffer.split_to(chunk_size); + this.consumed += chunk_size as u64; this.scan_pos = 0; + return Poll::Ready(Some(Ok(result))); } else { panic!("got unexpected chunk boundary from chunker"); diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs index 2b542972..c4d207d8 100644 --- a/pbs-client/src/pxar/create.rs +++ b/pbs-client/src/pxar/create.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::ffi::{CStr, CString, OsStr}; use std::fmt; use std::io::{self, Read}; @@ -25,8 +25,9 @@ use proxmox_lang::c_str; use proxmox_sys::fs::{self, acl, xattr}; use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader}; -use 
pbs_datastore::dynamic_index::DynamicIndexReader; +use pbs_datastore::dynamic_index::{DynamicEntry, DynamicIndexReader}; +use crate::inject_reused_chunks::InjectChunks; use crate::pxar::metadata::errno_is_unsupported; use crate::pxar::tools::assert_single_path_component; use crate::pxar::Flags; @@ -142,6 +143,8 @@ struct Archiver { hardlinks: HashMap, file_copy_buffer: Vec, previous_ref: Option, + forced_boundaries: Arc>>, + inject: (usize, Vec), } type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; @@ -153,6 +156,7 @@ pub async fn create_archive( callback: F, catalog: Option>>, options: PxarCreateOptions, + forced_boundaries: Arc>>, ) -> Result<(), Error> where T: SeqWrite + Send, @@ -211,6 +215,8 @@ where hardlinks: HashMap::new(), file_copy_buffer: vec::undefined(4 * 1024 * 1024), previous_ref: options.previous_ref, + forced_boundaries, + inject: (0, Vec::new()), }; archiver diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs index 22a6ffdc..d18ba470 100644 --- a/pbs-client/src/pxar_backup_stream.rs +++ b/pbs-client/src/pxar_backup_stream.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::io::Write; //use std::os::unix::io::FromRawFd; use std::path::Path; @@ -17,6 +18,8 @@ use proxmox_io::StdChannelWriter; use pbs_datastore::catalog::CatalogWriter; +use crate::inject_reused_chunks::InjectChunks; + /// Stream implementation to encode and upload .pxar archives. 
/// /// The hyper client needs an async Stream for file upload, so we @@ -40,6 +43,7 @@ impl PxarBackupStream { dir: Dir, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, + boundaries: Arc>>, ) -> Result { let (tx, rx) = std::sync::mpsc::sync_channel(10); @@ -64,6 +68,7 @@ impl PxarBackupStream { }, Some(catalog), options, + boundaries, ) .await { @@ -87,10 +92,11 @@ impl PxarBackupStream { dirname: &Path, catalog: Arc>>, options: crate::pxar::PxarCreateOptions, + boundaries: Arc>>, ) -> Result { let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; - Self::new(dir, catalog, options) + Self::new(dir, catalog, options, boundaries) } } diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs index 509fa22c..5945ae5d 100644 --- a/proxmox-backup-client/src/main.rs +++ b/proxmox-backup-client/src/main.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use std::pin::Pin; @@ -192,8 +192,17 @@ async fn backup_directory>( pxar_create_options: pbs_client::pxar::PxarCreateOptions, upload_options: UploadOptions, ) -> Result { - let pxar_stream = PxarBackupStream::open(dir_path.as_ref(), catalog, pxar_create_options)?; - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); + let boundaries = Arc::new(Mutex::new(VecDeque::new())); + let pxar_stream = PxarBackupStream::open( + dir_path.as_ref(), + catalog, + pxar_create_options, + boundaries.clone(), + )?; + + let injections = Arc::new(Mutex::new(VecDeque::new())); + let mut chunk_stream = + ChunkStream::new(pxar_stream, chunk_size, boundaries, injections.clone()); let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks @@ -211,7 +220,7 @@ async fn backup_directory>( } let stats = client - .upload_stream(archive_name, stream, upload_options) + .upload_stream(archive_name, stream, upload_options, injections) .await?; Ok(stats) @@ 
-237,8 +246,9 @@ async fn backup_image>( bail!("cannot backup image with dynamic chunk size!"); } + let injection_queue = Arc::new(Mutex::new(VecDeque::new())); let stats = client - .upload_stream(archive_name, stream, upload_options) + .upload_stream(archive_name, stream, upload_options, injection_queue) .await?; Ok(stats) @@ -529,7 +539,14 @@ fn spawn_catalog_upload( let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx); let catalog_chunk_size = 512 * 1024; - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); + let boundaries = Arc::new(Mutex::new(VecDeque::new())); + let injections = Arc::new(Mutex::new(VecDeque::new())); + let catalog_chunk_stream = ChunkStream::new( + catalog_stream, + Some(catalog_chunk_size), + boundaries, + injections.clone(), + ); let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new( StdChannelWriter::new(catalog_tx), @@ -545,7 +562,12 @@ fn spawn_catalog_upload( tokio::spawn(async move { let catalog_upload_result = client - .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options) + .upload_stream( + CATALOG_NAME, + catalog_chunk_stream, + upload_options, + injections, + ) .await; if let Err(ref err) = catalog_upload_result { diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs index f89b0ab4..5eff673e 100644 --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs @@ -1,8 +1,10 @@ ///! 
File-restore API running inside the restore VM +use std::collections::VecDeque; use std::ffi::OsStr; use std::fs; use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use anyhow::{bail, Error}; use futures::FutureExt; @@ -360,8 +362,15 @@ fn extract( }; let pxar_writer = TokioWriter::new(writer); - create_archive(dir, pxar_writer, Flags::DEFAULT, |_| Ok(()), None, options) - .await + create_archive( + dir, + pxar_writer, + Flags::DEFAULT, + |_| Ok(()), + None, + options, + Arc::new(Mutex::new(VecDeque::new())), + ).await } .await; if let Err(err) = result { diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs index 9376a2c1..c019f3e4 100644 --- a/pxar-bin/src/main.rs +++ b/pxar-bin/src/main.rs @@ -1,10 +1,10 @@ -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use std::ffi::OsStr; use std::fs::OpenOptions; use std::os::unix::fs::OpenOptionsExt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; use futures::future::FutureExt; @@ -384,6 +384,7 @@ async fn create_archive( }, None, options, + Arc::new(Mutex::new(VecDeque::new())), ) .await?; diff --git a/tests/catar.rs b/tests/catar.rs index 36bb4f3b..d69cb37b 100644 --- a/tests/catar.rs +++ b/tests/catar.rs @@ -1,4 +1,6 @@ use std::process::Command; +use std::sync::{Arc, Mutex}; +use std::collections::VecDeque; use anyhow::Error; @@ -40,6 +42,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { |_| Ok(()), None, options, + Arc::new(Mutex::new(VecDeque::new())), ))?; Command::new("cmp") -- 2.39.2