From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 14E966CD57 for ; Wed, 31 Mar 2021 11:44:52 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CCE8AD235 for ; Wed, 31 Mar 2021 11:44:21 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 99933D1F1 for ; Wed, 31 Mar 2021 11:44:19 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 6293445882 for ; Wed, 31 Mar 2021 11:44:19 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Wed, 31 Mar 2021 11:44:14 +0200 Message-Id: <20210331094418.16609-3-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210331094418.16609-1-d.csapak@proxmox.com> References: <20210331094418.16609-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.175 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[compression.rs] Subject: [pbs-devel] [PATCH proxmox-backup 2/6] tools/compression: add DeflateEncoder and helpers X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 31 Mar 2021 09:44:52 -0000 Implements a deflate encoder that can compress anything that implements AsyncRead + Unpin into a file with the helper 'compress'. If the inner type is a Stream, the encoder implements Stream itself; this way, streaming data can be compressed while it is being streamed. Signed-off-by: Dominik Csapak --- Cargo.toml | 1 + src/tools/compression.rs | 214 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index b0ef56bd..0f3005e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ bitflags = "1.2.1" bytes = "1.0" crc32fast = "1" endian_trait = { version = "0.6", features = ["arrays"] } +flate2 = "1.0" anyhow = "1.0" futures = "0.3" h2 = { version = "0.3", features = [ "stream" ] } diff --git a/src/tools/compression.rs b/src/tools/compression.rs index fe15e8fc..a3fb913d 100644 --- a/src/tools/compression.rs +++ b/src/tools/compression.rs @@ -1,5 +1,21 @@ +use std::io; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + use anyhow::{bail, Error}; +use bytes::Bytes; +use flate2::{Compress, Compression, FlushCompress}; +use futures::ready; +use futures::stream::Stream; use hyper::header; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite, AsyncWriteExt}; + +use proxmox::http_err; +use proxmox::io_format_err; +use proxmox::tools::byte_buffer::ByteBuffer; /// Possible Compression Methods, order determines preference (later is preferred) #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)] @@ -35,3 +51,201 @@ impl std::str::FromStr for CompressionMethod { } } } + +/// 
compresses input to output via the given compression method and returns +/// the compressed File ready to read +pub async fn compress_file( + source_path: &PathBuf, + target_path: &PathBuf, + compression: &CompressionMethod, +) -> Result { + let mut source = File::open(source_path) + .await + .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?; + + let mut target = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .mode(0o644) + .open(target_path) + .await + .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?; + + let _lock = proxmox::tools::fs::lock_file(&mut target, true, Some(Duration::new(1, 0)))?; + + let mut target = match compression { + CompressionMethod::Deflate => { + let mut enc = DeflateEncoder::with_quality(target, Level::Fastest); + enc.compress(&mut source).await?; + enc.into_inner() + } + _ => bail!("compression: {:?} not implemented", compression), + }; + + target.sync_all().await?; + target.seek(std::io::SeekFrom::Start(0)).await?; + + Ok(target) +} + +pub enum Level { + Fastest, + Best, + Default, + Precise(u32), +} + +#[derive(Eq, PartialEq)] +enum EncoderState { + Reading, + Writing, + Flushing, + Finished, +} + +pub struct DeflateEncoder { + inner: T, + compressor: Compress, + buffer: ByteBuffer, + input_buffer: Bytes, + state: EncoderState, +} + +impl DeflateEncoder { + pub fn new(inner: T) -> Self { + Self::with_quality(inner, Level::Default) + } + + pub fn with_quality(inner: T, level: Level) -> Self { + let level = match level { + Level::Fastest => Compression::fast(), + Level::Best => Compression::best(), + Level::Default => Compression::new(3), + Level::Precise(val) => Compression::new(val), + }; + + Self { + inner, + compressor: Compress::new(level, false), + buffer: ByteBuffer::with_capacity(8192), + input_buffer: Bytes::new(), + state: EncoderState::Reading, + } + } + + pub fn total_in(&self) -> u64 { + self.compressor.total_in() + } + + pub fn total_out(&self) -> u64 { + 
self.compressor.total_out() + } + + pub fn into_inner(self) -> T { + self.inner + } + + fn encode( + &mut self, + inbuf: &[u8], + flush: FlushCompress, + ) -> Result<(usize, flate2::Status), io::Error> { + let old_in = self.compressor.total_in(); + let old_out = self.compressor.total_out(); + let res = self + .compressor + .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?; + let new_in = (self.compressor.total_in() - old_in) as usize; + let new_out = (self.compressor.total_out() - old_out) as usize; + self.buffer.add_size(new_out); + + Ok((new_in, res)) + } +} + +impl DeflateEncoder { + pub async fn compress(&mut self, reader: &mut R) -> Result<(), Error> + where + R: AsyncRead + Unpin, + { + let mut buffer = ByteBuffer::with_capacity(8192); + let mut eof = false; + loop { + if !eof && !buffer.is_full() { + let read = buffer.read_from_async(reader).await?; + if read == 0 { + eof = true; + } + } + let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?; + buffer.consume(read); + + self.inner.write_all(&self.buffer[..]).await?; + self.buffer.clear(); + + if buffer.is_empty() && eof { + break; + } + } + + loop { + let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?; + self.inner.write_all(&self.buffer[..]).await?; + self.buffer.clear(); + if res == flate2::Status::StreamEnd { + break; + } + } + + Ok(()) + } +} + +impl> + Unpin> Stream for DeflateEncoder { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match this.state { + EncoderState::Reading => { + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) { + let buf = res?; + this.input_buffer = buf; + this.state = EncoderState::Writing; + } else { + this.state = EncoderState::Flushing; + } + } + EncoderState::Writing => { + if this.input_buffer.is_empty() { + return Poll::Ready(Some(Err(io_format_err!("empty input during write")))); + } + let mut buf = 
this.input_buffer.split_off(0); + let (read, res) = this.encode(&buf[..], FlushCompress::None)?; + this.input_buffer = buf.split_off(read); + if this.input_buffer.is_empty() { + this.state = EncoderState::Reading; + } + if this.buffer.is_full() || res == flate2::Status::BufError { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + } + EncoderState::Flushing => { + let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?; + if !this.buffer.is_empty() { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + if res == flate2::Status::StreamEnd { + this.state = EncoderState::Finished; + } + } + EncoderState::Finished => return Poll::Ready(None), + } + } + } +} -- 2.20.1