From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v3 2/7] tools/compression: add DeflateEncoder and helpers
Date: Tue, 6 Apr 2021 11:03:42 +0200 [thread overview]
Message-ID: <20210406090347.27579-3-d.csapak@proxmox.com> (raw)
In-Reply-To: <20210406090347.27579-1-d.csapak@proxmox.com>
Implements a deflate encoder that can compress anything that implements
AsyncRead + Unpin into a file with the helper 'compress'.
If the inner type is a Stream, the encoder implements Stream itself; this
way, streaming data can be passed on in compressed form.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Cargo.toml | 1 +
src/tools/compression.rs | 193 +++++++++++++++++++++++++++++++++++++++
2 files changed, 194 insertions(+)
diff --git a/Cargo.toml b/Cargo.toml
index 97aa79f2..bb5f8e5e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ bitflags = "1.2.1"
bytes = "1.0"
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
+flate2 = "1.0"
anyhow = "1.0"
futures = "0.3"
h2 = { version = "0.3", features = [ "stream" ] }
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
index 19626efc..b27d7e70 100644
--- a/src/tools/compression.rs
+++ b/src/tools/compression.rs
@@ -1,5 +1,19 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
use anyhow::{bail, Error};
+use bytes::Bytes;
+use flate2::{Compress, Compression, FlushCompress};
+use futures::ready;
+use futures::stream::Stream;
use hyper::header;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+use proxmox::io_format_err;
+use proxmox::tools::byte_buffer::ByteBuffer;
+
+const BUFFER_SIZE: usize = 8192;
/// Possible Compression Methods, order determines preference (later is preferred)
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
@@ -37,3 +51,182 @@ impl std::str::FromStr for CompressionMethod {
}
}
}
+
+/// Compression level selector for [`DeflateEncoder`].
+///
+/// `DeflateEncoder::with_quality` maps each variant to a `flate2::Compression` value.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Level {
+    /// Mapped to `Compression::fast()`.
+    Fastest,
+    /// Mapped to `Compression::best()`.
+    Best,
+    /// Mapped to `Compression::new(3)`.
+    Default,
+    /// Mapped to `Compression::new(val)` with the contained value.
+    Precise(u32),
+}
+
+/// Internal state of the `Stream` implementation of `DeflateEncoder`.
+#[derive(PartialEq, Eq)]
+enum EncoderState {
+    /// The next input chunk has to be fetched from the inner stream.
+    Reading,
+    /// A fetched chunk is being fed to the compressor.
+    Writing,
+    /// The inner stream has ended and the compressor is being finished.
+    Flushing,
+    /// The compressor reported the end of the stream; nothing more is yielded.
+    Finished,
+}
+
+/// Deflate compressor wrapping an inner value of type `T`.
+///
+/// Depending on `T`, the inner value is either the destination of the
+/// compressed data (`Vec<u8>` or an `AsyncWrite`) or its source (a `Stream`
+/// of chunks).
+pub struct DeflateEncoder<T> {
+    /// Wrapped sink or source, depending on which `impl` is used.
+    inner: T,
+    /// `flate2` compressor state.
+    compressor: Compress,
+    /// Compressed output that has not been handed on yet.
+    buffer: ByteBuffer,
+    /// Input chunk of the `Stream` implementation that is not fully compressed yet.
+    input_buffer: Bytes,
+    /// Current state of the `Stream` implementation.
+    state: EncoderState,
+}
+
+impl<T> DeflateEncoder<T> {
+    /// Creates an encoder around `inner` that uses `Level::Default`.
+    pub fn new(inner: T) -> Self {
+        Self::with_quality(inner, Level::Default)
+    }
+
+    /// Creates an encoder around `inner` that uses the given compression `level`.
+    ///
+    /// The compressor is created with `false` as second argument of `Compress::new`.
+    pub fn with_quality(inner: T, level: Level) -> Self {
+        let compression = match level {
+            Level::Precise(val) => Compression::new(val),
+            Level::Default => Compression::new(3),
+            Level::Best => Compression::best(),
+            Level::Fastest => Compression::fast(),
+        };
+        let compressor = Compress::new(compression, false);
+
+        Self {
+            inner,
+            compressor,
+            buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
+            input_buffer: Bytes::new(),
+            state: EncoderState::Reading,
+        }
+    }
+
+    /// Returns the total number of input bytes the compressor has consumed so far.
+    pub fn total_in(&self) -> u64 {
+        self.compressor.total_in()
+    }
+
+    /// Returns the total number of output bytes the compressor has produced so far.
+    pub fn total_out(&self) -> u64 {
+        self.compressor.total_out()
+    }
+
+    /// Consumes the encoder and returns the wrapped value.
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    /// Runs one compression step over `inbuf`, appending the output to `self.buffer`.
+    ///
+    /// Returns how many bytes of `inbuf` were consumed together with the status
+    /// reported by the compressor.
+    fn encode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushCompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        // the compressor only exposes running totals, so remember them to compute deltas
+        let in_before = self.compressor.total_in();
+        let out_before = self.compressor.total_out();
+        let status = self
+            .compressor
+            .compress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+        let consumed = (self.compressor.total_in() - in_before) as usize;
+        let produced = (self.compressor.total_out() - out_before) as usize;
+        // make the freshly written bytes part of the buffer's valid data
+        self.buffer.add_size(produced);
+
+        Ok((consumed, status))
+    }
+}
+
+impl DeflateEncoder<Vec<u8>> {
+    /// Reads `reader` to its end and appends the compressed data to the inner `Vec<u8>`.
+    ///
+    /// Meant for small inputs: the whole input is held in memory before it is
+    /// compressed. `size_hint` is the expected input size and is used to pre-size
+    /// both the input buffer and the output vector.
+    ///
+    /// # Errors
+    ///
+    /// Fails if reading from `reader` or the compression itself fails.
+    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        let mut buffer = Vec::with_capacity(size_hint);
+        reader.read_to_end(&mut buffer).await?;
+        // usually enough, since the compressed data is expected to be smaller than the input
+        self.inner.reserve(size_hint);
+
+        let mut consumed = 0;
+        loop {
+            let old_in = self.compressor.total_in();
+            let status = self.compressor.compress_vec(
+                &buffer[consumed..],
+                &mut self.inner,
+                FlushCompress::Finish,
+            )?;
+            consumed += (self.compressor.total_in() - old_in) as usize;
+            if status == flate2::Status::StreamEnd {
+                break;
+            }
+            // `compress_vec` only fills the spare capacity and never grows the vector, so
+            // make room and continue instead of silently returning truncated output
+            self.inner.reserve(BUFFER_SIZE);
+        }
+
+        Ok(())
+    }
+}
+
+impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
+    /// Compresses everything read from `reader` and writes the result to the inner writer.
+    ///
+    /// The input is processed in chunks of at most `BUFFER_SIZE` bytes. This method
+    /// itself does not call `flush` on the inner writer.
+    ///
+    /// # Errors
+    ///
+    /// Fails if reading, compressing or writing fails.
+    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        let mut input = ByteBuffer::with_capacity(BUFFER_SIZE);
+        let mut reached_eof = false;
+
+        // phase 1: feed the input to the compressor until the reader is drained
+        loop {
+            if !(reached_eof || input.is_full()) {
+                // a read of zero bytes marks the end of the input
+                reached_eof = input.read_from_async(reader).await? == 0;
+            }
+            let (consumed, _status) = self.encode(&input[..], FlushCompress::None)?;
+            input.consume(consumed);
+
+            self.inner.write_all(&self.buffer[..]).await?;
+            self.buffer.clear();
+
+            if reached_eof && input.is_empty() {
+                break;
+            }
+        }
+
+        // phase 2: finish the stream, writing out data until the compressor is done
+        loop {
+            let (_consumed, status) = self.encode(&[], FlushCompress::Finish)?;
+            self.inner.write_all(&self.buffer[..]).await?;
+            self.buffer.clear();
+            if status == flate2::Status::StreamEnd {
+                return Ok(());
+            }
+        }
+    }
+}
+
+/// Compressing adapter: yields the deflate-compressed form of the chunks
+/// produced by the inner stream.
+impl<T, O> Stream for DeflateEncoder<T>
+where
+    T: Stream<Item = Result<O, io::Error>> + Unpin,
+    O: Into<Bytes>,
+{
+    type Item = Result<Bytes, io::Error>;
+
+    /// Drives the state machine `Reading` -> `Writing` -> ... -> `Flushing` -> `Finished`.
+    ///
+    /// Output is yielded when the internal buffer is full, when the compressor
+    /// reports `BufError`, or while finishing the stream. Errors of the inner
+    /// stream and of the compressor are passed on as stream items.
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                EncoderState::Reading => {
+                    match ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                        Some(res) => {
+                            let chunk: Bytes = res?.into();
+                            // an empty chunk carries no data and is not an error,
+                            // so stay in this state and poll for the next one
+                            if !chunk.is_empty() {
+                                this.input_buffer = chunk;
+                                this.state = EncoderState::Writing;
+                            }
+                        }
+                        // inner stream is exhausted, finish the compressed stream
+                        None => this.state = EncoderState::Flushing,
+                    }
+                }
+                EncoderState::Writing => {
+                    // invariant check: `Reading` only switches to this state with pending input
+                    if this.input_buffer.is_empty() {
+                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
+                    }
+                    // move the chunk out of `this`, so that `encode` can borrow `this` mutably
+                    let mut buf = this.input_buffer.split_off(0);
+                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
+                    // keep whatever the compressor did not consume for the next round
+                    this.input_buffer = buf.split_off(read);
+                    if this.input_buffer.is_empty() {
+                        this.state = EncoderState::Reading;
+                    }
+                    if this.buffer.is_full() || res == flate2::Status::BufError {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                EncoderState::Flushing => {
+                    let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
+                    // record the end of the stream before handing out the last data, so the
+                    // compressor is not called again after it has finished
+                    if res == flate2::Status::StreamEnd {
+                        this.state = EncoderState::Finished;
+                    }
+                    if !this.buffer.is_empty() {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                EncoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
--
2.20.1
next prev parent reply other threads:[~2021-04-06 9:04 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-04-06 9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
2021-04-06 9:03 ` [pbs-devel] [PATCH proxmox-backup v3 1/7] tools: add compression module Dominik Csapak
2021-04-06 9:03 ` Dominik Csapak [this message]
2021-04-06 9:03 ` [pbs-devel] [PATCH proxmox-backup v3 3/7] server/rest: add helper to extract compression headers Dominik Csapak
2021-04-06 9:03 ` [pbs-devel] [PATCH proxmox-backup v3 4/7] server/rest: compress api calls Dominik Csapak
2021-04-06 9:03 ` [pbs-devel] [PATCH proxmox-backup v3 5/7] server/rest: compress static files Dominik Csapak
2021-04-06 9:03 ` [pbs-devel] [PATCH proxmox-backup v3 6/7] tools/zip: run rustfmt Dominik Csapak
2021-04-06 9:03 ` [pbs-devel] [PATCH proxmox-backup v3 7/7] tools/zip: compress zips with deflate Dominik Csapak
2021-04-07 16:00 ` [pbs-devel] applied-series: [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Thomas Lamprecht
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210406090347.27579-3-d.csapak@proxmox.com \
--to=d.csapak@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox