From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 13BC270E70 for ; Tue, 6 Apr 2021 11:04:25 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0AC782B6A3 for ; Tue, 6 Apr 2021 11:03:55 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 164EB2B621 for ; Tue, 6 Apr 2021 11:03:49 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D5998459C3 for ; Tue, 6 Apr 2021 11:03:48 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Tue, 6 Apr 2021 11:03:42 +0200 Message-Id: <20210406090347.27579-3-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210406090347.27579-1-d.csapak@proxmox.com> References: <20210406090347.27579-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.169 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup v3 2/7] tools/compression: add DeflateEncoder and helpers X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox 
Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 06 Apr 2021 09:04:25 -0000 Implements a deflate encoder that can compress anything that implements AsyncRead + Unpin into a file with the helper 'compress'. If the inner type is a Stream, the encoder implements Stream itself; this way, streaming data can be compressed while it is being streamed. Signed-off-by: Dominik Csapak --- Cargo.toml | 1 + src/tools/compression.rs | 193 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 97aa79f2..bb5f8e5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ bitflags = "1.2.1" bytes = "1.0" crc32fast = "1" endian_trait = { version = "0.6", features = ["arrays"] } +flate2 = "1.0" anyhow = "1.0" futures = "0.3" h2 = { version = "0.3", features = [ "stream" ] } diff --git a/src/tools/compression.rs b/src/tools/compression.rs index 19626efc..b27d7e70 100644 --- a/src/tools/compression.rs +++ b/src/tools/compression.rs @@ -1,5 +1,19 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + use anyhow::{bail, Error}; +use bytes::Bytes; +use flate2::{Compress, Compression, FlushCompress}; +use futures::ready; +use futures::stream::Stream; use hyper::header; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use proxmox::io_format_err; +use proxmox::tools::byte_buffer::ByteBuffer; + +const BUFFER_SIZE: usize = 8192; /// Possible Compression Methods, order determines preference (later is preferred) #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)] @@ -37,3 +51,182 @@ impl std::str::FromStr for CompressionMethod { } } } + +pub enum Level { + Fastest, + Best, + Default, + Precise(u32), +} + +#[derive(Eq, PartialEq)] +enum EncoderState { + Reading, + Writing, + Flushing, + Finished, +} + +pub struct DeflateEncoder { + inner: T, + compressor: Compress, + buffer: ByteBuffer, + input_buffer: Bytes, + state: EncoderState, +}
+
+impl<T> DeflateEncoder<T> {
+    /// Creates an encoder around `inner` using [`Level::Default`].
+    pub fn new(inner: T) -> Self {
+        Self::with_quality(inner, Level::Default)
+    }
+
+    /// Creates an encoder around `inner` using the given compression `level`.
+    ///
+    /// [`Level::Default`] maps to compression level 3 and [`Level::Precise`]
+    /// hands its value to `Compression::new` unchanged.
+    pub fn with_quality(inner: T, level: Level) -> Self {
+        let level = match level {
+            Level::Fastest => Compression::fast(),
+            Level::Best => Compression::best(),
+            Level::Default => Compression::new(3),
+            Level::Precise(val) => Compression::new(val),
+        };
+
+        Self {
+            inner,
+            // second argument is flate2's `zlib_header` flag: `false` means raw deflate
+            compressor: Compress::new(level, false),
+            buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
+            input_buffer: Bytes::new(),
+            state: EncoderState::Reading,
+        }
+    }
+
+    /// Total number of input bytes the compressor has consumed so far.
+    pub fn total_in(&self) -> u64 {
+        self.compressor.total_in()
+    }
+
+    /// Total number of output bytes the compressor has produced so far.
+    pub fn total_out(&self) -> u64 {
+        self.compressor.total_out()
+    }
+
+    /// Consumes the encoder and returns the wrapped value.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    /// Compresses `inbuf` into the free part of the internal output buffer.
+    ///
+    /// Returns how many bytes of `inbuf` were consumed together with the
+    /// compressor status. The number of bytes produced is registered with the
+    /// output buffer via `add_size`.
+    fn encode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushCompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        let old_in = self.compressor.total_in();
+        let old_out = self.compressor.total_out();
+        let res = self
+            .compressor
+            .compress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+        // both deltas are bounded by the lengths of the slices passed to
+        // `compress`, so the casts cannot truncate
+        let new_in = (self.compressor.total_in() - old_in) as usize;
+        let new_out = (self.compressor.total_out() - old_out) as usize;
+        self.buffer.add_size(new_out);
+
+        Ok((new_in, res))
+    }
+}
+
+impl DeflateEncoder<Vec<u8>> {
+    /// Reads `reader` to its end and appends the compressed data to the inner `Vec`.
+    ///
+    /// Meant for small inputs: the complete input is buffered in memory first.
+    /// `size_hint` is used as the initial capacity of the input buffer and as
+    /// the initial reservation for the output; the output is grown further if
+    /// the compressed data does not fit.
+    ///
+    /// # Errors
+    ///
+    /// Fails if reading from `reader` or the compression itself fails.
+    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        let mut buffer = Vec::with_capacity(size_hint);
+        reader.read_to_end(&mut buffer).await?;
+        self.inner.reserve(size_hint); // should be enough since we want smaller files
+
+        let mut consumed = 0;
+        loop {
+            let old_in = self.compressor.total_in();
+            // `compress_vec` only fills the spare capacity of the Vec and
+            // never grows it, so the status has to be checked
+            let status = self.compressor.compress_vec(
+                &buffer[consumed..],
+                &mut self.inner,
+                FlushCompress::Finish,
+            )?;
+            consumed += (self.compressor.total_in() - old_in) as usize;
+
+            if status == flate2::Status::StreamEnd {
+                break;
+            }
+
+            // spare capacity was used up before the stream was finished
+            // (incompressible data or a too small `size_hint`)
+            self.inner.reserve(BUFFER_SIZE);
+        }
+
+        Ok(())
+    }
+}
+
+impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
+    /// Compresses everything read from `reader` and writes the result to the
+    /// inner writer.
+    ///
+    /// # Errors
+    ///
+    /// Fails if reading, compressing or writing fails.
+    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
+        let mut eof = false;
+        loop {
+            if !eof && !buffer.is_full() {
+                let read = buffer.read_from_async(reader).await?;
+                // a read of zero bytes marks the end of the input
+                eof = read == 0;
+            }
+            let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
+            buffer.consume(read);
+
+            // hand the compressed data on right away so the output buffer is
+            // empty again for the next round
+            self.inner.write_all(&self.buffer[..]).await?;
+            self.buffer.clear();
+
+            if buffer.is_empty() && eof {
+                break;
+            }
+        }
+
+        // all input was consumed: finish the deflate stream
+        loop {
+            let (_read, res) = self.encode(&[], FlushCompress::Finish)?;
+            self.inner.write_all(&self.buffer[..]).await?;
+            self.buffer.clear();
+            if res == flate2::Status::StreamEnd {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl<T, O> Stream for DeflateEncoder<T>
+where
+    T: Stream<Item = Result<O, io::Error>> + Unpin,
+    O: Into<Bytes>,
+{
+    type Item = Result<Bytes, io::Error>;
+
+    /// Drives the encoder state machine: pull a chunk from the inner stream
+    /// (`Reading`), compress it (`Writing`), finish the deflate stream once
+    /// the inner stream is exhausted (`Flushing`) and then end (`Finished`).
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                EncoderState::Reading => {
+                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                        let buf = res?;
+                        this.input_buffer = buf.into();
+                        this.state = EncoderState::Writing;
+                    } else {
+                        this.state = EncoderState::Flushing;
+                    }
+                }
+                EncoderState::Writing => {
+                    if this.input_buffer.is_empty() {
+                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
+                    }
+                    // move the pending input out of `this`, so that `encode`
+                    // can borrow `this` mutably
+                    let mut buf = this.input_buffer.split_off(0);
+                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
+                    // keep whatever was not consumed for the next round
+                    this.input_buffer = buf.split_off(read);
+                    if this.input_buffer.is_empty() {
+                        this.state = EncoderState::Reading;
+                    }
+                    if this.buffer.is_full() || res == flate2::Status::BufError {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                EncoderState::Flushing => {
+                    let (_read, res) = this.encode(&[], FlushCompress::Finish)?;
+                    if !this.buffer.is_empty() {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                    if res == flate2::Status::StreamEnd {
+                        this.state = EncoderState::Finished;
+                    }
+                }
+                EncoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
-- 
2.20.1