public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip
@ 2021-04-06  9:03 Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 1/7] tools: add compression module Dominik Csapak
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

this series combines my two earlier series: adding compression to
api calls and static file responses, and the zip deflate series

changes from v2:
compression:
* comment out non 'Deflate' CompressionMethods to have better match arms
* use constants for 'magic' values
* (partially) parse accept-encoding qualifiers
zip:
* use DeflateEncoder instead of flate2 manually
* reorder patches so that the rustfmt patch comes first

Dominik Csapak (7):
  tools: add compression module
  tools/compression: add DeflateEncoder and helpers
  server/rest: add helper to extract compression headers
  server/rest: compress api calls
  server/rest: compress static files
  tools/zip: run rustfmt
  tools/zip: compress zips with deflate

 Cargo.toml               |   1 +
 src/server/rest.rs       | 121 ++++++++++++++++----
 src/tools.rs             |   1 +
 src/tools/compression.rs | 232 +++++++++++++++++++++++++++++++++++++++
 src/tools/zip.rs         | 133 +++++++++++++++-------
 5 files changed, 425 insertions(+), 63 deletions(-)
 create mode 100644 src/tools/compression.rs

-- 
2.20.1

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v3 1/7] tools: add compression module
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 2/7] tools/compression: add DeflateEncoder and helpers Dominik Csapak
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

only contains a basic enum for the different compression methods

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools.rs             |  1 +
 src/tools/compression.rs | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 40 insertions(+)
 create mode 100644 src/tools/compression.rs

diff --git a/src/tools.rs b/src/tools.rs
index 7e3bff7b..cd1415ec 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -22,6 +22,7 @@ pub mod apt;
 pub mod async_io;
 pub mod borrow;
 pub mod cert;
+pub mod compression;
 pub mod daemon;
 pub mod disks;
 pub mod format;
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
new file mode 100644
index 00000000..19626efc
--- /dev/null
+++ b/src/tools/compression.rs
@@ -0,0 +1,39 @@
+use anyhow::{bail, Error};
+use hyper::header;
+
+/// Possible Compression Methods, order determines preference (later is preferred)
+#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
+pub enum CompressionMethod {
+    Deflate,
+//    Gzip,
+//    Brotli,
+}
+
+impl CompressionMethod {
+    pub fn content_encoding(&self) -> header::HeaderValue {
+        header::HeaderValue::from_static(self.extension())
+    }
+
+    pub fn extension(&self) -> &'static str {
+        match *self {
+//            CompressionMethod::Brotli => "br",
+//            CompressionMethod::Gzip => "gzip",
+            CompressionMethod::Deflate => "deflate",
+        }
+    }
+}
+
+impl std::str::FromStr for CompressionMethod {
+    type Err = Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+//            "br" => Ok(CompressionMethod::Brotli),
+//            "gzip" => Ok(CompressionMethod::Gzip),
+            "deflate" => Ok(CompressionMethod::Deflate),
+            // http accept-encoding allows to give weights with ';q='
+            other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate),
+            _ => bail!("unknown compression format"),
+        }
+    }
+}
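[editor's note] The FromStr logic above can be exercised standalone. The following is a minimal sketch of the same matching, with a hypothetical String error type in place of the anyhow::Error used by the real module:

```rust
// Minimal standalone mirror of CompressionMethod::from_str above.
#[derive(Debug, PartialEq)]
enum CompressionMethod {
    Deflate,
}

impl std::str::FromStr for CompressionMethod {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "deflate" => Ok(CompressionMethod::Deflate),
            // http accept-encoding allows weights with ';q='
            other if other.starts_with("deflate;q=") => Ok(CompressionMethod::Deflate),
            _ => Err(format!("unknown compression format: {}", s)),
        }
    }
}

fn main() {
    assert_eq!("deflate".parse::<CompressionMethod>(), Ok(CompressionMethod::Deflate));
    assert_eq!("deflate;q=0.5".parse::<CompressionMethod>(), Ok(CompressionMethod::Deflate));
    assert!("gzip".parse::<CompressionMethod>().is_err());
}
```

Note that a plain `;q=0.5` suffix is accepted but its weight is ignored; the cover letter calls this "(partially) parse accept-encoding qualifiers".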
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v3 2/7] tools/compression: add DeflateEncoder and helpers
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 1/7] tools: add compression module Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 3/7] server/rest: add helper to extract compression headers Dominik Csapak
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

implements a deflate encoder that can compress anything implementing
AsyncRead + Unpin into a file with the helper 'compress'

if the inner type is a Stream, the encoder implements Stream itself, so
streamed data can be compressed on the fly

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 Cargo.toml               |   1 +
 src/tools/compression.rs | 193 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 194 insertions(+)

diff --git a/Cargo.toml b/Cargo.toml
index 97aa79f2..bb5f8e5e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ bitflags = "1.2.1"
 bytes = "1.0"
 crc32fast = "1"
 endian_trait = { version = "0.6", features = ["arrays"] }
+flate2 = "1.0"
 anyhow = "1.0"
 futures = "0.3"
 h2 = { version = "0.3", features = [ "stream" ] }
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
index 19626efc..b27d7e70 100644
--- a/src/tools/compression.rs
+++ b/src/tools/compression.rs
@@ -1,5 +1,19 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
 use anyhow::{bail, Error};
+use bytes::Bytes;
+use flate2::{Compress, Compression, FlushCompress};
+use futures::ready;
+use futures::stream::Stream;
 use hyper::header;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+use proxmox::io_format_err;
+use proxmox::tools::byte_buffer::ByteBuffer;
+
+const BUFFER_SIZE: usize = 8192;
 
 /// Possible Compression Methods, order determines preference (later is preferred)
 #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
@@ -37,3 +51,182 @@ impl std::str::FromStr for CompressionMethod {
         }
     }
 }
+
+pub enum Level {
+    Fastest,
+    Best,
+    Default,
+    Precise(u32),
+}
+
+#[derive(Eq, PartialEq)]
+enum EncoderState {
+    Reading,
+    Writing,
+    Flushing,
+    Finished,
+}
+
+pub struct DeflateEncoder<T> {
+    inner: T,
+    compressor: Compress,
+    buffer: ByteBuffer,
+    input_buffer: Bytes,
+    state: EncoderState,
+}
+
+impl<T> DeflateEncoder<T> {
+    pub fn new(inner: T) -> Self {
+        Self::with_quality(inner, Level::Default)
+    }
+
+    pub fn with_quality(inner: T, level: Level) -> Self {
+        let level = match level {
+            Level::Fastest => Compression::fast(),
+            Level::Best => Compression::best(),
+            Level::Default => Compression::new(3),
+            Level::Precise(val) => Compression::new(val),
+        };
+
+        Self {
+            inner,
+            compressor: Compress::new(level, false),
+            buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
+            input_buffer: Bytes::new(),
+            state: EncoderState::Reading,
+        }
+    }
+
+    pub fn total_in(&self) -> u64 {
+        self.compressor.total_in()
+    }
+
+    pub fn total_out(&self) -> u64 {
+        self.compressor.total_out()
+    }
+
+    pub fn into_inner(self) -> T {
+        self.inner
+    }
+
+    fn encode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushCompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        let old_in = self.compressor.total_in();
+        let old_out = self.compressor.total_out();
+        let res = self
+            .compressor
+            .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
+        let new_in = (self.compressor.total_in() - old_in) as usize;
+        let new_out = (self.compressor.total_out() - old_out) as usize;
+        self.buffer.add_size(new_out);
+
+        Ok((new_in, res))
+    }
+}
+
+impl DeflateEncoder<Vec<u8>> {
+    // assume small files
+    pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        let mut buffer = Vec::with_capacity(size_hint);
+        reader.read_to_end(&mut buffer).await?;
+        self.inner.reserve(size_hint); // should be enough since we want smaller files
+        self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
+        Ok(())
+    }
+}
+
+impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
+    pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
+        let mut eof = false;
+        loop {
+            if !eof && !buffer.is_full() {
+                let read = buffer.read_from_async(reader).await?;
+                if read == 0 {
+                    eof = true;
+                }
+            }
+            let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
+            buffer.consume(read);
+
+            self.inner.write_all(&self.buffer[..]).await?;
+            self.buffer.clear();
+
+            if buffer.is_empty() && eof {
+                break;
+            }
+        }
+
+        loop {
+            let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
+            self.inner.write_all(&self.buffer[..]).await?;
+            self.buffer.clear();
+            if res == flate2::Status::StreamEnd {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl<T, O> Stream for DeflateEncoder<T>
+where
+    T: Stream<Item = Result<O, io::Error>> + Unpin,
+    O: Into<Bytes>
+{
+    type Item = Result<Bytes, io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                EncoderState::Reading => {
+                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                        let buf = res?;
+                        this.input_buffer = buf.into();
+                        this.state = EncoderState::Writing;
+                    } else {
+                        this.state = EncoderState::Flushing;
+                    }
+                }
+                EncoderState::Writing => {
+                    if this.input_buffer.is_empty() {
+                        return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
+                    }
+                    let mut buf = this.input_buffer.split_off(0);
+                    let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
+                    this.input_buffer = buf.split_off(read);
+                    if this.input_buffer.is_empty() {
+                        this.state = EncoderState::Reading;
+                    }
+                    if this.buffer.is_full() || res == flate2::Status::BufError {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                EncoderState::Flushing => {
+                    let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
+                    if !this.buffer.is_empty() {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                    if res == flate2::Status::StreamEnd {
+                        this.state = EncoderState::Finished;
+                    }
+                }
+                EncoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
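[editor's note] The poll_next state machine above can be illustrated with a synchronous analogue. This sketch replaces flate2 with a trivial byte transform (uppercasing) so it stays dependency-free, but walks the same Reading -> Writing -> Flushing -> Finished transitions; all names here are illustrative, not part of the patch:

```rust
#[derive(PartialEq, Debug)]
enum EncoderState {
    Reading,
    Writing,
    Flushing,
    Finished,
}

// Synchronous stand-in for DeflateEncoder's Stream impl: pulls chunks
// from an inner iterator, "encodes" them, and emits output chunks.
struct MockEncoder<I: Iterator<Item = Vec<u8>>> {
    inner: I,
    input: Vec<u8>,
    out: Vec<u8>,
    state: EncoderState,
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for MockEncoder<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        loop {
            match self.state {
                EncoderState::Reading => match self.inner.next() {
                    Some(buf) => {
                        self.input = buf;
                        self.state = EncoderState::Writing;
                    }
                    None => self.state = EncoderState::Flushing,
                },
                EncoderState::Writing => {
                    // stand-in for Compress::compress with FlushCompress::None
                    self.out.extend(self.input.iter().map(|b| b.to_ascii_uppercase()));
                    self.input.clear();
                    self.state = EncoderState::Reading;
                    if !self.out.is_empty() {
                        return Some(std::mem::take(&mut self.out));
                    }
                }
                EncoderState::Flushing => {
                    // stand-in for FlushCompress::Finish draining the compressor
                    self.state = EncoderState::Finished;
                    if !self.out.is_empty() {
                        return Some(std::mem::take(&mut self.out));
                    }
                }
                EncoderState::Finished => return None,
            }
        }
    }
}

fn main() {
    let chunks = vec![b"hello ".to_vec(), b"world".to_vec()];
    let enc = MockEncoder {
        inner: chunks.into_iter(),
        input: Vec::new(),
        out: Vec::new(),
        state: EncoderState::Reading,
    };
    let out: Vec<u8> = enc.flatten().collect();
    assert_eq!(out, b"HELLO WORLD".to_vec());
}
```

The real encoder differs in that flate2 may buffer input across calls (hence the BufError/buffer-full checks), but the state transitions are the same.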
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v3 3/7] server/rest: add helper to extract compression headers
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 1/7] tools: add compression module Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 2/7] tools/compression: add DeflateEncoder and helpers Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 4/7] server/rest: compress api calls Dominik Csapak
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

for now we only extract 'deflate'

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/server/rest.rs | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/server/rest.rs b/src/server/rest.rs
index 1cd26787..02258eab 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,6 +39,7 @@ use crate::api2::types::{Authid, Userid};
 use crate::auth_helpers::*;
 use crate::config::cached_user_info::CachedUserInfo;
 use crate::tools;
+use crate::tools::compression::CompressionMethod;
 use crate::tools::FileLogger;
 
 extern "C" {
@@ -587,6 +588,21 @@ fn extract_lang_header(headers: &http::HeaderMap) -> Option<String> {
     None
 }
 
+// FIXME: support handling multiple compression methods
+fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
+    if let Some(raw_encoding) = headers.get(header::ACCEPT_ENCODING) {
+        if let Ok(encoding) = raw_encoding.to_str() {
+            for encoding in encoding.split(&[',', ' '][..]) {
+                if let Ok(method) = encoding.parse() {
+                    return Some(method);
+                }
+            }
+        }
+    }
+
+    None
+}
+
 async fn handle_request(
     api: Arc<ApiConfig>,
     req: Request<Body>,
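[editor's note] A standalone sketch of the header-splitting logic above, as a hypothetical function taking the raw header value as &str instead of an http::HeaderMap:

```rust
// Mirrors extract_compression_method: split the Accept-Encoding value on
// commas and spaces, return the first encoding we can handle.
fn extract_compression_method(accept_encoding: &str) -> Option<&'static str> {
    for encoding in accept_encoding.split(&[',', ' '][..]) {
        match encoding {
            "deflate" => return Some("deflate"),
            // weighted form, e.g. "deflate;q=0.5"
            e if e.starts_with("deflate;q=") => return Some("deflate"),
            _ => {}
        }
    }
    None
}

fn main() {
    assert_eq!(extract_compression_method("gzip, deflate, br"), Some("deflate"));
    assert_eq!(extract_compression_method("br;q=1.0, deflate;q=0.5"), Some("deflate"));
    assert_eq!(extract_compression_method("identity"), None);
}
```

As the FIXME notes, this returns the first supported method rather than weighing all q-values against each other.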
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v3 4/7] server/rest: compress api calls
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
                   ` (2 preceding siblings ...)
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 3/7] server/rest: add helper to extract compression headers Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 5/7] server/rest: compress static files Dominik Csapak
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/server/rest.rs | 23 +++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/src/server/rest.rs b/src/server/rest.rs
index 02258eab..61513e4b 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,7 +39,7 @@ use crate::api2::types::{Authid, Userid};
 use crate::auth_helpers::*;
 use crate::config::cached_user_info::CachedUserInfo;
 use crate::tools;
-use crate::tools::compression::CompressionMethod;
+use crate::tools::compression::{CompressionMethod, DeflateEncoder, Level};
 use crate::tools::FileLogger;
 
 extern "C" {
@@ -397,6 +397,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
     uri_param: HashMap<String, String, S>,
 ) -> Result<Response<Body>, Error> {
     let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+    let compression = extract_compression_method(&parts.headers);
 
     let result = match info.handler {
         ApiHandler::AsyncHttp(handler) => {
@@ -417,7 +418,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
         }
     };
 
-    let resp = match result {
+    let mut resp = match result {
         Ok(resp) => resp,
         Err(err) => {
             if let Some(httperr) = err.downcast_ref::<HttpError>() {
@@ -429,6 +430,24 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
         }
     };
 
+    let resp = match compression {
+        Some(CompressionMethod::Deflate) => {
+            resp.headers_mut().insert(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            resp.map(|body| {
+                Body::wrap_stream(DeflateEncoder::with_quality(
+                    body.map_err(|err| {
+                        proxmox::io_format_err!("error during compression: {}", err)
+                    }),
+                    Level::Fastest,
+                ))
+            })
+        }
+        None => resp,
+    };
+
     if info.reload_timezone {
         unsafe {
             tzset();
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v3 5/7] server/rest: compress static files
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
                   ` (3 preceding siblings ...)
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 4/7] server/rest: compress api calls Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 6/7] tools/zip: run rustfmt Dominik Csapak
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

compress them on the fly
and refactor the size limit for chunking files into a shared constant

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/server/rest.rs | 84 +++++++++++++++++++++++++++++++++-------------
 1 file changed, 61 insertions(+), 23 deletions(-)

diff --git a/src/server/rest.rs b/src/server/rest.rs
index 61513e4b..07460125 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -40,6 +40,7 @@ use crate::auth_helpers::*;
 use crate::config::cached_user_info::CachedUserInfo;
 use crate::tools;
 use crate::tools::compression::{CompressionMethod, DeflateEncoder, Level};
+use crate::tools::AsyncReaderStream;
 use crate::tools::FileLogger;
 
 extern "C" {
@@ -51,6 +52,7 @@ pub struct RestServer {
 }
 
 const MAX_URI_QUERY_LENGTH: usize = 3072;
+const CHUNK_SIZE_LIMIT: u64 = 32 * 1024;
 
 impl RestServer {
     pub fn new(api_config: ApiConfig) -> Self {
@@ -544,9 +546,11 @@ fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
     ("application/octet-stream", false)
 }
 
-async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
-    let (content_type, _nocomp) = extension_to_content_type(&filename);
-
+async fn simple_static_file_download(
+    filename: PathBuf,
+    content_type: &'static str,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
     use tokio::io::AsyncReadExt;
 
     let mut file = File::open(filename)
@@ -554,46 +558,79 @@ async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>
         .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
 
     let mut data: Vec<u8> = Vec::new();
-    file.read_to_end(&mut data)
-        .await
-        .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
 
-    let mut response = Response::new(data.into());
+    let mut response = match compression {
+        Some(CompressionMethod::Deflate) => {
+            let mut enc = DeflateEncoder::with_quality(data, Level::Fastest);
+            enc.compress_vec(&mut file, CHUNK_SIZE_LIMIT as usize).await?;
+            let mut response = Response::new(enc.into_inner().into());
+            response.headers_mut().insert(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            response
+        }
+        None => {
+            file.read_to_end(&mut data)
+                .await
+                .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
+            Response::new(data.into())
+        }
+    };
+
     response.headers_mut().insert(
         header::CONTENT_TYPE,
         header::HeaderValue::from_static(content_type),
     );
+
     Ok(response)
 }
 
-async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
-    let (content_type, _nocomp) = extension_to_content_type(&filename);
+async fn chuncked_static_file_download(
+    filename: PathBuf,
+    content_type: &'static str,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+    let mut resp = Response::builder()
+        .status(StatusCode::OK)
+        .header(header::CONTENT_TYPE, content_type);
 
     let file = File::open(filename)
         .await
         .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
 
-    let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
-        .map_ok(|bytes| bytes.freeze());
-    let body = Body::wrap_stream(payload);
+    let body = match compression {
+        Some(CompressionMethod::Deflate) => {
+            resp = resp.header(
+                header::CONTENT_ENCODING,
+                CompressionMethod::Deflate.content_encoding(),
+            );
+            Body::wrap_stream(DeflateEncoder::with_quality(
+                AsyncReaderStream::new(file),
+                Level::Fastest,
+            ))
+        }
+        None => Body::wrap_stream(AsyncReaderStream::new(file)),
+    };
 
-    // FIXME: set other headers ?
-    Ok(Response::builder()
-        .status(StatusCode::OK)
-        .header(header::CONTENT_TYPE, content_type)
-        .body(body)
-        .unwrap())
+    Ok(resp.body(body).unwrap())
 }
 
-async fn handle_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
+async fn handle_static_file_download(
+    filename: PathBuf,
+    compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
     let metadata = tokio::fs::metadata(filename.clone())
         .map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
         .await?;
 
-    if metadata.len() < 1024 * 32 {
-        simple_static_file_download(filename).await
+    let (content_type, nocomp) = extension_to_content_type(&filename);
+    let compression = if nocomp { None } else { compression };
+
+    if metadata.len() < CHUNK_SIZE_LIMIT {
+        simple_static_file_download(filename, content_type, compression).await
     } else {
-        chuncked_static_file_download(filename).await
+        chuncked_static_file_download(filename, content_type, compression).await
     }
 }
 
@@ -764,7 +801,8 @@ async fn handle_request(
             }
         } else {
             let filename = api.find_alias(&components);
-            return handle_static_file_download(filename).await;
+            let compression = extract_compression_method(&parts.headers);
+            return handle_static_file_download(filename, compression).await;
         }
     }
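[editor's note] The dispatch in handle_static_file_download boils down to two pure decisions: drop compression for content types marked non-compressible, and pick simple vs. chunked download by file size. A hypothetical sketch of just those decisions:

```rust
const CHUNK_SIZE_LIMIT: u64 = 32 * 1024;

#[derive(Debug, PartialEq)]
enum Strategy {
    Simple,  // read whole file into memory
    Chunked, // stream the file
}

// files below the limit are served in one buffer, larger ones are streamed
fn pick_strategy(file_len: u64) -> Strategy {
    if file_len < CHUNK_SIZE_LIMIT {
        Strategy::Simple
    } else {
        Strategy::Chunked
    }
}

// already-compressed content types (nocomp) override the client's wish
fn effective_compression(nocomp: bool, client: Option<&'static str>) -> Option<&'static str> {
    if nocomp {
        None
    } else {
        client
    }
}

fn main() {
    assert_eq!(pick_strategy(1024), Strategy::Simple);
    assert_eq!(pick_strategy(64 * 1024), Strategy::Chunked);
    assert_eq!(effective_compression(true, Some("deflate")), None);
    assert_eq!(effective_compression(false, Some("deflate")), Some("deflate"));
}
```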
 
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v3 6/7] tools/zip: run rustfmt
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
                   ` (4 preceding siblings ...)
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 5/7] server/rest: compress static files Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 7/7] tools/zip: compress zips with deflate Dominik Csapak
  2021-04-07 16:00 ` [pbs-devel] applied-series: [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Thomas Lamprecht
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools/zip.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/tools/zip.rs b/src/tools/zip.rs
index 55f2a24a..afacfbb9 100644
--- a/src/tools/zip.rs
+++ b/src/tools/zip.rs
@@ -16,8 +16,8 @@ use endian_trait::Endian;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 
 use crc32fast::Hasher;
-use proxmox::tools::time::gmtime;
 use proxmox::tools::byte_buffer::ByteBuffer;
+use proxmox::tools::time::gmtime;
 
 const LOCAL_FH_SIG: u32 = 0x04034B50;
 const LOCAL_FF_SIG: u32 = 0x08074B50;
@@ -410,7 +410,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             byte_count: 0,
             files: Vec::new(),
             target,
-            buf: ByteBuffer::with_capacity(1024*1024),
+            buf: ByteBuffer::with_capacity(1024 * 1024),
         }
     }
 
@@ -425,7 +425,6 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             let mut hasher = Hasher::new();
             let mut size = 0;
             loop {
-
                 let count = self.buf.read_from_async(&mut content).await?;
 
                 // end of file
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v3 7/7] tools/zip: compress zips with deflate
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
                   ` (5 preceding siblings ...)
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 6/7] tools/zip: run rustfmt Dominik Csapak
@ 2021-04-06  9:03 ` Dominik Csapak
  2021-04-07 16:00 ` [pbs-devel] applied-series: [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Thomas Lamprecht
  7 siblings, 0 replies; 9+ messages in thread
From: Dominik Csapak @ 2021-04-06  9:03 UTC (permalink / raw)
  To: pbs-devel

by using our DeflateEncoder

for this to work, we have to create a wrapper reader that generates the
crc32 checksum while reading.

we also need to put the target writer into an Option, so that we can
take it out of self and move it into the DeflateEncoder while writing
compressed data

we can then drop the internal buffer, since the deflate encoder manages
its own buffer now
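[editor's note] The HashWrapper idea, computing the crc32 as a side effect of reading, can be sketched synchronously with std::io::Read. This uses a tiny bitwise CRC-32 instead of the crc32fast crate; all names here are illustrative:

```rust
use std::io::{self, Read};

// Bitwise CRC-32 (IEEE polynomial 0xEDB88320), kept tiny for the sketch.
fn crc32_update(mut state: u32, data: &[u8]) -> u32 {
    for &b in data {
        state ^= b as u32;
        for _ in 0..8 {
            let mask = (state & 1).wrapping_neg();
            state = (state >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    state
}

// Synchronous analogue of the patch's HashWrapper: hashes bytes as they
// pass through, so the checksum is ready once the stream is drained.
struct HashReader<R: Read> {
    inner: R,
    state: u32,
}

impl<R: Read> HashReader<R> {
    fn new(inner: R) -> Self {
        Self { inner, state: 0xFFFF_FFFF }
    }

    // consumes self and returns the finalized crc32 and the inner reader
    fn finish(self) -> (u32, R) {
        (!self.state, self.inner)
    }
}

impl<R: Read> Read for HashReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.state = crc32_update(self.state, &buf[..n]);
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    let mut reader = HashReader::new(&b"123456789"[..]);
    let mut sink = Vec::new();
    reader.read_to_end(&mut sink)?;
    let (crc, _inner) = reader.finish();
    assert_eq!(crc, 0xCBF4_3926); // the standard CRC-32 check value
    Ok(())
}
```

This keeps the hashing on the uncompressed side of the pipeline, exactly where the zip local file header needs it.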

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools/zip.rs | 132 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 94 insertions(+), 38 deletions(-)

diff --git a/src/tools/zip.rs b/src/tools/zip.rs
index afacfbb9..37483d4b 100644
--- a/src/tools/zip.rs
+++ b/src/tools/zip.rs
@@ -10,15 +10,19 @@ use std::io;
 use std::mem::size_of;
 use std::os::unix::ffi::OsStrExt;
 use std::path::{Component, Path, PathBuf};
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
-use anyhow::{Error, Result};
+use anyhow::{format_err, Error, Result};
 use endian_trait::Endian;
-use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
 
 use crc32fast::Hasher;
-use proxmox::tools::byte_buffer::ByteBuffer;
 use proxmox::tools::time::gmtime;
 
+use crate::tools::compression::{DeflateEncoder, Level};
+
 const LOCAL_FH_SIG: u32 = 0x04034B50;
 const LOCAL_FF_SIG: u32 = 0x08074B50;
 const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50;
@@ -245,7 +249,7 @@ impl ZipEntry {
                 signature: LOCAL_FH_SIG,
                 version_needed: 0x2d,
                 flags: 1 << 3,
-                compression: 0,
+                compression: 0x8,
                 time,
                 date,
                 crc32: 0,
@@ -328,7 +332,7 @@ impl ZipEntry {
                 version_made_by: VERSION_MADE_BY,
                 version_needed: VERSION_NEEDED,
                 flags: 1 << 3,
-                compression: 0,
+                compression: 0x8,
                 time,
                 date,
                 crc32: self.crc32,
@@ -366,6 +370,47 @@ impl ZipEntry {
     }
 }
 
+// wraps an AsyncRead and calculates the crc32 hash of the data read
+struct HashWrapper<R> {
+    inner: R,
+    hasher: Hasher,
+}
+
+impl<R> HashWrapper<R> {
+    fn new(inner: R) -> Self {
+        Self {
+            inner,
+            hasher: Hasher::new(),
+        }
+    }
+
+    // consumes self and returns the hash and the reader
+    fn finish(self) -> (u32, R) {
+        let crc32 = self.hasher.finalize();
+        (crc32, self.inner)
+    }
+}
+
+impl<R> AsyncRead for HashWrapper<R>
+where
+    R: AsyncRead + Unpin,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        let this = self.get_mut();
+        let old_len = buf.filled().len();
+        ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?;
+        let new_len = buf.filled().len();
+        if new_len > old_len {
+            this.hasher.update(&buf.filled()[old_len..new_len]);
+        }
+        Poll::Ready(Ok(()))
+    }
+}
+
 /// Wraps a writer that implements AsyncWrite for creating a ZIP archive
 ///
 /// This will create a ZIP archive on the fly with files added with
@@ -400,8 +445,7 @@ where
 {
     byte_count: usize,
     files: Vec<ZipEntry>,
-    target: W,
-    buf: ByteBuffer,
+    target: Option<W>,
 }
 
 impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
@@ -409,8 +453,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         Self {
             byte_count: 0,
             files: Vec::new(),
-            target,
-            buf: ByteBuffer::with_capacity(1024 * 1024),
+            target: Some(target),
         }
     }
 
@@ -419,31 +462,31 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         mut entry: ZipEntry,
         content: Option<R>,
     ) -> Result<(), Error> {
+        let mut target = self
+            .target
+            .take()
+            .ok_or_else(|| format_err!("had no target during add entry"))?;
         entry.offset = self.byte_count.try_into()?;
-        self.byte_count += entry.write_local_header(&mut self.target).await?;
-        if let Some(mut content) = content {
-            let mut hasher = Hasher::new();
-            let mut size = 0;
-            loop {
-                let count = self.buf.read_from_async(&mut content).await?;
-
-                // end of file
-                if count == 0 {
-                    break;
-                }
-
-                size += count;
-                hasher.update(&self.buf);
-                self.target.write_all(&self.buf).await?;
-                self.buf.consume(count);
-            }
+        self.byte_count += entry.write_local_header(&mut target).await?;
+        if let Some(content) = content {
+            let mut reader = HashWrapper::new(content);
+            let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
 
-            self.byte_count += size;
-            entry.compressed_size = size.try_into()?;
-            entry.uncompressed_size = size.try_into()?;
-            entry.crc32 = hasher.finalize();
+            enc.compress(&mut reader).await?;
+            let total_in = enc.total_in();
+            let total_out = enc.total_out();
+            target = enc.into_inner();
+
+            let (crc32, _reader) = reader.finish();
+
+            self.byte_count += total_out as usize;
+            entry.compressed_size = total_out;
+            entry.uncompressed_size = total_in;
+
+            entry.crc32 = crc32;
         }
-        self.byte_count += entry.write_data_descriptor(&mut self.target).await?;
+        self.byte_count += entry.write_data_descriptor(&mut target).await?;
+        self.target = Some(target);
 
         self.files.push(entry);
 
@@ -456,6 +499,10 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         central_dir_offset: usize,
     ) -> Result<(), Error> {
         let entrycount = self.files.len();
+        let mut target = self
+            .target
+            .take()
+            .ok_or_else(|| format_err!("had no target during write_eocd"))?;
 
         let mut count = entrycount as u16;
         let mut directory_size = central_dir_size as u32;
@@ -470,7 +517,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             directory_offset = 0xFFFFFFFF;
 
             write_struct(
-                &mut self.target,
+                &mut target,
                 Zip64EOCDRecord {
                     signature: ZIP64_EOCD_RECORD,
                     field_size: 44,
@@ -489,7 +536,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             let locator_offset = central_dir_offset + central_dir_size;
 
             write_struct(
-                &mut self.target,
+                &mut target,
                 Zip64EOCDLocator {
                     signature: ZIP64_EOCD_LOCATOR,
                     disk_number: 0,
@@ -501,7 +548,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         }
 
         write_struct(
-            &mut self.target,
+            &mut target,
             EndOfCentralDir {
                 signature: END_OF_CENTRAL_DIR,
                 disk_number: 0,
@@ -515,23 +562,32 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         )
         .await?;
 
+        self.target = Some(target);
+
         Ok(())
     }
 
     pub async fn finish(&mut self) -> Result<(), Error> {
+        let mut target = self
+            .target
+            .take()
+            .ok_or_else(|| format_err!("had no target during finish"))?;
         let central_dir_offset = self.byte_count;
         let mut central_dir_size = 0;
 
         for file in &self.files {
-            central_dir_size += file
-                .write_central_directory_header(&mut self.target)
-                .await?;
+            central_dir_size += file.write_central_directory_header(&mut target).await?;
         }
 
+        self.target = Some(target);
         self.write_eocd(central_dir_size, central_dir_offset)
             .await?;
 
-        self.target.flush().await?;
+        self.target
+            .take()
+            .ok_or_else(|| format_err!("had no target for flush"))?
+            .flush()
+            .await?;
 
         Ok(())
     }
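
A minimal, std-only sketch of the `Option::take` ownership dance this patch introduces. Because `DeflateEncoder` takes the writer by value and only hands it back via `into_inner()`, the `target` field becomes an `Option<W>` that is taken out and restored around each entry. The `Encoder`/`ZipWriter` names below are illustrative stand-ins, not the real types:

```rust
use std::io::Write;

// Stand-in for DeflateEncoder: owns the writer while encoding,
// returns it via into_inner().
struct Encoder<W: Write> {
    inner: W,
    total_out: u64,
}

impl<W: Write> Encoder<W> {
    fn new(inner: W) -> Self {
        Encoder { inner, total_out: 0 }
    }

    fn write(&mut self, data: &[u8]) -> std::io::Result<()> {
        self.inner.write_all(data)?;
        self.total_out += data.len() as u64;
        Ok(())
    }

    fn into_inner(self) -> W {
        self.inner
    }
}

struct ZipWriter<W: Write> {
    // Option so the writer can be moved out and put back per entry
    target: Option<W>,
    byte_count: u64,
}

impl<W: Write> ZipWriter<W> {
    fn add(&mut self, data: &[u8]) -> std::io::Result<()> {
        // take ownership out of self; the field is None meanwhile
        let target = self.target.take().expect("had no target during add");
        let mut enc = Encoder::new(target);
        enc.write(data)?;
        self.byte_count += enc.total_out;
        // restore the writer for the next entry
        self.target = Some(enc.into_inner());
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut zip = ZipWriter { target: Some(Vec::new()), byte_count: 0 };
    zip.add(b"hello ")?;
    zip.add(b"world")?;
    assert_eq!(zip.byte_count, 11);
    assert_eq!(zip.target.unwrap(), b"hello world");
    println!("ok");
    Ok(())
}
```

The `ok_or_else(|| format_err!(...))` calls in the patch are the fallible version of the `expect` above: if an earlier step failed to restore the writer, the next entry errors out instead of panicking.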
-- 
2.20.1
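
The `HashWrapper` used in `add_entry` wraps the content reader and computes the CRC32 as bytes flow through to the encoder, so no second pass over the data is needed. A rough std-only sketch of that pattern, with a hand-rolled bitwise CRC-32 (IEEE polynomial) standing in for whatever CRC library the real code uses:

```rust
use std::io::{Read, Result};

// Reader wrapper that updates a running CRC-32 on every read.
struct HashWrapper<R> {
    inner: R,
    crc: u32,
}

impl<R: Read> HashWrapper<R> {
    fn new(inner: R) -> Self {
        HashWrapper { inner, crc: 0xFFFF_FFFF }
    }

    // Finalize the checksum and give the inner reader back.
    fn finish(self) -> (u32, R) {
        (!self.crc, self.inner)
    }
}

impl<R: Read> Read for HashWrapper<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let n = self.inner.read(buf)?;
        // bitwise (slow but dependency-free) reflected CRC-32
        for &b in &buf[..n] {
            self.crc ^= b as u32;
            for _ in 0..8 {
                self.crc = if self.crc & 1 != 0 {
                    (self.crc >> 1) ^ 0xEDB8_8320
                } else {
                    self.crc >> 1
                };
            }
        }
        Ok(n)
    }
}

fn main() {
    let mut wrapper = HashWrapper::new(&b"123456789"[..]);
    let mut out = Vec::new();
    wrapper.read_to_end(&mut out).unwrap();
    let (crc, _reader) = wrapper.finish();
    // 0xCBF43926 is the standard CRC-32 check value for "123456789"
    assert_eq!(crc, 0xCBF43926);
    assert_eq!(out, b"123456789");
    println!("ok");
}
```

In the patch, the encoder pulls from this wrapper, so `finish()` yields the CRC of the *uncompressed* stream, which is what the ZIP data descriptor requires.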






* [pbs-devel] applied-series: [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip
  2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
                   ` (6 preceding siblings ...)
  2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 7/7] tools/zip: compress zips with deflate Dominik Csapak
@ 2021-04-07 16:00 ` Thomas Lamprecht
  7 siblings, 0 replies; 9+ messages in thread
From: Thomas Lamprecht @ 2021-04-07 16:00 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Dominik Csapak

On 06.04.21 11:03, Dominik Csapak wrote:
> this series combines my two series about adding compression to
> api calls/static as well as the zip deflate series
> 
> changes from v2:
> compression:
> * comment out non 'Deflate' CompressionMethods to have better match arms
> * use constants for 'magic' values
> * (partially) parse accept-encoding qualifiers
> zip:
> * use DeflateEncoder instead of flate2 manually
> * reorder patches so that the rustfmt patch comes first
> 
> Dominik Csapak (7):
>   tools: add compression module
>   tools/compression: add DeflateEncoder and helpers
>   server/rest: add helper to extract compression headers
>   server/rest: compress api calls
>   server/rest: compress static files
>   tools/zip: run rustfmt
>   tools/zip: compress zips with deflate
> 
>  Cargo.toml               |   1 +
>  src/server/rest.rs       | 121 ++++++++++++++++----
>  src/tools.rs             |   1 +
>  src/tools/compression.rs | 232 +++++++++++++++++++++++++++++++++++++++
>  src/tools/zip.rs         | 133 +++++++++++++++-------
>  5 files changed, 425 insertions(+), 63 deletions(-)
>  create mode 100644 src/tools/compression.rs
> 



applied series, many thanks!

As discussed off-list, I switched to Level::Default: while slightly slower in a
benchmark with no network bottleneck, it actually produces smaller output and is
faster overall in a real-world benchmark over a throttled, latency-spiked connection.
(See the follow-up commit message for details and benchmark results.)





