From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 2DBE61FF399 for ; Wed, 5 Jun 2024 12:55:10 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 95E04320C5; Wed, 5 Jun 2024 12:55:31 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Wed, 5 Jun 2024 12:54:02 +0200 Message-Id: <20240605105416.278748-45-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240605105416.278748-1-c.ebner@proxmox.com> References: <20240605105416.278748-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL -0.123 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment POISEN_SPAM_PILL 0.1 Meta: its spam POISEN_SPAM_PILL_2 0.1 random spam to be learned in bayes POISEN_SPAM_PILL_4 0.1 random spam to be learned in bayes SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH v9 proxmox-backup 44/58] datastore: chunker: add Chunker trait X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Add the Chunker trait and move the current Chunker to ChunkerImpl to implement the trait instead. 
This allows using different chunker implementations by dynamic dispatch and is in preparation for implementing a dedicated payload chunker. Signed-off-by: Christian Ebner --- changes since version 8: - no changes examples/test_chunk_size.rs | 9 +-- examples/test_chunk_speed.rs | 7 ++- pbs-client/src/chunk_stream.rs | 37 ++++++------ pbs-datastore/src/chunker.rs | 95 ++++++++++++++++++------------ pbs-datastore/src/dynamic_index.rs | 9 +-- pbs-datastore/src/lib.rs | 2 +- 6 files changed, 91 insertions(+), 68 deletions(-) diff --git a/examples/test_chunk_size.rs b/examples/test_chunk_size.rs index a01a5e640..2ebc22f64 100644 --- a/examples/test_chunk_size.rs +++ b/examples/test_chunk_size.rs @@ -5,10 +5,10 @@ extern crate proxmox_backup; use anyhow::Error; use std::io::{Read, Write}; -use pbs_datastore::Chunker; +use pbs_datastore::{Chunker, ChunkerImpl}; struct ChunkWriter { - chunker: Chunker, + chunker: ChunkerImpl, last_chunk: usize, chunk_offset: usize, @@ -23,7 +23,7 @@ struct ChunkWriter { impl ChunkWriter { fn new(chunk_size: usize) -> Self { ChunkWriter { - chunker: Chunker::new(chunk_size), + chunker: ChunkerImpl::new(chunk_size), last_chunk: 0, chunk_offset: 0, chunk_count: 0, @@ -69,7 +69,8 @@ impl Write for ChunkWriter { fn write(&mut self, data: &[u8]) -> std::result::Result { let chunker = &mut self.chunker; - let pos = chunker.scan(data); + let ctx = pbs_datastore::chunker::Context::default(); + let pos = chunker.scan(data, &ctx); if pos > 0 { self.chunk_offset += pos; diff --git a/examples/test_chunk_speed.rs b/examples/test_chunk_speed.rs index 37e13e0de..2d79604ab 100644 --- a/examples/test_chunk_speed.rs +++ b/examples/test_chunk_speed.rs @@ -1,6 +1,6 @@ extern crate proxmox_backup; -use pbs_datastore::Chunker; +use pbs_datastore::{Chunker, ChunkerImpl}; fn main() { let mut buffer = Vec::new(); @@ -12,7 +12,7 @@ fn main() { buffer.push(byte); } } - let mut chunker = Chunker::new(64 * 1024); + let mut chunker = ChunkerImpl::new(64 * 1024); let 
count = 5; @@ -23,8 +23,9 @@ fn main() { for _i in 0..count { let mut pos = 0; let mut _last = 0; + let ctx = pbs_datastore::chunker::Context::default(); while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..]); + let k = chunker.scan(&buffer[pos..], &ctx); if k == 0 { //println!("LAST {}", pos); break; diff --git a/pbs-client/src/chunk_stream.rs b/pbs-client/src/chunk_stream.rs index 87a018d50..84158a2c9 100644 --- a/pbs-client/src/chunk_stream.rs +++ b/pbs-client/src/chunk_stream.rs @@ -7,7 +7,7 @@ use bytes::BytesMut; use futures::ready; use futures::stream::{Stream, TryStream}; -use pbs_datastore::Chunker; +use pbs_datastore::{Chunker, ChunkerImpl}; use crate::inject_reused_chunks::InjectChunks; @@ -16,7 +16,6 @@ pub struct InjectionData { boundaries: mpsc::Receiver, next_boundary: Option, injections: mpsc::Sender, - consumed: u64, } impl InjectionData { @@ -28,7 +27,6 @@ impl InjectionData { boundaries, next_boundary: None, injections, - consumed: 0, } } } @@ -36,19 +34,22 @@ impl InjectionData { /// Split input stream into dynamic sized chunks pub struct ChunkStream { input: S, - chunker: Chunker, + chunker: Box, buffer: BytesMut, scan_pos: usize, + consumed: u64, injection_data: Option, } impl ChunkStream { pub fn new(input: S, chunk_size: Option, injection_data: Option) -> Self { + let chunk_size = chunk_size.unwrap_or(4 * 1024 * 1024); Self { input, - chunker: Chunker::new(chunk_size.unwrap_or(4 * 1024 * 1024)), + chunker: Box::new(ChunkerImpl::new(chunk_size)), buffer: BytesMut::new(), scan_pos: 0, + consumed: 0, injection_data, } } @@ -68,11 +69,15 @@ where let this = self.get_mut(); loop { + let ctx = pbs_datastore::chunker::Context { + base: this.consumed, + total: this.buffer.len() as u64, + }; + if let Some(InjectionData { boundaries, next_boundary, injections, - consumed, }) = this.injection_data.as_mut() { if next_boundary.is_none() { @@ -84,29 +89,29 @@ where if let Some(inject) = next_boundary.take() { // require forced boundary, lookup 
next regular boundary let pos = if this.scan_pos < this.buffer.len() { - this.chunker.scan(&this.buffer[this.scan_pos..]) + this.chunker.scan(&this.buffer[this.scan_pos..], &ctx) } else { 0 }; let chunk_boundary = if pos == 0 { - *consumed + this.buffer.len() as u64 + this.consumed + this.buffer.len() as u64 } else { - *consumed + (this.scan_pos + pos) as u64 + this.consumed + (this.scan_pos + pos) as u64 }; if inject.boundary <= chunk_boundary { // forced boundary is before next boundary, force within current buffer - let chunk_size = (inject.boundary - *consumed) as usize; + let chunk_size = (inject.boundary - this.consumed) as usize; let raw_chunk = this.buffer.split_to(chunk_size); this.chunker.reset(); this.scan_pos = 0; - *consumed += chunk_size as u64; + this.consumed += chunk_size as u64; // add the size of the injected chunks to consumed, so chunk stream offsets // are in sync with the rest of the archive. - *consumed += inject.size as u64; + this.consumed += inject.size as u64; injections.send(inject).unwrap(); @@ -118,7 +123,7 @@ where // forced boundary is after next boundary, split off chunk from buffer let chunk_size = this.scan_pos + pos; let raw_chunk = this.buffer.split_to(chunk_size); - *consumed += chunk_size as u64; + this.consumed += chunk_size as u64; this.scan_pos = 0; return Poll::Ready(Some(Ok(raw_chunk))); @@ -131,7 +136,7 @@ where } if this.scan_pos < this.buffer.len() { - let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); + let boundary = this.chunker.scan(&this.buffer[this.scan_pos..], &ctx); let chunk_size = this.scan_pos + boundary; @@ -140,9 +145,7 @@ where } else if chunk_size <= this.buffer.len() { // found new chunk boundary inside buffer, split off chunk from buffer let raw_chunk = this.buffer.split_to(chunk_size); - if let Some(InjectionData { consumed, .. 
}) = this.injection_data.as_mut() { - *consumed += chunk_size as u64; - } + this.consumed += chunk_size as u64; this.scan_pos = 0; return Poll::Ready(Some(Ok(raw_chunk))); } else { diff --git a/pbs-datastore/src/chunker.rs b/pbs-datastore/src/chunker.rs index 253d2cf4c..d75e63fa8 100644 --- a/pbs-datastore/src/chunker.rs +++ b/pbs-datastore/src/chunker.rs @@ -5,6 +5,20 @@ /// use hash value 0 to detect a boundary. const CA_CHUNKER_WINDOW_SIZE: usize = 64; +/// Additional context for chunker to find possible boundaries in payload streams +#[derive(Default)] +pub struct Context { + /// Already consumed bytes of the chunk stream consumer + pub base: u64, + /// Total size currently buffered + pub total: u64, +} + +pub trait Chunker { + fn scan(&mut self, data: &[u8], ctx: &Context) -> usize; + fn reset(&mut self); +} + /// Sliding window chunker (Buzhash) /// /// This is a rewrite of *casync* chunker (cachunker.h) in rust. @@ -15,7 +29,7 @@ const CA_CHUNKER_WINDOW_SIZE: usize = 64; /// Hash](https://en.wikipedia.org/wiki/Rolling_hash) article from /// Wikipedia. -pub struct Chunker { +pub struct ChunkerImpl { h: u32, window_size: usize, chunk_size: usize, @@ -67,7 +81,7 @@ const BUZHASH_TABLE: [u32; 256] = [ 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, 0xf9c18d66, 0x593ade65, 0xd95ddf11, ]; -impl Chunker { +impl ChunkerImpl { /// Create a new Chunker instance, which produces and average /// chunk size of `chunk_size_avg` (need to be a power of two). We /// allow variation from `chunk_size_avg/4` up to a maximum of @@ -105,11 +119,44 @@ impl Chunker { } } + // fast implementation avoiding modulo + // #[inline(always)] + fn shall_break(&self) -> bool { + if self.chunk_size >= self.chunk_size_max { + return true; + } + + if self.chunk_size < self.chunk_size_min { + return false; + } + + //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!! 
+ + //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams + + (self.h & self.break_test_mask) >= self.break_test_minimum + } + + // This is the original implementation from casync + /* + #[inline(always)] + fn shall_break_orig(&self) -> bool { + + if self.chunk_size >= self.chunk_size_max { return true; } + + if self.chunk_size < self.chunk_size_min { return false; } + + (self.h % self.discriminator) == (self.discriminator - 1) + } + */ +} + +impl Chunker for ChunkerImpl { /// Scans the specified data for a chunk border. Returns 0 if none /// was found (and the function should be called with more data /// later on), or another value indicating the position of a /// border. - pub fn scan(&mut self, data: &[u8]) -> usize { + fn scan(&mut self, data: &[u8], _ctx: &Context) -> usize { let window_len = self.window.len(); let data_len = data.len(); @@ -167,42 +214,11 @@ impl Chunker { 0 } - pub fn reset(&mut self) { + fn reset(&mut self) { self.h = 0; self.chunk_size = 0; self.window_size = 0; } - - // fast implementation avoiding modulo - // #[inline(always)] - fn shall_break(&self) -> bool { - if self.chunk_size >= self.chunk_size_max { - return true; - } - - if self.chunk_size < self.chunk_size_min { - return false; - } - - //(self.h & 0x1ffff) <= 2 //THIS IS SLOW!!! 
- - //(self.h & self.break_test_mask) <= 2 // Bad on 0 streams - - (self.h & self.break_test_mask) >= self.break_test_minimum - } - - // This is the original implementation from casync - /* - #[inline(always)] - fn shall_break_orig(&self) -> bool { - - if self.chunk_size >= self.chunk_size_max { return true; } - - if self.chunk_size < self.chunk_size_min { return false; } - - (self.h % self.discriminator) == (self.discriminator - 1) - } - */ } #[test] @@ -215,17 +231,18 @@ fn test_chunker1() { buffer.push(byte); } } - let mut chunker = Chunker::new(64 * 1024); + let mut chunker = ChunkerImpl::new(64 * 1024); let mut pos = 0; let mut last = 0; let mut chunks1: Vec<(usize, usize)> = vec![]; let mut chunks2: Vec<(usize, usize)> = vec![]; + let ctx = Context::default(); // test1: feed single bytes while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..pos + 1]); + let k = chunker.scan(&buffer[pos..pos + 1], &ctx); pos += 1; if k != 0 { let prev = last; @@ -235,13 +252,13 @@ fn test_chunker1() { } chunks1.push((last, buffer.len() - last)); - let mut chunker = Chunker::new(64 * 1024); + let mut chunker = ChunkerImpl::new(64 * 1024); let mut pos = 0; // test2: feed with whole buffer while pos < buffer.len() { - let k = chunker.scan(&buffer[pos..]); + let k = chunker.scan(&buffer[pos..], &ctx); if k != 0 { chunks2.push((pos, k)); pos += k; diff --git a/pbs-datastore/src/dynamic_index.rs b/pbs-datastore/src/dynamic_index.rs index b8047b5b1..dc9eee050 100644 --- a/pbs-datastore/src/dynamic_index.rs +++ b/pbs-datastore/src/dynamic_index.rs @@ -23,7 +23,7 @@ use crate::data_blob::{DataBlob, DataChunkBuilder}; use crate::file_formats; use crate::index::{ChunkReadInfo, IndexFile}; use crate::read_chunk::ReadChunk; -use crate::Chunker; +use crate::{Chunker, ChunkerImpl}; /// Header format definition for dynamic index files (`.dixd`) #[repr(C)] @@ -397,7 +397,7 @@ impl DynamicIndexWriter { pub struct DynamicChunkWriter { index: DynamicIndexWriter, closed: bool, - chunker: 
Chunker, + chunker: ChunkerImpl, stat: ChunkStat, chunk_offset: usize, last_chunk: usize, @@ -409,7 +409,7 @@ impl DynamicChunkWriter { Self { index, closed: false, - chunker: Chunker::new(chunk_size), + chunker: ChunkerImpl::new(chunk_size), stat: ChunkStat::new(0), chunk_offset: 0, last_chunk: 0, @@ -494,7 +494,8 @@ impl Write for DynamicChunkWriter { fn write(&mut self, data: &[u8]) -> std::result::Result { let chunker = &mut self.chunker; - let pos = chunker.scan(data); + let ctx = crate::chunker::Context::default(); + let pos = chunker.scan(data, &ctx); if pos > 0 { self.chunk_buffer.extend_from_slice(&data[0..pos]); diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs index 43050162f..24429626c 100644 --- a/pbs-datastore/src/lib.rs +++ b/pbs-datastore/src/lib.rs @@ -196,7 +196,7 @@ pub use backup_info::{BackupDir, BackupGroup, BackupInfo}; pub use checksum_reader::ChecksumReader; pub use checksum_writer::ChecksumWriter; pub use chunk_store::ChunkStore; -pub use chunker::Chunker; +pub use chunker::{Chunker, ChunkerImpl}; pub use crypt_reader::CryptReader; pub use crypt_writer::CryptWriter; pub use data_blob::DataBlob; -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel