From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 44EC194785 for ; Wed, 10 Apr 2024 16:31:09 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 2670A1232A for ; Wed, 10 Apr 2024 16:30:39 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Wed, 10 Apr 2024 16:30:38 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D478643C28 for ; Wed, 10 Apr 2024 16:30:37 +0200 (CEST) From: Maximiliano Sandoval To: pbs-devel@lists.proxmox.com Date: Wed, 10 Apr 2024 16:30:34 +0200 Message-Id: <20240410143036.81807-4-m.sandoval@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240410143036.81807-1-m.sandoval@proxmox.com> References: <20240410143036.81807-1-m.sandoval@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.013 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox v3 3/5] compression: deflate: add a decoder X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 10 Apr 2024 14:31:09 -0000 Signed-off-by: Maximiliano Sandoval --- .../src/deflate/decompression.rs | 141 ++++++++++++++++++ proxmox-compression/src/deflate/mod.rs | 2 + proxmox-compression/src/lib.rs | 2 +- 3 files changed, 144 insertions(+), 1 deletion(-) create mode 100644 proxmox-compression/src/deflate/decompression.rs diff --git a/proxmox-compression/src/deflate/decompression.rs b/proxmox-compression/src/deflate/decompression.rs new file mode 100644 index 00000000..45ed8579 --- /dev/null +++ b/proxmox-compression/src/deflate/decompression.rs @@ -0,0 +1,141 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Error; +use bytes::Bytes; +use flate2::{Decompress, FlushDecompress}; +use futures::ready; +use futures::stream::Stream; + +use proxmox_io::ByteBuffer; + +#[derive(Eq, PartialEq)] +enum DecoderState { + Reading, + Writing, + Flushing, + Finished, +} + +pub struct DeflateDecoder { + inner: T, + decompressor: Decompress, + buffer: ByteBuffer, + input_buffer: Bytes, + state: DecoderState, +} + +pub struct DeflateDecoderBuilder { + inner: T, + is_zlib: bool, + buffer_size: usize, +} + +impl DeflateDecoderBuilder { + pub fn zlib(mut self, is_zlib: bool) -> Self { + self.is_zlib = is_zlib; + self + } + + pub fn buffer_size(mut self, buffer_size: usize) -> Self { + self.buffer_size = buffer_size; + self + } + + pub fn build(self) -> DeflateDecoder { + DeflateDecoder { + inner: self.inner, + decompressor: Decompress::new(self.is_zlib), + buffer: ByteBuffer::with_capacity(self.buffer_size), + input_buffer: Bytes::new(), + state: DecoderState::Reading, + } + } +} + +impl DeflateDecoder { + pub fn new(inner: T) -> Self { + Self::builder(inner).build() + } + + pub fn builder(inner: T) -> DeflateDecoderBuilder { + DeflateDecoderBuilder { + inner, + is_zlib: false, + buffer_size: super::BUFFER_SIZE, + } + } + + fn decode( + &mut self, + inbuf: &[u8], + flush: FlushDecompress, + ) -> Result<(usize, flate2::Status), io::Error> { + let old_in = self.decompressor.total_in(); + let old_out = self.decompressor.total_out(); + let res = self + .decompressor + .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?; + let new_in = (self.decompressor.total_in() - old_in) as usize; + let new_out = (self.decompressor.total_out() - old_out) as usize; + self.buffer.add_size(new_out); + + Ok((new_in, res)) + } +} + +impl Stream for DeflateDecoder +where + T: Stream> + Unpin, + O: Into, + E: Into, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match this.state { + DecoderState::Reading => { + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) { + let buf = res.map_err(Into::into)?; + this.input_buffer = buf.into(); + this.state = DecoderState::Writing; + } else { + this.state = DecoderState::Flushing; + } + } + DecoderState::Writing => { + if this.input_buffer.is_empty() { + return Poll::Ready(Some(Err(anyhow::format_err!( + "empty input during write" + )))); + } + let mut buf = this.input_buffer.split_off(0); + let (read, res) = this.decode(&buf[..], FlushDecompress::None)?; + this.input_buffer = buf.split_off(read); + if this.input_buffer.is_empty() { + this.state = DecoderState::Reading; + } + if this.buffer.is_full() || res == flate2::Status::BufError { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + } + DecoderState::Flushing => { + let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?; + if !this.buffer.is_empty() { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + if res == flate2::Status::StreamEnd { + this.state = DecoderState::Finished; + } + } + DecoderState::Finished => return Poll::Ready(None), + } + } + } +} diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs index 514ccbdc..6867176c 100644 --- a/proxmox-compression/src/deflate/mod.rs +++ b/proxmox-compression/src/deflate/mod.rs @@ -1,5 +1,7 @@ mod compression; +mod decompression; pub use compression::{DeflateEncoder, Level}; +pub use decompression::DeflateDecoder; const BUFFER_SIZE: usize = 8192; diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs index 239de623..f33212e5 100644 --- a/proxmox-compression/src/lib.rs +++ b/proxmox-compression/src/lib.rs @@ -1,4 +1,4 @@ -pub use deflate::{DeflateEncoder, Level}; +pub use deflate::{DeflateDecoder, DeflateEncoder, Level}; pub(crate) mod deflate; pub mod tar; -- 2.39.2