From: Thomas Lamprecht <t.lamprecht@proxmox.com>
To: Proxmox Backup Server development discussion
	<pbs-devel@lists.proxmox.com>,
	Dominik Csapak <d.csapak@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers
Date: Fri, 2 Apr 2021 14:14:13 +0200
Message-ID: <cfc605b1-e9e9-0065-a5f0-cf2c9dd240c4@proxmox.com>
In-Reply-To: <20210401141123.12964-3-d.csapak@proxmox.com>

On 01.04.21 16:11, Dominik Csapak wrote:
> implements a deflate encoder that can compress anything that implements
> AsyncRead + Unpin into a file with the helper 'compress'
> 
> if the inner type is a Stream, it implements Stream itself, this way
> some streaming data can be streamed compressed
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
>  Cargo.toml               |   1 +
>  src/tools/compression.rs | 191 +++++++++++++++++++++++++++++++++++++++
>  2 files changed, 192 insertions(+)
> 
> diff --git a/Cargo.toml b/Cargo.toml
> index 69b07d41..2f59b310 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -29,6 +29,7 @@ bitflags = "1.2.1"
>  bytes = "1.0"
>  crc32fast = "1"
>  endian_trait = { version = "0.6", features = ["arrays"] }
> +flate2 = "1.0"
>  anyhow = "1.0"
>  futures = "0.3"
>  h2 = { version = "0.3", features = [ "stream" ] }
> diff --git a/src/tools/compression.rs b/src/tools/compression.rs
> index fe15e8fc..cc6ea732 100644
> --- a/src/tools/compression.rs
> +++ b/src/tools/compression.rs
> @@ -1,5 +1,17 @@
> +use std::io;
> +use std::pin::Pin;
> +use std::task::{Context, Poll};
> +
>  use anyhow::{bail, Error};
> +use bytes::Bytes;
> +use flate2::{Compress, Compression, FlushCompress};
> +use futures::ready;
> +use futures::stream::Stream;
>  use hyper::header;
> +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::byte_buffer::ByteBuffer;
>  
>  /// Possible Compression Methods, order determines preference (later is preferred)
>  #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
> @@ -35,3 +47,182 @@ impl std::str::FromStr for CompressionMethod {
>          }
>      }
>  }
> +
> +pub enum Level {
> +    Fastest,
> +    Best,
> +    Default,
> +    Precise(u32),
> +}
> +
> +#[derive(Eq, PartialEq)]
> +enum EncoderState {
> +    Reading,
> +    Writing,
> +    Flushing,
> +    Finished,
> +}
> +
> +pub struct DeflateEncoder<T> {
> +    inner: T,
> +    compressor: Compress,
> +    buffer: ByteBuffer,
> +    input_buffer: Bytes,
> +    state: EncoderState,
> +}
> +
> +impl<T> DeflateEncoder<T> {
> +    pub fn new(inner: T) -> Self {
> +        Self::with_quality(inner, Level::Default)
> +    }
> +
> +    pub fn with_quality(inner: T, level: Level) -> Self {
> +        let level = match level {
> +            Level::Fastest => Compression::fast(),
> +            Level::Best => Compression::best(),
> +            Level::Default => Compression::new(3),
> +            Level::Precise(val) => Compression::new(val),
> +        };
> +
> +        Self {
> +            inner,
> +            compressor: Compress::new(level, false),
> +            buffer: ByteBuffer::with_capacity(8192),

Maybe we could use a const for this buffer size instead of the hard-coded 8192?
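
A rough sketch of the idea only, not a drop-in change: `BUFFER_SIZE` is a
name picked here for illustration, and a plain `Vec<u8>` stands in for
`ByteBuffer` so the snippet compiles on its own.

```rust
/// Hypothetical name; the patch hard-codes 8192 in two places.
const BUFFER_SIZE: usize = 8192;

/// Simplified stand-in for the encoder, keeping only the output buffer.
struct Encoder {
    buffer: Vec<u8>,
}

impl Encoder {
    fn new() -> Self {
        Self {
            // In the patch this is ByteBuffer::with_capacity(8192).
            buffer: Vec::with_capacity(BUFFER_SIZE),
        }
    }
}

/// Stand-in for the scratch buffer allocated in `compress`,
/// sharing the same constant instead of repeating the literal.
fn scratch_buffer() -> Vec<u8> {
    Vec::with_capacity(BUFFER_SIZE)
}
```

With that, both allocation sites stay in sync if the size ever changes.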

> +            input_buffer: Bytes::new(),
> +            state: EncoderState::Reading,
> +        }
> +    }
> +
> +    pub fn total_in(&self) -> u64 {

Does this need to be publicly visible? At least for now it's only used in the
private encode function below. No hard feelings about this, I just noticed it.

> +        self.compressor.total_in()
> +    }
> +
> +    pub fn total_out(&self) -> u64 {

Same here.
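
If the counters are only needed inside the crate, one option would be
`pub(crate)` (or no modifier at all). A minimal sketch, assuming a simplified
stand-in for the encoder: `Counters` and `feed` are made up for illustration
and replace flate2's `Compress`, so this is not the code from the patch.

```rust
mod compression {
    /// Made-up stand-in for flate2's `Compress`; tracks byte counters only.
    #[derive(Default)]
    struct Counters {
        total_in: u64,
        total_out: u64,
    }

    pub struct DeflateEncoder {
        compressor: Counters,
    }

    impl DeflateEncoder {
        pub fn new() -> Self {
            Self {
                compressor: Counters::default(),
            }
        }

        /// Visible inside the crate only, not part of the public API.
        pub(crate) fn total_in(&self) -> u64 {
            self.compressor.total_in
        }

        /// Visible inside the crate only, not part of the public API.
        pub(crate) fn total_out(&self) -> u64 {
            self.compressor.total_out
        }

        /// Hypothetical helper: records how many bytes were consumed
        /// and produced, in place of a real compression step.
        pub fn feed(&mut self, consumed: u64, produced: u64) {
            self.compressor.total_in += consumed;
            self.compressor.total_out += produced;
        }
    }
}
```

Widening the visibility to `pub` later is easy; narrowing it after external
users appear is not.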

> +        self.compressor.total_out()
> +    }
> +
> +    pub fn into_inner(self) -> T {
> +        self.inner
> +    }
> +
> +    fn encode(
> +        &mut self,
> +        inbuf: &[u8],
> +        flush: FlushCompress,
> +    ) -> Result<(usize, flate2::Status), io::Error> {
> +        let old_in = self.compressor.total_in();
> +        let old_out = self.compressor.total_out();
> +        let res = self
> +            .compressor
> +            .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
> +        let new_in = (self.compressor.total_in() - old_in) as usize;
> +        let new_out = (self.compressor.total_out() - old_out) as usize;
> +        self.buffer.add_size(new_out);
> +
> +        Ok((new_in, res))
> +    }
> +}
> +
> +impl DeflateEncoder<Vec<u8>> {
> +    // assume small files
> +    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
> +    where
> +        R: AsyncRead + Unpin,
> +    {
> +        let mut buffer = Vec::with_capacity(size_hint);
> +        reader.read_to_end(&mut buffer).await?;
> +        self.inner.reserve(size_hint); // should be enough since we want smaller files
> +        self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
> +        Ok(())
> +    }
> +}
> +
> +impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
> +    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
> +    where
> +        R: AsyncRead + Unpin,
> +    {
> +        let mut buffer = ByteBuffer::with_capacity(8192);

And reuse the buffer-size const here.

> +        let mut eof = false;
> +        loop {
> +            if !eof && !buffer.is_full() {
> +                let read = buffer.read_from_async(reader).await?;
> +                if read == 0 {
> +                    eof = true;
> +                }
> +            }
> +            let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
> +            buffer.consume(read);
> +
> +            self.inner.write_all(&self.buffer[..]).await?;
> +            self.buffer.clear();
> +
> +            if buffer.is_empty() && eof {
> +                break;
> +            }
> +        }
> +
> +        loop {
> +            let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
> +            self.inner.write_all(&self.buffer[..]).await?;
> +            self.buffer.clear();
> +            if res == flate2::Status::StreamEnd {
> +                break;
> +            }
> +        }
> +
> +        Ok(())
> +    }
> +}
> +
> +impl<T, O> Stream for DeflateEncoder<T>
> +where
> +    T: Stream<Item = Result<O, io::Error>> + Unpin,
> +    O: Into<Bytes>
> +{
> +    type Item = Result<Bytes, io::Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.state {
> +                EncoderState::Reading => {
> +                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
> +                        let buf = res?;
> +                        this.input_buffer = buf.into();
> +                        this.state = EncoderState::Writing;
> +                    } else {
> +                        this.state = EncoderState::Flushing;
> +                    }
> +                }
> +                EncoderState::Writing => {
> +                    if this.input_buffer.is_empty() {
> +                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
> +                    }
> +                    let mut buf = this.input_buffer.split_off(0);
> +                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
> +                    this.input_buffer = buf.split_off(read);
> +                    if this.input_buffer.is_empty() {
> +                        this.state = EncoderState::Reading;
> +                    }
> +                    if this.buffer.is_full() || res == flate2::Status::BufError {
> +                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> +                        return Poll::Ready(Some(Ok(bytes.into())));
> +                    }
> +                }
> +                EncoderState::Flushing => {
> +                    let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
> +                    if !this.buffer.is_empty() {
> +                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> +                        return Poll::Ready(Some(Ok(bytes.into())));
> +                    }
> +                    if res == flate2::Status::StreamEnd {
> +                        this.state = EncoderState::Finished;
> +                    }
> +                }
> +                EncoderState::Finished => return Poll::Ready(None),
> +            }
> +        }
> +    }
> +}
> 




