From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 7F7F91FF396 for ; Wed, 22 May 2024 11:56:40 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CE6B39C45; Wed, 22 May 2024 11:56:57 +0200 (CEST) Message-ID: <7a285e31-7483-4840-8b50-f5655fc97a64@proxmox.com> Date: Wed, 22 May 2024 11:56:18 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird Beta To: Proxmox Backup Server development discussion , Christian Ebner References: <20240514103421.289431-1-c.ebner@proxmox.com> <20240514103421.289431-43-c.ebner@proxmox.com> Content-Language: en-US From: Dominik Csapak In-Reply-To: <20240514103421.289431-43-c.ebner@proxmox.com> X-SPAM-LEVEL: Spam detection results: 0 AWL 0.016 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[main.rs, create.rs, api.rs, catar.rs] Subject: Re: [pbs-devel] [PATCH v6 proxmox-backup 42/65] client: streams: add channels for dynamic entry injection X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" high level: i think this patch would be much cleaner if the 57/65 datastore:chunker: add Chunker trait would come before this, because you change stuff around only to change it again in a way that could be written better if those patches were reversed, e.g. in case there was a next_boundary in the ChunkStream, in the final code there is the same code from the regular chunking duplicated (this.buffer.split_to;this.consumed +=;this.scan_pos) and could be trivially handled by the same code part (with a bit of reordering) but the patches in this order make it hard to see since here the paths are different enough so they can't be shared some other comments inline: On 5/14/24 12:33, Christian Ebner wrote: > To reuse dynamic entries of a previous backup run and index them for > the new snapshot. Adds a non-blocking channel between the pxar > archiver and the chunk stream, as well as the chunk stream and the > backup writer. > > The archiver sends forced boundary positions and the dynamic > entries to inject into the chunk stream following this boundary. > > The chunk stream consumes this channel inputs as receiver whenever a > new chunk is requested by the upload stream, forcing a non-regular > chunk boundary in the pxar stream at the requested positions. 
> > The dynamic entries to inject and the boundary are then send via the > second asynchronous channel to the backup writer's upload stream, > indexing them by inserting the dynamic entries as known chunks into > the upload stream. > > Signed-off-by: Christian Ebner > --- > examples/test_chunk_speed2.rs | 2 +- > pbs-client/src/backup_writer.rs | 110 ++++++++++++------ > pbs-client/src/chunk_stream.rs | 81 ++++++++++++- > pbs-client/src/pxar/create.rs | 6 +- > pbs-client/src/pxar_backup_stream.rs | 8 +- > proxmox-backup-client/src/main.rs | 28 +++-- > .../src/proxmox_restore_daemon/api.rs | 1 + > pxar-bin/src/main.rs | 1 + > tests/catar.rs | 1 + > 9 files changed, 183 insertions(+), 55 deletions(-) > > diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs > index 3f69b436d..22dd14ce2 100644 > --- a/examples/test_chunk_speed2.rs > +++ b/examples/test_chunk_speed2.rs > @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> { > .map_err(Error::from); > > //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024); > - let mut chunk_stream = ChunkStream::new(stream, None); > + let mut chunk_stream = ChunkStream::new(stream, None, None); > > let start_time = std::time::Instant::now(); > > diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs > index dc9aa569f..66f209fed 100644 > --- a/pbs-client/src/backup_writer.rs > +++ b/pbs-client/src/backup_writer.rs > @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig; > > use proxmox_human_byte::HumanByte; > > +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; > use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; > > use super::{H2Client, HttpClient}; > @@ -265,6 +266,7 @@ impl BackupWriter { > archive_name: &str, > stream: impl Stream>, > options: UploadOptions, > + injections: Option>, > ) -> Result { > let known_chunks = Arc::new(Mutex::new(HashSet::new())); > > @@ -341,6 +343,7 @@ impl BackupWriter { > None > }, > 
options.compress, > + injections, > ) > .await?; > > @@ -636,6 +639,7 @@ impl BackupWriter { > known_chunks: Arc>>, > crypt_config: Option>, > compress: bool, > + injections: Option>, > ) -> impl Future> { > let total_chunks = Arc::new(AtomicUsize::new(0)); > let total_chunks2 = total_chunks.clone(); > @@ -643,10 +647,12 @@ impl BackupWriter { > let known_chunk_count2 = known_chunk_count.clone(); > > let stream_len = Arc::new(AtomicUsize::new(0)); > + let stream_len1 = stream_len.clone(); > let stream_len2 = stream_len.clone(); > let compressed_stream_len = Arc::new(AtomicU64::new(0)); > let compressed_stream_len2 = compressed_stream_len.clone(); > let reused_len = Arc::new(AtomicUsize::new(0)); > + let reused_len1 = reused_len.clone(); both of these variables are not really necessary? in the closure you could simply keep using 'stream_len' and in the .inject_reused_chunks call you can clone it inline the reused_len1 is used in the same closure as reused_len so there you can simply use that instead for both updates > let reused_len2 = reused_len.clone(); > > let append_chunk_path = format!("{}_index", prefix); > @@ -658,52 +664,79 @@ impl BackupWriter { > > let start_time = std::time::Instant::now(); > > - let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new()))); > + let index_csum = Arc::new(Mutex::new(openssl::sha::Sha256::new())); why do you change this? you can simply use the let mut guard = ..; let csum = ..; pattern everywhere you need? 
> + let index_csum_1 = index_csum.clone(); > let index_csum_2 = index_csum.clone(); > > stream > - .and_then(move |data| { > - let chunk_len = data.len(); > + .inject_reused_chunks(injections, stream_len) > + .and_then(move |chunk_info| match chunk_info { > + InjectedChunksInfo::Known(chunks) => { > + // account for injected chunks > + let count = chunks.len(); > + total_chunks.fetch_add(count, Ordering::SeqCst); > + > + let mut known = Vec::new(); > + let mut csum = index_csum_1.lock().unwrap(); > + for chunk in chunks { > + let offset = > + stream_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64; > + reused_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst); > + let digest = chunk.digest(); > + known.push((offset, digest)); > + let end_offset = offset + chunk.size(); > + csum.update(&end_offset.to_le_bytes()); > + csum.update(&digest); > + } > + future::ok(MergedChunkInfo::Known(known)) > + } > + InjectedChunksInfo::Raw(raw) => { if you rename 'raw' to data and... > + // account for not injected chunks (new and known) > + let offset = stream_len1.fetch_add(raw.len(), Ordering::SeqCst) as u64; > + let chunk_len = raw.len() as u64; leave chunk_len as usize and.. 
> > - total_chunks.fetch_add(1, Ordering::SeqCst); > - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64; > + total_chunks.fetch_add(1, Ordering::SeqCst); reorder these statements a bit and don't add some unnecessary newlines this hunk gets much more readable with 'git diff -w' namely it completely disappears :P since all you do here is basically indenting the code one level but that was really hard to see with the (IMHO not necessary) changes to the code > > - let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress); > + let mut chunk_builder = DataChunkBuilder::new(raw.as_ref()).compress(compress); > > - if let Some(ref crypt_config) = crypt_config { > - chunk_builder = chunk_builder.crypt_config(crypt_config); > - } > + if let Some(ref crypt_config) = crypt_config { > + chunk_builder = chunk_builder.crypt_config(crypt_config); > + } > > - let mut known_chunks = known_chunks.lock().unwrap(); > - let digest = chunk_builder.digest(); > + let mut known_chunks = known_chunks.lock().unwrap(); > > - let mut guard = index_csum.lock().unwrap(); > - let csum = guard.as_mut().unwrap(); > + let digest = chunk_builder.digest(); > > - let chunk_end = offset + chunk_len as u64; > + let mut csum = index_csum.lock().unwrap(); > > - if !is_fixed_chunk_size { > - csum.update(&chunk_end.to_le_bytes()); > - } > - csum.update(digest); > + let chunk_end = offset + chunk_len; > > - let chunk_is_known = known_chunks.contains(digest); > - if chunk_is_known { > - known_chunk_count.fetch_add(1, Ordering::SeqCst); > - reused_len.fetch_add(chunk_len, Ordering::SeqCst); > - future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) > - } else { > - let compressed_stream_len2 = compressed_stream_len.clone(); > - known_chunks.insert(*digest); > - future::ready(chunk_builder.build().map(move |(chunk, digest)| { > - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); > - MergedChunkInfo::New(ChunkInfo { > - chunk, > - digest, > - 
chunk_len: chunk_len as u64, > - offset, > - }) > - })) > + if !is_fixed_chunk_size { > + csum.update(&chunk_end.to_le_bytes()); > + } > + csum.update(digest); > + > + let chunk_is_known = known_chunks.contains(digest); > + if chunk_is_known { > + known_chunk_count.fetch_add(1, Ordering::SeqCst); > + reused_len.fetch_add(chunk_len as usize, Ordering::SeqCst); > + > + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) > + } else { > + let compressed_stream_len2 = compressed_stream_len.clone(); > + known_chunks.insert(*digest); > + > + future::ready(chunk_builder.build().map(move |(chunk, digest)| { > + compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); > + > + MergedChunkInfo::New(ChunkInfo { > + chunk, > + digest, > + chunk_len, > + offset, > + }) > + })) > + } > } > }) > .merge_known_chunks() > @@ -771,8 +804,11 @@ impl BackupWriter { > let size_reused = reused_len2.load(Ordering::SeqCst); > let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize; > > - let mut guard = index_csum_2.lock().unwrap(); > - let csum = guard.take().unwrap().finish(); > + let csum = Arc::into_inner(index_csum_2) > + .unwrap() > + .into_inner() > + .unwrap() > + .finish(); > if you don't change the type above, this hunk also disappears > futures::future::ok(UploadStats { > chunk_count, > diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs > index 83c75ba28..7dade3f07 100644 > --- a/pbs-client/src/chunk_stream.rs > +++ b/pbs-client/src/chunk_stream.rs > @@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks; > /// Holds the queues for optional injection of reused dynamic index entries > pub struct InjectionData { > boundaries: mpsc::Receiver, > + next_boundary: Option, > injections: mpsc::Sender, > consumed: u64, > } > @@ -25,6 +26,7 @@ impl InjectionData { > ) -> Self { > Self { > boundaries, > + next_boundary: None, > injections, > consumed: 0, > } > @@ -37,15 +39,17 @@ pub struct ChunkStream { > chunker: 
Chunker, > buffer: BytesMut, > scan_pos: usize, > + injection_data: Option, > } > > impl ChunkStream { > - pub fn new(input: S, chunk_size: Option) -> Self { > + pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { > Self { > input, > chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), > buffer: BytesMut::new(), > scan_pos: 0, > + injection_data, > } > } > } > @@ -62,19 +66,84 @@ where > > fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { > let this = self.get_mut(); > + > loop { > + if let Some(InjectionData { > + boundaries, > + next_boundary, > + injections, > + consumed, > + }) = this.injection_data.as_mut() > + { > + if next_boundary.is_none() { > + if let Ok(boundary) = boundaries.try_recv() { > + *next_boundary = Some(boundary); > + } > + } > + > + if let Some(inject) = next_boundary.take() { > + // require forced boundary, lookup next regular boundary > + let pos = this.chunker.scan(&this.buffer[this.scan_pos..]); below we check explicitly if scan_pos < buffer.len(), wouldn't that make sense here too? if i'm reading the code right, it can never be greater than buffer.len() so it doesn't matter i think but for the case they're equal it would be a bit of optimization > + > + let chunk_boundary = if pos == 0 { > + *consumed + this.buffer.len() as u64 > + } else { > + *consumed + (this.scan_pos + pos) as u64 > + }; > + > + if inject.boundary <= chunk_boundary { > + // forced boundary is before next boundary, force within current buffer > + let chunk_size = (inject.boundary - *consumed) as usize; > + let raw_chunk = this.buffer.split_to(chunk_size); > + this.chunker.reset(); > + this.scan_pos = 0; > + > + *consumed += chunk_size as u64; > + > + // add the size of the injected chunks to consumed, so chunk stream offsets > + // are in sync with the rest of the archive. 
> + *consumed += inject.size as u64; > + > + injections.send(inject).unwrap(); > + > + // the chunk can be empty, return nevertheless to allow the caller to > + // make progress by consuming from the injection queue > + return Poll::Ready(Some(Ok(raw_chunk))); > + } else if pos != 0 { > + *next_boundary = Some(inject); > + // forced boundary is after next boundary, split off chunk from buffer > + let chunk_size = this.scan_pos + pos; > + let raw_chunk = this.buffer.split_to(chunk_size); > + *consumed += chunk_size as u64; > + this.scan_pos = 0; > + > + return Poll::Ready(Some(Ok(raw_chunk))); > + } else { > + // forced boundary is after current buffer length, continue reading > + *next_boundary = Some(inject); > + this.scan_pos = this.buffer.len(); > + } these^ two else if/else branches were meant at the beginning of my message after 57/65 these are practically identical to the ones below in the "regular" chunking code and when we move the 'let boundary = this.chunker...' above, we can reuse that result for both cases > + } > + } > + > if this.scan_pos < this.buffer.len() { > + // look for next chunk boundary, starting from scan_pos > let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); > > let chunk_size = this.scan_pos + boundary; > > if boundary == 0 { > + // no new chunk boundary, update position for next boundary lookup > this.scan_pos = this.buffer.len(); > - // continue poll > } else if chunk_size <= this.buffer.len() { > - let result = this.buffer.split_to(chunk_size); > + // found new chunk boundary inside buffer, split off chunk from buffer > + let raw_chunk = this.buffer.split_to(chunk_size); > + if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() { > + *consumed += chunk_size as u64; > + } > this.scan_pos = 0; > - return Poll::Ready(Some(Ok(result))); > + > + return Poll::Ready(Some(Ok(raw_chunk))); why the extra newline? 
> } else { > panic!("got unexpected chunk boundary from chunker"); > } > @@ -82,10 +151,11 @@ where > > match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { > Some(Err(err)) => { > + // got error in byte stream, pass to consumer > return Poll::Ready(Some(Err(err.into()))); > } > None => { > - this.scan_pos = 0; is there any special reason why you're removing this ? i mean it probably does not make a difference since we're at the end of the stream anyway? > + // end of stream reached, flush remaining bytes in buffer > if !this.buffer.is_empty() { > return Poll::Ready(Some(Ok(this.buffer.split()))); > } else { > @@ -93,6 +163,7 @@ where > } > } > Some(Ok(data)) => { > + // got new data, add to buffer > this.buffer.extend_from_slice(data.as_ref()); > } > } > diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs > index 0f32efcce..dd3c64525 100644 > --- a/pbs-client/src/pxar/create.rs > +++ b/pbs-client/src/pxar/create.rs > @@ -6,7 +6,7 @@ use std::ops::Range; > use std::os::unix::ffi::OsStrExt; > use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; > use std::path::{Path, PathBuf}; > -use std::sync::{Arc, Mutex}; > +use std::sync::{mpsc, Arc, Mutex}; > > use anyhow::{bail, Context, Error}; > use futures::future::BoxFuture; > @@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter; > use pbs_datastore::dynamic_index::DynamicIndexReader; > use pbs_datastore::index::IndexFile; > > +use crate::inject_reused_chunks::InjectChunks; > use crate::pxar::metadata::errno_is_unsupported; > use crate::pxar::tools::assert_single_path_component; > use crate::pxar::Flags; > @@ -134,6 +135,7 @@ struct Archiver { > hardlinks: HashMap, > file_copy_buffer: Vec, > skip_e2big_xattr: bool, > + forced_boundaries: Option>, > } > > type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>; > @@ -164,6 +166,7 @@ pub async fn create_archive( > feature_flags: Flags, > callback: F, > options: PxarCreateOptions, > + forced_boundaries: Option>, 
> ) -> Result<(), Error> > where > T: SeqWrite + Send, > @@ -224,6 +227,7 @@ where > hardlinks: HashMap::new(), > file_copy_buffer: vec::undefined(4 * 1024 * 1024), > skip_e2big_xattr: options.skip_e2big_xattr, > + forced_boundaries, > }; > > archiver > diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs > index 95145cb0d..9d2cb41d6 100644 > --- a/pbs-client/src/pxar_backup_stream.rs > +++ b/pbs-client/src/pxar_backup_stream.rs > @@ -2,7 +2,7 @@ use std::io::Write; > //use std::os::unix::io::FromRawFd; > use std::path::Path; > use std::pin::Pin; > -use std::sync::{Arc, Mutex}; > +use std::sync::{mpsc, Arc, Mutex}; > use std::task::{Context, Poll}; > > use anyhow::{format_err, Error}; > @@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter; > > use pbs_datastore::catalog::CatalogWriter; > > +use crate::inject_reused_chunks::InjectChunks; > use crate::pxar::create::PxarWriters; > > /// Stream implementation to encode and upload .pxar archives. > @@ -42,6 +43,7 @@ impl PxarBackupStream { > dir: Dir, > catalog: Arc>>, > options: crate::pxar::PxarCreateOptions, > + boundaries: Option>, > separate_payload_stream: bool, > ) -> Result<(Self, Option), Error> { > let buffer_size = 256 * 1024; > @@ -79,6 +81,7 @@ impl PxarBackupStream { > Ok(()) > }, > options, > + boundaries, > ) > .await > { > @@ -110,11 +113,12 @@ impl PxarBackupStream { > dirname: &Path, > catalog: Arc>>, > options: crate::pxar::PxarCreateOptions, > + boundaries: Option>, > separate_payload_stream: bool, > ) -> Result<(Self, Option), Error> { > let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?; > > - Self::new(dir, catalog, options, separate_payload_stream) > + Self::new(dir, catalog, options, boundaries, separate_payload_stream) > } > } > > diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs > index 821777d66..5e93f9542 100644 > --- a/proxmox-backup-client/src/main.rs > +++ b/proxmox-backup-client/src/main.rs > @@ 
-45,8 +45,8 @@ use pbs_client::tools::{ > use pbs_client::{ > delete_ticket_info, parse_backup_specification, view_task_result, BackupReader, > BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream, > - FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions, > - BACKUP_SOURCE_SCHEMA, > + FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader, > + UploadOptions, BACKUP_SOURCE_SCHEMA, > }; > use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter}; > use pbs_datastore::chunk_store::verify_chunk_size; > @@ -199,14 +199,16 @@ async fn backup_directory>( > bail!("cannot backup directory with fixed chunk size!"); > } > > + let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel(); > let (pxar_stream, payload_stream) = PxarBackupStream::open( > dir_path.as_ref(), > catalog, > pxar_create_options, > + Some(payload_boundaries_tx), > payload_target.is_some(), > )?; > > - let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size); > + let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None); > let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks > > let stream = ReceiverStream::new(rx).map_err(Error::from); > @@ -218,13 +220,16 @@ async fn backup_directory>( > } > }); > > - let stats = client.upload_stream(archive_name, stream, upload_options.clone()); > + let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None); > > if let Some(payload_stream) = payload_stream { > let payload_target = payload_target > .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?; > > - let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size); > + let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel(); > + let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx); > + let mut payload_chunk_stream = > + 
ChunkStream::new(payload_stream, chunk_size, Some(injection_data)); > let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks > let stream = ReceiverStream::new(payload_rx).map_err(Error::from); > > @@ -235,7 +240,12 @@ async fn backup_directory>( > } > }); > > - let payload_stats = client.upload_stream(&payload_target, stream, upload_options); > + let payload_stats = client.upload_stream( > + &payload_target, > + stream, > + upload_options, > + Some(payload_injections_rx), > + ); > > match futures::join!(stats, payload_stats) { > (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))), > @@ -271,7 +281,7 @@ async fn backup_image>( > } > > let stats = client > - .upload_stream(archive_name, stream, upload_options) > + .upload_stream(archive_name, stream, upload_options, None) > .await?; > > Ok(stats) > @@ -562,7 +572,7 @@ fn spawn_catalog_upload( > let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes > let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx); > let catalog_chunk_size = 512 * 1024; > - let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size)); > + let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None); > > let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new( > StdChannelWriter::new(catalog_tx), > @@ -578,7 +588,7 @@ fn spawn_catalog_upload( > > tokio::spawn(async move { > let catalog_upload_result = client > - .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options) > + .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None) > .await; > > if let Err(ref err) = catalog_upload_result { > diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs > index ea97976e6..0883d6cda 100644 > --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs > +++ 
b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs > @@ -364,6 +364,7 @@ fn extract( > Flags::DEFAULT, > |_| Ok(()), > options, > + None, > ) > .await > } > diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs > index 58c9d2cfd..d46c98d2b 100644 > --- a/pxar-bin/src/main.rs > +++ b/pxar-bin/src/main.rs > @@ -405,6 +405,7 @@ async fn create_archive( > Ok(()) > }, > options, > + None, > ) > .await?; > > diff --git a/tests/catar.rs b/tests/catar.rs > index 9e96a8610..d5ef85ffe 100644 > --- a/tests/catar.rs > +++ b/tests/catar.rs > @@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> { > Flags::DEFAULT, > |_| Ok(()), > options, > + None, > ))?; > > Command::new("cmp") _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel