From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 556D27046B for ; Fri, 2 Apr 2021 14:14:16 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4851BE978 for ; Fri, 2 Apr 2021 14:14:16 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 739B0E96B for ; Fri, 2 Apr 2021 14:14:15 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 373CF43624 for ; Fri, 2 Apr 2021 14:14:15 +0200 (CEST) Message-ID: Date: Fri, 2 Apr 2021 14:14:13 +0200 MIME-Version: 1.0 User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:88.0) Gecko/20100101 Thunderbird/88.0 Content-Language: en-US To: Proxmox Backup Server development discussion , Dominik Csapak References: <20210401141123.12964-1-d.csapak@proxmox.com> <20210401141123.12964-3-d.csapak@proxmox.com> From: Thomas Lamprecht In-Reply-To: <20210401141123.12964-3-d.csapak@proxmox.com> Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.043 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment NICE_REPLY_A -0.001 Looks like a legit reply (A) RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record 
URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [compression.rs] Subject: Re: [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 02 Apr 2021 12:14:16 -0000 On 01.04.21 16:11, Dominik Csapak wrote: > implements a deflate encoder that can compress anything that implements > AsyncRead + Unpin into a file with the helper 'compress' > > if the inner type is a Stream, it implements Stream itself, this way > some streaming data can be streamed compressed > > Signed-off-by: Dominik Csapak > --- > Cargo.toml | 1 + > src/tools/compression.rs | 191 +++++++++++++++++++++++++++++++++++++++ > 2 files changed, 192 insertions(+) > > diff --git a/Cargo.toml b/Cargo.toml > index 69b07d41..2f59b310 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -29,6 +29,7 @@ bitflags = "1.2.1" > bytes = "1.0" > crc32fast = "1" > endian_trait = { version = "0.6", features = ["arrays"] } > +flate2 = "1.0" > anyhow = "1.0" > futures = "0.3" > h2 = { version = "0.3", features = [ "stream" ] } > diff --git a/src/tools/compression.rs b/src/tools/compression.rs > index fe15e8fc..cc6ea732 100644 > --- a/src/tools/compression.rs > +++ b/src/tools/compression.rs > @@ -1,5 +1,17 @@ > +use std::io; > +use std::pin::Pin; > +use std::task::{Context, Poll}; > + > use anyhow::{bail, Error}; > +use bytes::Bytes; > +use flate2::{Compress, Compression, FlushCompress}; > +use futures::ready; > +use futures::stream::Stream; > use hyper::header; > +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; > + > +use proxmox::io_format_err; > +use proxmox::tools::byte_buffer::ByteBuffer; > > /// Possible Compression 
Methods, order determines preference (later is preferred) > #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)] > @@ -35,3 +47,182 @@ impl std::str::FromStr for CompressionMethod { > } > } > } > + > +pub enum Level { > + Fastest, > + Best, > + Default, > + Precise(u32), > +} > + > +#[derive(Eq, PartialEq)] > +enum EncoderState { > + Reading, > + Writing, > + Flushing, > + Finished, > +} > + > +pub struct DeflateEncoder { > + inner: T, > + compressor: Compress, > + buffer: ByteBuffer, > + input_buffer: Bytes, > + state: EncoderState, > +} > + > +impl DeflateEncoder { > + pub fn new(inner: T) -> Self { > + Self::with_quality(inner, Level::Default) > + } > + > + pub fn with_quality(inner: T, level: Level) -> Self { > + let level = match level { > + Level::Fastest => Compression::fast(), > + Level::Best => Compression::best(), > + Level::Default => Compression::new(3), > + Level::Precise(val) => Compression::new(val), > + }; > + > + Self { > + inner, > + compressor: Compress::new(level, false), > + buffer: ByteBuffer::with_capacity(8192), maybe we could use a const for this buffer size > + input_buffer: Bytes::new(), > + state: EncoderState::Reading, > + } > + } > + > + pub fn total_in(&self) -> u64 { does this need to be publicly visible? At least for now it's only used in the private encode function below. Really no hard feelings about this, just noticed. 
> + self.compressor.total_in() > + } > + > + pub fn total_out(&self) -> u64 { same here > + self.compressor.total_out() > + } > + > + pub fn into_inner(self) -> T { > + self.inner > + } > + > + fn encode( > + &mut self, > + inbuf: &[u8], > + flush: FlushCompress, > + ) -> Result<(usize, flate2::Status), io::Error> { > + let old_in = self.compressor.total_in(); > + let old_out = self.compressor.total_out(); > + let res = self > + .compressor > + .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?; > + let new_in = (self.compressor.total_in() - old_in) as usize; > + let new_out = (self.compressor.total_out() - old_out) as usize; > + self.buffer.add_size(new_out); > + > + Ok((new_in, res)) > + } > +} > + > +impl DeflateEncoder> { > + // assume small files > + pub async fn compress_vec(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error> > + where > + R: AsyncRead + Unpin, > + { > + let mut buffer = Vec::with_capacity(size_hint); > + reader.read_to_end(&mut buffer).await?; > + self.inner.reserve(size_hint); // should be enough since we want smalller files > + self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?; > + Ok(()) > + } > +} > + > +impl DeflateEncoder { > + pub async fn compress(&mut self, reader: &mut R) -> Result<(), Error> > + where > + R: AsyncRead + Unpin, > + { > + let mut buffer = ByteBuffer::with_capacity(8192); and reuse the const buffer size here.. 
> + let mut eof = false; > + loop { > + if !eof && !buffer.is_full() { > + let read = buffer.read_from_async(reader).await?; > + if read == 0 { > + eof = true; > + } > + } > + let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?; > + buffer.consume(read); > + > + self.inner.write_all(&self.buffer[..]).await?; > + self.buffer.clear(); > + > + if buffer.is_empty() && eof { > + break; > + } > + } > + > + loop { > + let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?; > + self.inner.write_all(&self.buffer[..]).await?; > + self.buffer.clear(); > + if res == flate2::Status::StreamEnd { > + break; > + } > + } > + > + Ok(()) > + } > +} > + > +impl Stream for DeflateEncoder > +where > + T: Stream> + Unpin, > + O: Into > +{ > + type Item = Result; > + > + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { > + let this = self.get_mut(); > + > + loop { > + match this.state { > + EncoderState::Reading => { > + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) { > + let buf = res?; > + this.input_buffer = buf.into(); > + this.state = EncoderState::Writing; > + } else { > + this.state = EncoderState::Flushing; > + } > + } > + EncoderState::Writing => { > + if this.input_buffer.is_empty() { > + return Poll::Ready(Some(Err(io_format_err!("empty input during write")))); > + } > + let mut buf = this.input_buffer.split_off(0); > + let (read, res) = this.encode(&buf[..], FlushCompress::None)?; > + this.input_buffer = buf.split_off(read); > + if this.input_buffer.is_empty() { > + this.state = EncoderState::Reading; > + } > + if this.buffer.is_full() || res == flate2::Status::BufError { > + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); > + return Poll::Ready(Some(Ok(bytes.into()))); > + } > + } > + EncoderState::Flushing => { > + let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?; > + if !this.buffer.is_empty() { > + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); > + return 
Poll::Ready(Some(Ok(bytes.into()))); > + } > + if res == flate2::Status::StreamEnd { > + this.state = EncoderState::Finished; > + } > + } > + EncoderState::Finished => return Poll::Ready(None), > + } > + } > + } > +} >