From: Dominik Csapak <d.csapak@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>,
	Christian Ebner <c.ebner@proxmox.com>
Subject: Re: [pbs-devel] [PATCH v6 proxmox-backup 42/65] client: streams: add channels for dynamic entry injection
Date: Wed, 22 May 2024 11:56:18 +0200
Message-ID: <7a285e31-7483-4840-8b50-f5655fc97a64@proxmox.com>
In-Reply-To: <20240514103421.289431-43-c.ebner@proxmox.com>

high level:

i think this patch would be much cleaner if 57/65
(datastore: chunker: add Chunker trait)
came before this one, because you change code around here only to
change it again later, in a way that could be written better if those
patches were reversed.

e.g. in case there is a next_boundary in the ChunkStream, the final
code duplicates the same steps as the regular chunking path

(this.buffer.split_to(..); *consumed += ..; this.scan_pos = 0)

and that could trivially be handled by the same code part
(with a bit of reordering, see the sketch further down at the
chunk_stream.rs hunks)

but with the patches in this order that is hard to see, since here the
paths are still different enough that they can't be shared

some other comments inline:

On 5/14/24 12:33, Christian Ebner wrote:
> To reuse dynamic entries of a previous backup run and index them for
> the new snapshot. Adds a non-blocking channel between the pxar
> archiver and the chunk stream, as well as the chunk stream and the
> backup writer.
> 
> The archiver sends forced boundary positions and the dynamic
> entries to inject into the chunk stream following this boundary.
> 
> The chunk stream consumes these channel inputs as receiver whenever a
> new chunk is requested by the upload stream, forcing a non-regular
> chunk boundary in the pxar stream at the requested positions.
> 
> The dynamic entries to inject and the boundary are then sent via the
> second asynchronous channel to the backup writer's upload stream,
> indexing them by inserting the dynamic entries as known chunks into
> the upload stream.
> 
> Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
> ---
>   examples/test_chunk_speed2.rs                 |   2 +-
>   pbs-client/src/backup_writer.rs               | 110 ++++++++++++------
>   pbs-client/src/chunk_stream.rs                |  81 ++++++++++++-
>   pbs-client/src/pxar/create.rs                 |   6 +-
>   pbs-client/src/pxar_backup_stream.rs          |   8 +-
>   proxmox-backup-client/src/main.rs             |  28 +++--
>   .../src/proxmox_restore_daemon/api.rs         |   1 +
>   pxar-bin/src/main.rs                          |   1 +
>   tests/catar.rs                                |   1 +
>   9 files changed, 183 insertions(+), 55 deletions(-)
> 
> diff --git a/examples/test_chunk_speed2.rs b/examples/test_chunk_speed2.rs
> index 3f69b436d..22dd14ce2 100644
> --- a/examples/test_chunk_speed2.rs
> +++ b/examples/test_chunk_speed2.rs
> @@ -26,7 +26,7 @@ async fn run() -> Result<(), Error> {
>           .map_err(Error::from);
>   
>       //let chunk_stream = FixedChunkStream::new(stream, 4*1024*1024);
> -    let mut chunk_stream = ChunkStream::new(stream, None);
> +    let mut chunk_stream = ChunkStream::new(stream, None, None);
>   
>       let start_time = std::time::Instant::now();
>   
> diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
> index dc9aa569f..66f209fed 100644
> --- a/pbs-client/src/backup_writer.rs
> +++ b/pbs-client/src/backup_writer.rs
> @@ -23,6 +23,7 @@ use pbs_tools::crypt_config::CryptConfig;
>   
>   use proxmox_human_byte::HumanByte;
>   
> +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo};
>   use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo};
>   
>   use super::{H2Client, HttpClient};
> @@ -265,6 +266,7 @@ impl BackupWriter {
>           archive_name: &str,
>           stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
>           options: UploadOptions,
> +        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
>       ) -> Result<BackupStats, Error> {
>           let known_chunks = Arc::new(Mutex::new(HashSet::new()));
>   
> @@ -341,6 +343,7 @@ impl BackupWriter {
>                   None
>               },
>               options.compress,
> +            injections,
>           )
>           .await?;
>   
> @@ -636,6 +639,7 @@ impl BackupWriter {
>           known_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>           crypt_config: Option<Arc<CryptConfig>>,
>           compress: bool,
> +        injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
>       ) -> impl Future<Output = Result<UploadStats, Error>> {
>           let total_chunks = Arc::new(AtomicUsize::new(0));
>           let total_chunks2 = total_chunks.clone();
> @@ -643,10 +647,12 @@ impl BackupWriter {
>           let known_chunk_count2 = known_chunk_count.clone();
>   
>           let stream_len = Arc::new(AtomicUsize::new(0));
> +        let stream_len1 = stream_len.clone();
>           let stream_len2 = stream_len.clone();
>           let compressed_stream_len = Arc::new(AtomicU64::new(0));
>           let compressed_stream_len2 = compressed_stream_len.clone();
>           let reused_len = Arc::new(AtomicUsize::new(0));
> +        let reused_len1 = reused_len.clone();

both of these variables are not really necessary:
in the closure you can simply keep using 'stream_len',
and in the .inject_reused_chunks() call you can clone it inline.

reused_len1 is used in the same closure as reused_len, so
there you can simply use the latter for both updates.
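untested, but roughly:

    stream
        .inject_reused_chunks(injections, stream_len.clone())
        .and_then(move |chunk_info| match chunk_info {
            InjectedChunksInfo::Known(chunks) => {
                // 'stream_len' and 'reused_len' are moved into this
                // closure, so no extra clones are needed for the updates
                let mut csum = index_csum_1.lock().unwrap();
                for chunk in chunks {
                    let offset =
                        stream_len.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
                    reused_len.fetch_add(chunk.size() as usize, Ordering::SeqCst);
                    ...
                }
                ...
            }
            ...
        })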

>           let reused_len2 = reused_len.clone();
>   
>           let append_chunk_path = format!("{}_index", prefix);
> @@ -658,52 +664,79 @@ impl BackupWriter {
>   
>           let start_time = std::time::Instant::now();
>   
> -        let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
> +        let index_csum = Arc::new(Mutex::new(openssl::sha::Sha256::new()));

why do you change this?
you can simply use the

let mut guard = ..;
let csum = ..;

pattern everywhere you need it.
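
i.e. keep the Arc<Mutex<Option<Sha256>>> and do (untested):

    let mut guard = index_csum_1.lock().unwrap();
    let csum = guard.as_mut().unwrap();

    csum.update(&end_offset.to_le_bytes());
    csum.update(&digest);

in the injected-chunks branch (and the same via index_csum in the raw
branch), then the finish() part at the end can stay exactly as it was:

    let mut guard = index_csum_2.lock().unwrap();
    let csum = guard.take().unwrap().finish();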


> +        let index_csum_1 = index_csum.clone();
>           let index_csum_2 = index_csum.clone();
>   
>           stream
> -            .and_then(move |data| {
> -                let chunk_len = data.len();
> +            .inject_reused_chunks(injections, stream_len)
> +            .and_then(move |chunk_info| match chunk_info {
> +                InjectedChunksInfo::Known(chunks) => {
> +                    // account for injected chunks
> +                    let count = chunks.len();
> +                    total_chunks.fetch_add(count, Ordering::SeqCst);
> +
> +                    let mut known = Vec::new();
> +                    let mut csum = index_csum_1.lock().unwrap();
> +                    for chunk in chunks {
> +                        let offset =
> +                            stream_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst) as u64;
> +                        reused_len1.fetch_add(chunk.size() as usize, Ordering::SeqCst);
> +                        let digest = chunk.digest();
> +                        known.push((offset, digest));
> +                        let end_offset = offset + chunk.size();
> +                        csum.update(&end_offset.to_le_bytes());
> +                        csum.update(&digest);
> +                    }
> +                    future::ok(MergedChunkInfo::Known(known))
> +                }
> +                InjectedChunksInfo::Raw(raw) => {

if you rename 'raw' to data and...

> +                    // account for not injected chunks (new and known)
> +                    let offset = stream_len1.fetch_add(raw.len(), Ordering::SeqCst) as u64;
> +                    let chunk_len = raw.len() as u64;

leave chunk_len as usize and...

>   
> -                total_chunks.fetch_add(1, Ordering::SeqCst);
> -                let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64;
> +                    total_chunks.fetch_add(1, Ordering::SeqCst);

reorder these statements a bit
and don't add unnecessary newlines

this hunk then gets much more readable with 'git diff -w':
it disappears completely :P, since all you do here is basically
indent the code one level

but that was really hard to see with the (IMHO not necessary) changes
to the code
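
e.g. (substitute whatever revision the patch sits on in your tree):

    git diff -w HEAD~1 -- pbs-client/src/backup_writer.rs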

>   
> -                let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress);
> +                    let mut chunk_builder = DataChunkBuilder::new(raw.as_ref()).compress(compress);
>   
> -                if let Some(ref crypt_config) = crypt_config {
> -                    chunk_builder = chunk_builder.crypt_config(crypt_config);
> -                }
> +                    if let Some(ref crypt_config) = crypt_config {
> +                        chunk_builder = chunk_builder.crypt_config(crypt_config);
> +                    }
>   
> -                let mut known_chunks = known_chunks.lock().unwrap();
> -                let digest = chunk_builder.digest();
> +                    let mut known_chunks = known_chunks.lock().unwrap();
>   
> -                let mut guard = index_csum.lock().unwrap();
> -                let csum = guard.as_mut().unwrap();
> +                    let digest = chunk_builder.digest();
>   
> -                let chunk_end = offset + chunk_len as u64;
> +                    let mut csum = index_csum.lock().unwrap();
>   
> -                if !is_fixed_chunk_size {
> -                    csum.update(&chunk_end.to_le_bytes());
> -                }
> -                csum.update(digest);
> +                    let chunk_end = offset + chunk_len;
>   
> -                let chunk_is_known = known_chunks.contains(digest);
> -                if chunk_is_known {
> -                    known_chunk_count.fetch_add(1, Ordering::SeqCst);
> -                    reused_len.fetch_add(chunk_len, Ordering::SeqCst);
> -                    future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> -                } else {
> -                    let compressed_stream_len2 = compressed_stream_len.clone();
> -                    known_chunks.insert(*digest);
> -                    future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> -                        compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> -                        MergedChunkInfo::New(ChunkInfo {
> -                            chunk,
> -                            digest,
> -                            chunk_len: chunk_len as u64,
> -                            offset,
> -                        })
> -                    }))
> +                    if !is_fixed_chunk_size {
> +                        csum.update(&chunk_end.to_le_bytes());
> +                    }
> +                    csum.update(digest);
> +
> +                    let chunk_is_known = known_chunks.contains(digest);
> +                    if chunk_is_known {
> +                        known_chunk_count.fetch_add(1, Ordering::SeqCst);
> +                        reused_len.fetch_add(chunk_len as usize, Ordering::SeqCst);
> +
> +                        future::ok(MergedChunkInfo::Known(vec![(offset, *digest)]))
> +                    } else {
> +                        let compressed_stream_len2 = compressed_stream_len.clone();
> +                        known_chunks.insert(*digest);
> +
> +                        future::ready(chunk_builder.build().map(move |(chunk, digest)| {
> +                            compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst);
> +
> +                            MergedChunkInfo::New(ChunkInfo {
> +                                chunk,
> +                                digest,
> +                                chunk_len,
> +                                offset,
> +                            })
> +                        }))
> +                    }
>                   }
>               })
>               .merge_known_chunks()
> @@ -771,8 +804,11 @@ impl BackupWriter {
>                   let size_reused = reused_len2.load(Ordering::SeqCst);
>                   let size_compressed = compressed_stream_len2.load(Ordering::SeqCst) as usize;
>   
> -                let mut guard = index_csum_2.lock().unwrap();
> -                let csum = guard.take().unwrap().finish();
> +                let csum = Arc::into_inner(index_csum_2)
> +                    .unwrap()
> +                    .into_inner()
> +                    .unwrap()
> +                    .finish();
>   

if you don't change the type above, this hunk also disappears

>                   futures::future::ok(UploadStats {
>                       chunk_count,
> diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs
> index 83c75ba28..7dade3f07 100644
> --- a/pbs-client/src/chunk_stream.rs
> +++ b/pbs-client/src/chunk_stream.rs
> @@ -14,6 +14,7 @@ use crate::inject_reused_chunks::InjectChunks;
>   /// Holds the queues for optional injection of reused dynamic index entries
>   pub struct InjectionData {
>       boundaries: mpsc::Receiver<InjectChunks>,
> +    next_boundary: Option<InjectChunks>,
>       injections: mpsc::Sender<InjectChunks>,
>       consumed: u64,
>   }
> @@ -25,6 +26,7 @@ impl InjectionData {
>       ) -> Self {
>           Self {
>               boundaries,
> +            next_boundary: None,
>               injections,
>               consumed: 0,
>           }
> @@ -37,15 +39,17 @@ pub struct ChunkStream<S: Unpin> {
>       chunker: Chunker,
>       buffer: BytesMut,
>       scan_pos: usize,
> +    injection_data: Option<InjectionData>,
>   }
>   
>   impl<S: Unpin> ChunkStream<S> {
> -    pub fn new(input: S, chunk_size: Option<usize>) -> Self {
> +    pub fn new(input: S, chunk_size: Option<usize>, injection_data: Option<InjectionData>) -> Self {
>           Self {
>               input,
>               chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)),
>               buffer: BytesMut::new(),
>               scan_pos: 0,
> +            injection_data,
>           }
>       }
>   }
> @@ -62,19 +66,84 @@ where
>   
>       fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
>           let this = self.get_mut();
> +
>           loop {
> +            if let Some(InjectionData {
> +                boundaries,
> +                next_boundary,
> +                injections,
> +                consumed,
> +            }) = this.injection_data.as_mut()
> +            {
> +                if next_boundary.is_none() {
> +                    if let Ok(boundary) = boundaries.try_recv() {
> +                        *next_boundary = Some(boundary);
> +                    }
> +                }
> +
> +                if let Some(inject) = next_boundary.take() {
> +                    // require forced boundary, lookup next regular boundary
> +                    let pos = this.chunker.scan(&this.buffer[this.scan_pos..]);

below we check explicitly whether scan_pos < buffer.len(), wouldn't that make sense here too?
if i'm reading the code right, scan_pos can never be greater than buffer.len(), so it doesn't really matter,
but for the case where they're equal it would save scanning an empty slice.
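
untested, but i mean something like:

    if let Some(inject) = next_boundary.take() {
        // only scan if there is data in the buffer we haven't looked
        // at yet, otherwise there cannot be a regular boundary anyway
        let pos = if this.scan_pos < this.buffer.len() {
            this.chunker.scan(&this.buffer[this.scan_pos..])
        } else {
            0
        };
        ...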

> +
> +                    let chunk_boundary = if pos == 0 {
> +                        *consumed + this.buffer.len() as u64
> +                    } else {
> +                        *consumed + (this.scan_pos + pos) as u64
> +                    };
> +
> +                    if inject.boundary <= chunk_boundary {
> +                        // forced boundary is before next boundary, force within current buffer
> +                        let chunk_size = (inject.boundary - *consumed) as usize;
> +                        let raw_chunk = this.buffer.split_to(chunk_size);
> +                        this.chunker.reset();
> +                        this.scan_pos = 0;
> +
> +                        *consumed += chunk_size as u64;
> +
> +                        // add the size of the injected chunks to consumed, so chunk stream offsets
> +                        // are in sync with the rest of the archive.
> +                        *consumed += inject.size as u64;
> +
> +                        injections.send(inject).unwrap();
> +
> +                        // the chunk can be empty, return nevertheless to allow the caller to
> +                        // make progress by consuming from the injection queue
> +                        return Poll::Ready(Some(Ok(raw_chunk)));
> +                    } else if pos != 0 {
> +                        *next_boundary = Some(inject);
> +                        // forced boundary is after next boundary, split off chunk from buffer
> +                        let chunk_size = this.scan_pos + pos;
> +                        let raw_chunk = this.buffer.split_to(chunk_size);
> +                        *consumed += chunk_size as u64;
> +                        this.scan_pos = 0;
> +
> +                        return Poll::Ready(Some(Ok(raw_chunk)));
> +                    } else {
> +                        // forced boundary is after current buffer length, continue reading
> +                        *next_boundary = Some(inject);
> +                        this.scan_pos = this.buffer.len();
> +                    }

these^ two else-if/else branches are what i meant at the beginning of my message:
after 57/65 they are practically identical
to the ones below in the "regular" chunking code

and when we move the 'let boundary = this.chunker.scan(..)' up, we can reuse that result
for both cases
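
very rough, untested sketch of the structure i mean (with
'forced_boundary_in_buffer' standing in for the
'inject.boundary <= chunk_boundary' check above):

    // scan for the next regular boundary once, up front
    let pos = this.chunker.scan(&this.buffer[this.scan_pos..]);

    let chunk_size = if forced_boundary_in_buffer {
        // forced boundary comes first: cut there, reset the chunker
        // and queue the injected entries
        let size = (inject.boundary - *consumed) as usize;
        this.chunker.reset();
        *consumed += inject.size as u64;
        injections.send(inject).unwrap();
        size
    } else if pos != 0 {
        // regular boundary comes first, keep the forced one for later
        *next_boundary = Some(inject);
        this.scan_pos + pos
    } else {
        // no boundary inside the buffer yet, continue reading below
        *next_boundary = Some(inject);
        this.scan_pos = this.buffer.len();
        ...
    };

    // single shared split-off path for both boundary kinds
    let raw_chunk = this.buffer.split_to(chunk_size);
    *consumed += chunk_size as u64;
    this.scan_pos = 0;
    return Poll::Ready(Some(Ok(raw_chunk)));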


> +                }
> +            }
> +
>               if this.scan_pos < this.buffer.len() {
> +                // look for next chunk boundary, starting from scan_pos
>                   let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]);
>   
>                   let chunk_size = this.scan_pos + boundary;
>   
>                   if boundary == 0 {
> +                    // no new chunk boundary, update position for next boundary lookup
>                       this.scan_pos = this.buffer.len();
> -                    // continue poll
>                   } else if chunk_size <= this.buffer.len() {
> -                    let result = this.buffer.split_to(chunk_size);
> +                    // found new chunk boundary inside buffer, split off chunk from buffer
> +                    let raw_chunk = this.buffer.split_to(chunk_size);
> +                    if let Some(InjectionData { consumed, .. }) = this.injection_data.as_mut() {
> +                        *consumed += chunk_size as u64;
> +                    }
>                       this.scan_pos = 0;
> -                    return Poll::Ready(Some(Ok(result)));
> +
> +                    return Poll::Ready(Some(Ok(raw_chunk)));

why the extra newline?

>                   } else {
>                       panic!("got unexpected chunk boundary from chunker");
>                   }
> @@ -82,10 +151,11 @@ where
>   
>               match ready!(Pin::new(&mut this.input).try_poll_next(cx)) {
>                   Some(Err(err)) => {
> +                    // got error in byte stream, pass to consumer
>                       return Poll::Ready(Some(Err(err.into())));
>                   }
>                   None => {
> -                    this.scan_pos = 0;

is there any special reason why you're removing this?
i mean it probably doesn't make a difference, since we're at the end of the stream anyway?

> +                    // end of stream reached, flush remaining bytes in buffer
>                       if !this.buffer.is_empty() {
>                           return Poll::Ready(Some(Ok(this.buffer.split())));
>                       } else {
> @@ -93,6 +163,7 @@ where
>                       }
>                   }
>                   Some(Ok(data)) => {
> +                    // got new data, add to buffer
>                       this.buffer.extend_from_slice(data.as_ref());
>                   }
>               }
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 0f32efcce..dd3c64525 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -6,7 +6,7 @@ use std::ops::Range;
>   use std::os::unix::ffi::OsStrExt;
>   use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
>   use std::path::{Path, PathBuf};
> -use std::sync::{Arc, Mutex};
> +use std::sync::{mpsc, Arc, Mutex};
>   
>   use anyhow::{bail, Context, Error};
>   use futures::future::BoxFuture;
> @@ -29,6 +29,7 @@ use pbs_datastore::catalog::BackupCatalogWriter;
>   use pbs_datastore::dynamic_index::DynamicIndexReader;
>   use pbs_datastore::index::IndexFile;
>   
> +use crate::inject_reused_chunks::InjectChunks;
>   use crate::pxar::metadata::errno_is_unsupported;
>   use crate::pxar::tools::assert_single_path_component;
>   use crate::pxar::Flags;
> @@ -134,6 +135,7 @@ struct Archiver {
>       hardlinks: HashMap<HardLinkInfo, (PathBuf, LinkOffset)>,
>       file_copy_buffer: Vec<u8>,
>       skip_e2big_xattr: bool,
> +    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>   }
>   
>   type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -164,6 +166,7 @@ pub async fn create_archive<T, F>(
>       feature_flags: Flags,
>       callback: F,
>       options: PxarCreateOptions,
> +    forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
>   ) -> Result<(), Error>
>   where
>       T: SeqWrite + Send,
> @@ -224,6 +227,7 @@ where
>           hardlinks: HashMap::new(),
>           file_copy_buffer: vec::undefined(4 * 1024 * 1024),
>           skip_e2big_xattr: options.skip_e2big_xattr,
> +        forced_boundaries,
>       };
>   
>       archiver
> diff --git a/pbs-client/src/pxar_backup_stream.rs b/pbs-client/src/pxar_backup_stream.rs
> index 95145cb0d..9d2cb41d6 100644
> --- a/pbs-client/src/pxar_backup_stream.rs
> +++ b/pbs-client/src/pxar_backup_stream.rs
> @@ -2,7 +2,7 @@ use std::io::Write;
>   //use std::os::unix::io::FromRawFd;
>   use std::path::Path;
>   use std::pin::Pin;
> -use std::sync::{Arc, Mutex};
> +use std::sync::{mpsc, Arc, Mutex};
>   use std::task::{Context, Poll};
>   
>   use anyhow::{format_err, Error};
> @@ -17,6 +17,7 @@ use proxmox_io::StdChannelWriter;
>   
>   use pbs_datastore::catalog::CatalogWriter;
>   
> +use crate::inject_reused_chunks::InjectChunks;
>   use crate::pxar::create::PxarWriters;
>   
>   /// Stream implementation to encode and upload .pxar archives.
> @@ -42,6 +43,7 @@ impl PxarBackupStream {
>           dir: Dir,
>           catalog: Arc<Mutex<CatalogWriter<W>>>,
>           options: crate::pxar::PxarCreateOptions,
> +        boundaries: Option<mpsc::Sender<InjectChunks>>,
>           separate_payload_stream: bool,
>       ) -> Result<(Self, Option<Self>), Error> {
>           let buffer_size = 256 * 1024;
> @@ -79,6 +81,7 @@ impl PxarBackupStream {
>                       Ok(())
>                   },
>                   options,
> +                boundaries,
>               )
>               .await
>               {
> @@ -110,11 +113,12 @@ impl PxarBackupStream {
>           dirname: &Path,
>           catalog: Arc<Mutex<CatalogWriter<W>>>,
>           options: crate::pxar::PxarCreateOptions,
> +        boundaries: Option<mpsc::Sender<InjectChunks>>,
>           separate_payload_stream: bool,
>       ) -> Result<(Self, Option<Self>), Error> {
>           let dir = nix::dir::Dir::open(dirname, OFlag::O_DIRECTORY, Mode::empty())?;
>   
> -        Self::new(dir, catalog, options, separate_payload_stream)
> +        Self::new(dir, catalog, options, boundaries, separate_payload_stream)
>       }
>   }
>   
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index 821777d66..5e93f9542 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -45,8 +45,8 @@ use pbs_client::tools::{
>   use pbs_client::{
>       delete_ticket_info, parse_backup_specification, view_task_result, BackupReader,
>       BackupRepository, BackupSpecificationType, BackupStats, BackupWriter, ChunkStream,
> -    FixedChunkStream, HttpClient, PxarBackupStream, RemoteChunkReader, UploadOptions,
> -    BACKUP_SOURCE_SCHEMA,
> +    FixedChunkStream, HttpClient, InjectionData, PxarBackupStream, RemoteChunkReader,
> +    UploadOptions, BACKUP_SOURCE_SCHEMA,
>   };
>   use pbs_datastore::catalog::{BackupCatalogWriter, CatalogReader, CatalogWriter};
>   use pbs_datastore::chunk_store::verify_chunk_size;
> @@ -199,14 +199,16 @@ async fn backup_directory<P: AsRef<Path>>(
>           bail!("cannot backup directory with fixed chunk size!");
>       }
>   
> +    let (payload_boundaries_tx, payload_boundaries_rx) = std::sync::mpsc::channel();
>       let (pxar_stream, payload_stream) = PxarBackupStream::open(
>           dir_path.as_ref(),
>           catalog,
>           pxar_create_options,
> +        Some(payload_boundaries_tx),
>           payload_target.is_some(),
>       )?;
>   
> -    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size);
> +    let mut chunk_stream = ChunkStream::new(pxar_stream, chunk_size, None);
>       let (tx, rx) = mpsc::channel(10); // allow to buffer 10 chunks
>   
>       let stream = ReceiverStream::new(rx).map_err(Error::from);
> @@ -218,13 +220,16 @@ async fn backup_directory<P: AsRef<Path>>(
>           }
>       });
>   
> -    let stats = client.upload_stream(archive_name, stream, upload_options.clone());
> +    let stats = client.upload_stream(archive_name, stream, upload_options.clone(), None);
>   
>       if let Some(payload_stream) = payload_stream {
>           let payload_target = payload_target
>               .ok_or_else(|| format_err!("got payload stream, but no target archive name"))?;
>   
> -        let mut payload_chunk_stream = ChunkStream::new(payload_stream, chunk_size);
> +        let (payload_injections_tx, payload_injections_rx) = std::sync::mpsc::channel();
> +        let injection_data = InjectionData::new(payload_boundaries_rx, payload_injections_tx);
> +        let mut payload_chunk_stream =
> +            ChunkStream::new(payload_stream, chunk_size, Some(injection_data));
>           let (payload_tx, payload_rx) = mpsc::channel(10); // allow to buffer 10 chunks
>           let stream = ReceiverStream::new(payload_rx).map_err(Error::from);
>   
> @@ -235,7 +240,12 @@ async fn backup_directory<P: AsRef<Path>>(
>               }
>           });
>   
> -        let payload_stats = client.upload_stream(&payload_target, stream, upload_options);
> +        let payload_stats = client.upload_stream(
> +            &payload_target,
> +            stream,
> +            upload_options,
> +            Some(payload_injections_rx),
> +        );
>   
>           match futures::join!(stats, payload_stats) {
>               (Ok(stats), Ok(payload_stats)) => Ok((stats, Some(payload_stats))),
> @@ -271,7 +281,7 @@ async fn backup_image<P: AsRef<Path>>(
>       }
>   
>       let stats = client
> -        .upload_stream(archive_name, stream, upload_options)
> +        .upload_stream(archive_name, stream, upload_options, None)
>           .await?;
>   
>       Ok(stats)
> @@ -562,7 +572,7 @@ fn spawn_catalog_upload(
>       let (catalog_tx, catalog_rx) = std::sync::mpsc::sync_channel(10); // allow to buffer 10 writes
>       let catalog_stream = proxmox_async::blocking::StdChannelStream(catalog_rx);
>       let catalog_chunk_size = 512 * 1024;
> -    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size));
> +    let catalog_chunk_stream = ChunkStream::new(catalog_stream, Some(catalog_chunk_size), None);
>   
>       let catalog_writer = Arc::new(Mutex::new(CatalogWriter::new(TokioWriterAdapter::new(
>           StdChannelWriter::new(catalog_tx),
> @@ -578,7 +588,7 @@ fn spawn_catalog_upload(
>   
>       tokio::spawn(async move {
>           let catalog_upload_result = client
> -            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options)
> +            .upload_stream(CATALOG_NAME, catalog_chunk_stream, upload_options, None)
>               .await;
>   
>           if let Err(ref err) = catalog_upload_result {
> diff --git a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> index ea97976e6..0883d6cda 100644
> --- a/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> +++ b/proxmox-restore-daemon/src/proxmox_restore_daemon/api.rs
> @@ -364,6 +364,7 @@ fn extract(
>                           Flags::DEFAULT,
>                           |_| Ok(()),
>                           options,
> +                        None,
>                       )
>                       .await
>                   }
> diff --git a/pxar-bin/src/main.rs b/pxar-bin/src/main.rs
> index 58c9d2cfd..d46c98d2b 100644
> --- a/pxar-bin/src/main.rs
> +++ b/pxar-bin/src/main.rs
> @@ -405,6 +405,7 @@ async fn create_archive(
>               Ok(())
>           },
>           options,
> +        None,
>       )
>       .await?;
>   
> diff --git a/tests/catar.rs b/tests/catar.rs
> index 9e96a8610..d5ef85ffe 100644
> --- a/tests/catar.rs
> +++ b/tests/catar.rs
> @@ -39,6 +39,7 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
>           Flags::DEFAULT,
>           |_| Ok(()),
>           options,
> +        None,
>       ))?;
>   
>       Command::new("cmp")


