* [pbs-devel] [PATCH proxmox-backup v2 1/5] tools: add compression module
2021-04-01 14:11 [pbs-devel] [PATCH proxmox-backup v2 0/5] add compression to api/static files Dominik Csapak
@ 2021-04-01 14:11 ` Dominik Csapak
2021-04-02 12:07 ` Thomas Lamprecht
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers Dominik Csapak
` (3 subsequent siblings)
4 siblings, 1 reply; 10+ messages in thread
From: Dominik Csapak @ 2021-04-01 14:11 UTC (permalink / raw)
To: pbs-devel
only contains a basic enum for the different compression methods
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/tools.rs | 1 +
src/tools/compression.rs | 37 +++++++++++++++++++++++++++++++++++++
2 files changed, 38 insertions(+)
create mode 100644 src/tools/compression.rs
diff --git a/src/tools.rs b/src/tools.rs
index 7e3bff7b..cd1415ec 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -22,6 +22,7 @@ pub mod apt;
pub mod async_io;
pub mod borrow;
pub mod cert;
+pub mod compression;
pub mod daemon;
pub mod disks;
pub mod format;
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
new file mode 100644
index 00000000..fe15e8fc
--- /dev/null
+++ b/src/tools/compression.rs
@@ -0,0 +1,37 @@
+use anyhow::{bail, Error};
+use hyper::header;
+
+/// Possible Compression Methods, order determines preference (later is preferred)
+#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
+pub enum CompressionMethod {
+ Deflate,
+ Gzip,
+ Brotli,
+}
+
+impl CompressionMethod {
+ pub fn content_encoding(&self) -> header::HeaderValue {
+ header::HeaderValue::from_static(self.extension())
+ }
+
+ pub fn extension(&self) -> &'static str {
+ match *self {
+ CompressionMethod::Brotli => "br",
+ CompressionMethod::Gzip => "gzip",
+ CompressionMethod::Deflate => "deflate",
+ }
+ }
+}
+
+impl std::str::FromStr for CompressionMethod {
+ type Err = Error;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s {
+ "br" => Ok(CompressionMethod::Brotli),
+ "gzip" => Ok(CompressionMethod::Brotli),
+ "deflate" => Ok(CompressionMethod::Deflate),
+ _ => bail!("unknown compression format"),
+ }
+ }
+}
--
2.20.1
^ permalink raw reply [flat|nested] 10+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup v2 1/5] tools: add compression module
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] tools: add compression module Dominik Csapak
@ 2021-04-02 12:07 ` Thomas Lamprecht
0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2021-04-02 12:07 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dominik Csapak
On 01.04.21 16:11, Dominik Csapak wrote:
> only contains a basic enum for the different compression methods
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> src/tools.rs | 1 +
> src/tools/compression.rs | 37 +++++++++++++++++++++++++++++++++++++
> 2 files changed, 38 insertions(+)
> create mode 100644 src/tools/compression.rs
>
> diff --git a/src/tools.rs b/src/tools.rs
> index 7e3bff7b..cd1415ec 100644
> --- a/src/tools.rs
> +++ b/src/tools.rs
> @@ -22,6 +22,7 @@ pub mod apt;
> pub mod async_io;
> pub mod borrow;
> pub mod cert;
> +pub mod compression;
> pub mod daemon;
> pub mod disks;
> pub mod format;
> diff --git a/src/tools/compression.rs b/src/tools/compression.rs
> new file mode 100644
> index 00000000..fe15e8fc
> --- /dev/null
> +++ b/src/tools/compression.rs
> @@ -0,0 +1,37 @@
> +use anyhow::{bail, Error};
> +use hyper::header;
> +
> +/// Possible Compression Methods, order determines preference (later is preferred)
> +#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
> +pub enum CompressionMethod {
> + Deflate,
> + Gzip,
> + Brotli,
> +}
> +
> +impl CompressionMethod {
> + pub fn content_encoding(&self) -> header::HeaderValue {
> + header::HeaderValue::from_static(self.extension())
> + }
> +
> + pub fn extension(&self) -> &'static str {
> + match *self {
> + CompressionMethod::Brotli => "br",
> + CompressionMethod::Gzip => "gzip",
> + CompressionMethod::Deflate => "deflate",
I'd comment those out for now, as otherwise we end up with a `Some(_)` catch-all
in most match arms, meaning that when we actually implement those methods there's
a higher chance that we forget to add a match arm, and Rust won't notice due to
the catch-all.
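A minimal sketch of that suggestion (illustrative, not the actual patch): keeping only the implemented variant active means every `match` on the enum stays exhaustive, so re-enabling a variant later becomes a compile error at each match site until an arm is added.

```rust
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
pub enum CompressionMethod {
    Deflate,
    // Gzip,   // re-enable once implemented
    // Brotli, // re-enable once implemented
}

impl CompressionMethod {
    pub fn extension(&self) -> &'static str {
        // no `_ => ...` arm needed; uncommenting a variant above makes this
        // a compile error until a matching arm is added here too
        match *self {
            CompressionMethod::Deflate => "deflate",
        }
    }
}

fn main() {
    assert_eq!(CompressionMethod::Deflate.extension(), "deflate");
}
```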
> + }
> + }
> +}
> +
> +impl std::str::FromStr for CompressionMethod {
> + type Err = Error;
> +
> + fn from_str(s: &str) -> Result<Self, Self::Err> {
> + match s {
> + "br" => Ok(CompressionMethod::Brotli),
> + "gzip" => Ok(CompressionMethod::Brotli),
above should be CompressionMethod::Gzip
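A hypothetical round-trip test (the error type and standalone layout here are illustrative, not the crate's actual code) would have caught this copy-paste error: parsing the string an encoding maps to must yield that same encoding.

```rust
use std::str::FromStr;

#[derive(Eq, PartialEq, Debug, Clone, Copy)]
enum CompressionMethod { Deflate, Gzip, Brotli }

impl CompressionMethod {
    fn extension(&self) -> &'static str {
        match *self {
            CompressionMethod::Brotli => "br",
            CompressionMethod::Gzip => "gzip",
            CompressionMethod::Deflate => "deflate",
        }
    }
}

impl FromStr for CompressionMethod {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "br" => Ok(CompressionMethod::Brotli),
            "gzip" => Ok(CompressionMethod::Gzip), // the corrected line
            "deflate" => Ok(CompressionMethod::Deflate),
            _ => Err(format!("unknown compression format: {}", s)),
        }
    }
}

fn main() {
    // round-trip: extension() -> parse() must be the identity
    for m in [CompressionMethod::Deflate, CompressionMethod::Gzip, CompressionMethod::Brotli] {
        assert_eq!(m.extension().parse::<CompressionMethod>().unwrap(), m);
    }
}
```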
> + "deflate" => Ok(CompressionMethod::Deflate),
> + _ => bail!("unknown compression format"),
> + }
> + }
> +}
>
* [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers
2021-04-01 14:11 [pbs-devel] [PATCH proxmox-backup v2 0/5] add compression to api/static files Dominik Csapak
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] tools: add compression module Dominik Csapak
@ 2021-04-01 14:11 ` Dominik Csapak
2021-04-02 12:14 ` Thomas Lamprecht
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 3/5] server/rest: add helper to extract compression headers Dominik Csapak
` (2 subsequent siblings)
4 siblings, 1 reply; 10+ messages in thread
From: Dominik Csapak @ 2021-04-01 14:11 UTC (permalink / raw)
To: pbs-devel
implements a deflate encoder that can compress anything implementing
AsyncRead + Unpin into a file via the helper 'compress'
if the inner type is a Stream, the encoder implements Stream itself, so
streaming data can be compressed on the fly
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Cargo.toml | 1 +
src/tools/compression.rs | 191 +++++++++++++++++++++++++++++++++++++++
2 files changed, 192 insertions(+)
diff --git a/Cargo.toml b/Cargo.toml
index 69b07d41..2f59b310 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ bitflags = "1.2.1"
bytes = "1.0"
crc32fast = "1"
endian_trait = { version = "0.6", features = ["arrays"] }
+flate2 = "1.0"
anyhow = "1.0"
futures = "0.3"
h2 = { version = "0.3", features = [ "stream" ] }
diff --git a/src/tools/compression.rs b/src/tools/compression.rs
index fe15e8fc..cc6ea732 100644
--- a/src/tools/compression.rs
+++ b/src/tools/compression.rs
@@ -1,5 +1,17 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
use anyhow::{bail, Error};
+use bytes::Bytes;
+use flate2::{Compress, Compression, FlushCompress};
+use futures::ready;
+use futures::stream::Stream;
use hyper::header;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+use proxmox::io_format_err;
+use proxmox::tools::byte_buffer::ByteBuffer;
/// Possible Compression Methods, order determines preference (later is preferred)
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
@@ -35,3 +47,182 @@ impl std::str::FromStr for CompressionMethod {
}
}
}
+
+pub enum Level {
+ Fastest,
+ Best,
+ Default,
+ Precise(u32),
+}
+
+#[derive(Eq, PartialEq)]
+enum EncoderState {
+ Reading,
+ Writing,
+ Flushing,
+ Finished,
+}
+
+pub struct DeflateEncoder<T> {
+ inner: T,
+ compressor: Compress,
+ buffer: ByteBuffer,
+ input_buffer: Bytes,
+ state: EncoderState,
+}
+
+impl<T> DeflateEncoder<T> {
+ pub fn new(inner: T) -> Self {
+ Self::with_quality(inner, Level::Default)
+ }
+
+ pub fn with_quality(inner: T, level: Level) -> Self {
+ let level = match level {
+ Level::Fastest => Compression::fast(),
+ Level::Best => Compression::best(),
+ Level::Default => Compression::new(3),
+ Level::Precise(val) => Compression::new(val),
+ };
+
+ Self {
+ inner,
+ compressor: Compress::new(level, false),
+ buffer: ByteBuffer::with_capacity(8192),
+ input_buffer: Bytes::new(),
+ state: EncoderState::Reading,
+ }
+ }
+
+ pub fn total_in(&self) -> u64 {
+ self.compressor.total_in()
+ }
+
+ pub fn total_out(&self) -> u64 {
+ self.compressor.total_out()
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ fn encode(
+ &mut self,
+ inbuf: &[u8],
+ flush: FlushCompress,
+ ) -> Result<(usize, flate2::Status), io::Error> {
+ let old_in = self.compressor.total_in();
+ let old_out = self.compressor.total_out();
+ let res = self
+ .compressor
+ .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
+ let new_in = (self.compressor.total_in() - old_in) as usize;
+ let new_out = (self.compressor.total_out() - old_out) as usize;
+ self.buffer.add_size(new_out);
+
+ Ok((new_in, res))
+ }
+}
+
+impl DeflateEncoder<Vec<u8>> {
+ // assume small files
+ pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
+ where
+ R: AsyncRead + Unpin,
+ {
+ let mut buffer = Vec::with_capacity(size_hint);
+ reader.read_to_end(&mut buffer).await?;
+ self.inner.reserve(size_hint); // should be enough since we want smalller files
+ self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
+ Ok(())
+ }
+}
+
+impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
+ pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
+ where
+ R: AsyncRead + Unpin,
+ {
+ let mut buffer = ByteBuffer::with_capacity(8192);
+ let mut eof = false;
+ loop {
+ if !eof && !buffer.is_full() {
+ let read = buffer.read_from_async(reader).await?;
+ if read == 0 {
+ eof = true;
+ }
+ }
+ let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
+ buffer.consume(read);
+
+ self.inner.write_all(&self.buffer[..]).await?;
+ self.buffer.clear();
+
+ if buffer.is_empty() && eof {
+ break;
+ }
+ }
+
+ loop {
+ let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
+ self.inner.write_all(&self.buffer[..]).await?;
+ self.buffer.clear();
+ if res == flate2::Status::StreamEnd {
+ break;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl<T, O> Stream for DeflateEncoder<T>
+where
+ T: Stream<Item = Result<O, io::Error>> + Unpin,
+ O: Into<Bytes>
+{
+ type Item = Result<Bytes, io::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+
+ loop {
+ match this.state {
+ EncoderState::Reading => {
+ if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+ let buf = res?;
+ this.input_buffer = buf.into();
+ this.state = EncoderState::Writing;
+ } else {
+ this.state = EncoderState::Flushing;
+ }
+ }
+ EncoderState::Writing => {
+ if this.input_buffer.is_empty() {
+ return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
+ }
+ let mut buf = this.input_buffer.split_off(0);
+ let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
+ this.input_buffer = buf.split_off(read);
+ if this.input_buffer.is_empty() {
+ this.state = EncoderState::Reading;
+ }
+ if this.buffer.is_full() || res == flate2::Status::BufError {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ }
+ EncoderState::Flushing => {
+ let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
+ if !this.buffer.is_empty() {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ if res == flate2::Status::StreamEnd {
+ this.state = EncoderState::Finished;
+ }
+ }
+ EncoderState::Finished => return Poll::Ready(None),
+ }
+ }
+ }
+}
--
2.20.1
* Re: [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers Dominik Csapak
@ 2021-04-02 12:14 ` Thomas Lamprecht
0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2021-04-02 12:14 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dominik Csapak
On 01.04.21 16:11, Dominik Csapak wrote:
> implements a deflate encoder that can compress anything that implements
> AsyncRead + Unpin into a file with the helper 'compress'
>
> if the inner type is a Stream, it implements Stream itself, this way
> some streaming data can be streamed compressed
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> Cargo.toml | 1 +
> src/tools/compression.rs | 191 +++++++++++++++++++++++++++++++++++++++
> 2 files changed, 192 insertions(+)
>
> diff --git a/Cargo.toml b/Cargo.toml
> index 69b07d41..2f59b310 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -29,6 +29,7 @@ bitflags = "1.2.1"
> bytes = "1.0"
> crc32fast = "1"
> endian_trait = { version = "0.6", features = ["arrays"] }
> +flate2 = "1.0"
> anyhow = "1.0"
> futures = "0.3"
> h2 = { version = "0.3", features = [ "stream" ] }
> diff --git a/src/tools/compression.rs b/src/tools/compression.rs
> index fe15e8fc..cc6ea732 100644
> --- a/src/tools/compression.rs
> +++ b/src/tools/compression.rs
> @@ -1,5 +1,17 @@
> +use std::io;
> +use std::pin::Pin;
> +use std::task::{Context, Poll};
> +
> use anyhow::{bail, Error};
> +use bytes::Bytes;
> +use flate2::{Compress, Compression, FlushCompress};
> +use futures::ready;
> +use futures::stream::Stream;
> use hyper::header;
> +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::byte_buffer::ByteBuffer;
>
> /// Possible Compression Methods, order determines preference (later is preferred)
> #[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
> @@ -35,3 +47,182 @@ impl std::str::FromStr for CompressionMethod {
> }
> }
> }
> +
> +pub enum Level {
> + Fastest,
> + Best,
> + Default,
> + Precise(u32),
> +}
> +
> +#[derive(Eq, PartialEq)]
> +enum EncoderState {
> + Reading,
> + Writing,
> + Flushing,
> + Finished,
> +}
> +
> +pub struct DeflateEncoder<T> {
> + inner: T,
> + compressor: Compress,
> + buffer: ByteBuffer,
> + input_buffer: Bytes,
> + state: EncoderState,
> +}
> +
> +impl<T> DeflateEncoder<T> {
> + pub fn new(inner: T) -> Self {
> + Self::with_quality(inner, Level::Default)
> + }
> +
> + pub fn with_quality(inner: T, level: Level) -> Self {
> + let level = match level {
> + Level::Fastest => Compression::fast(),
> + Level::Best => Compression::best(),
> + Level::Default => Compression::new(3),
> + Level::Precise(val) => Compression::new(val),
> + };
> +
> + Self {
> + inner,
> + compressor: Compress::new(level, false),
> + buffer: ByteBuffer::with_capacity(8192),
maybe we could use a const for this buffer size
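One way to follow that suggestion (names are illustrative): a module-level const that both the constructor and the `compress` helper below share, so the two `8192` literals cannot drift apart.

```rust
const BUFFER_SIZE: usize = 8192;

// stand-in for `ByteBuffer::with_capacity(BUFFER_SIZE)` at both call sites
fn make_buffer() -> Vec<u8> {
    Vec::with_capacity(BUFFER_SIZE)
}

fn main() {
    assert!(make_buffer().capacity() >= BUFFER_SIZE);
}
```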
> + input_buffer: Bytes::new(),
> + state: EncoderState::Reading,
> + }
> + }
> +
> + pub fn total_in(&self) -> u64 {
does this need to be publicly visible? At least for now it's only used in the
private encode function below. Really no hard feelings about this, just noticed.
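If they are only needed inside the crate for now, the visibility could be narrowed without making them fully private (a sketch with a hypothetical struct, not the actual encoder):

```rust
struct Counter { total: u64 }

impl Counter {
    // `pub(crate)` keeps the accessor usable crate-wide but off the public API
    pub(crate) fn total_in(&self) -> u64 { self.total }
}

fn main() {
    assert_eq!(Counter { total: 5 }.total_in(), 5);
}
```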
> + self.compressor.total_in()
> + }
> +
> + pub fn total_out(&self) -> u64 {
same here
> + self.compressor.total_out()
> + }
> +
> + pub fn into_inner(self) -> T {
> + self.inner
> + }
> +
> + fn encode(
> + &mut self,
> + inbuf: &[u8],
> + flush: FlushCompress,
> + ) -> Result<(usize, flate2::Status), io::Error> {
> + let old_in = self.compressor.total_in();
> + let old_out = self.compressor.total_out();
> + let res = self
> + .compressor
> + .compress(&inbuf[..], self.buffer.get_free_mut_slice(), flush)?;
> + let new_in = (self.compressor.total_in() - old_in) as usize;
> + let new_out = (self.compressor.total_out() - old_out) as usize;
> + self.buffer.add_size(new_out);
> +
> + Ok((new_in, res))
> + }
> +}
> +
> +impl DeflateEncoder<Vec<u8>> {
> + // assume small files
> + pub async fn compress_vec<R>(&mut self, reader: &mut R, size_hint: usize) -> Result<(), Error>
> + where
> + R: AsyncRead + Unpin,
> + {
> + let mut buffer = Vec::with_capacity(size_hint);
> + reader.read_to_end(&mut buffer).await?;
> + self.inner.reserve(size_hint); // should be enough since we want smalller files
> + self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
> + Ok(())
> + }
> +}
> +
> +impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
> + pub async fn compress<R>(&mut self, reader: &mut R) -> Result<(), Error>
> + where
> + R: AsyncRead + Unpin,
> + {
> + let mut buffer = ByteBuffer::with_capacity(8192);
and reuse the const buffer size here..
> + let mut eof = false;
> + loop {
> + if !eof && !buffer.is_full() {
> + let read = buffer.read_from_async(reader).await?;
> + if read == 0 {
> + eof = true;
> + }
> + }
> + let (read, _res) = self.encode(&buffer[..], FlushCompress::None)?;
> + buffer.consume(read);
> +
> + self.inner.write_all(&self.buffer[..]).await?;
> + self.buffer.clear();
> +
> + if buffer.is_empty() && eof {
> + break;
> + }
> + }
> +
> + loop {
> + let (_read, res) = self.encode(&[][..], FlushCompress::Finish)?;
> + self.inner.write_all(&self.buffer[..]).await?;
> + self.buffer.clear();
> + if res == flate2::Status::StreamEnd {
> + break;
> + }
> + }
> +
> + Ok(())
> + }
> +}
> +
> +impl<T, O> Stream for DeflateEncoder<T>
> +where
> + T: Stream<Item = Result<O, io::Error>> + Unpin,
> + O: Into<Bytes>
> +{
> + type Item = Result<Bytes, io::Error>;
> +
> + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
> + let this = self.get_mut();
> +
> + loop {
> + match this.state {
> + EncoderState::Reading => {
> + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
> + let buf = res?;
> + this.input_buffer = buf.into();
> + this.state = EncoderState::Writing;
> + } else {
> + this.state = EncoderState::Flushing;
> + }
> + }
> + EncoderState::Writing => {
> + if this.input_buffer.is_empty() {
> + return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
> + }
> + let mut buf = this.input_buffer.split_off(0);
> + let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
> + this.input_buffer = buf.split_off(read);
> + if this.input_buffer.is_empty() {
> + this.state = EncoderState::Reading;
> + }
> + if this.buffer.is_full() || res == flate2::Status::BufError {
> + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> + return Poll::Ready(Some(Ok(bytes.into())));
> + }
> + }
> + EncoderState::Flushing => {
> + let (_read, res) = this.encode(&[][..], FlushCompress::Finish)?;
> + if !this.buffer.is_empty() {
> + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> + return Poll::Ready(Some(Ok(bytes.into())));
> + }
> + if res == flate2::Status::StreamEnd {
> + this.state = EncoderState::Finished;
> + }
> + }
> + EncoderState::Finished => return Poll::Ready(None),
> + }
> + }
> + }
> +}
>
* [pbs-devel] [PATCH proxmox-backup v2 3/5] server/rest: add helper to extract compression headers
2021-04-01 14:11 [pbs-devel] [PATCH proxmox-backup v2 0/5] add compression to api/static files Dominik Csapak
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] tools: add compression module Dominik Csapak
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 2/5] tools/compression: add DeflateEncoder and helpers Dominik Csapak
@ 2021-04-01 14:11 ` Dominik Csapak
2021-04-02 12:20 ` Thomas Lamprecht
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 4/5] server/rest: compress api calls Dominik Csapak
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] server/rest: compress static files Dominik Csapak
4 siblings, 1 reply; 10+ messages in thread
From: Dominik Csapak @ 2021-04-01 14:11 UTC (permalink / raw)
To: pbs-devel
for now we only extract 'deflate'
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/server/rest.rs | 23 +++++++++++++++++++++++
1 file changed, 23 insertions(+)
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 1cd26787..00ff6844 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,6 +39,7 @@ use crate::api2::types::{Authid, Userid};
use crate::auth_helpers::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::tools;
+use crate::tools::compression::CompressionMethod;
use crate::tools::FileLogger;
extern "C" {
@@ -587,6 +588,28 @@ fn extract_lang_header(headers: &http::HeaderMap) -> Option<String> {
None
}
+fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
+ let mut compression = None;
+ if let Some(raw_encoding) = headers.get(header::ACCEPT_ENCODING) {
+ if let Ok(encoding) = raw_encoding.to_str() {
+ for encoding in encoding.split(&[',', ' '][..]) {
+ if let Ok(method) = encoding.parse() {
+ if method != CompressionMethod::Deflate {
+ // fixme: implement other compressors
+ continue;
+ }
+ let method = Some(method);
+ if method > compression {
+ compression = method;
+ }
+ }
+ }
+ }
+ }
+
+ return compression;
+}
+
async fn handle_request(
api: Arc<ApiConfig>,
req: Request<Body>,
--
2.20.1
* Re: [pbs-devel] [PATCH proxmox-backup v2 3/5] server/rest: add helper to extract compression headers
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 3/5] server/rest: add helper to extract compression headers Dominik Csapak
@ 2021-04-02 12:20 ` Thomas Lamprecht
0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2021-04-02 12:20 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dominik Csapak
On 01.04.21 16:11, Dominik Csapak wrote:
> for now we only extract 'deflate'
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> src/server/rest.rs | 23 +++++++++++++++++++++++
> 1 file changed, 23 insertions(+)
>
> diff --git a/src/server/rest.rs b/src/server/rest.rs
> index 1cd26787..00ff6844 100644
> --- a/src/server/rest.rs
> +++ b/src/server/rest.rs
> @@ -39,6 +39,7 @@ use crate::api2::types::{Authid, Userid};
> use crate::auth_helpers::*;
> use crate::config::cached_user_info::CachedUserInfo;
> use crate::tools;
> +use crate::tools::compression::CompressionMethod;
> use crate::tools::FileLogger;
>
> extern "C" {
> @@ -587,6 +588,28 @@ fn extract_lang_header(headers: &http::HeaderMap) -> Option<String> {
> None
> }
>
> +fn extract_compression_method(headers: &http::HeaderMap) -> Option<CompressionMethod> {
> + let mut compression = None;
> + if let Some(raw_encoding) = headers.get(header::ACCEPT_ENCODING) {
> + if let Ok(encoding) = raw_encoding.to_str() {
FWIW, the above two lines could be merged into:
if let Some(Ok(encodings)) = headers.get(header::ACCEPT_ENCODING).map(|v| v.to_str()) {
That would reduce a level of indentation, of which this fn has quite a few.
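A minimal, self-contained sketch of that flattening, with std types standing in for the hyper/http ones: `.map()` turns the `Option<&[u8]>` into an `Option<Result<&str, _>>`, which one `Some(Ok(..))` pattern can match.

```rust
use std::collections::HashMap;

// stand-in for http::HeaderMap lookup + HeaderValue::to_str
fn accept_encoding(headers: &HashMap<String, Vec<u8>>) -> Option<&str> {
    // merges `if let Some(..)` and `if let Ok(..)` into one pattern
    if let Some(Ok(encodings)) = headers.get("accept-encoding").map(|v| std::str::from_utf8(v)) {
        return Some(encodings);
    }
    None
}

fn main() {
    let mut h = HashMap::new();
    h.insert("accept-encoding".to_string(), b"gzip, deflate".to_vec());
    assert_eq!(accept_encoding(&h), Some("gzip, deflate"));
    assert_eq!(accept_encoding(&HashMap::new()), None);
}
```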
> + for encoding in encoding.split(&[',', ' '][..]) {
as mentioned off-list, Accept-Encoding allows adding "quality values" [2], like
`deflate;q=<weight>` [1], to encodings. I haven't seen any browser actually use
them, but they are allowed by the RFC [0].
I'd not implement support for them, but we could either adapt the FromStr
implementation or also split by semicolon here.
[0]: https://tools.ietf.org/html/rfc7231#section-5.3.4
[1]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding#directives
[2]: https://developer.mozilla.org/en-US/docs/Glossary/Quality_values
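A sketch of the second option (stripping an optional `;q=<weight>` suffix before parsing, without actually implementing weight handling; the helper name is illustrative):

```rust
// drop everything after the first ';' so "deflate;q=0.5" parses as "deflate"
fn strip_quality(token: &str) -> &str {
    token.split(';').next().unwrap_or(token).trim()
}

fn main() {
    assert_eq!(strip_quality("deflate;q=0.5"), "deflate");
    assert_eq!(strip_quality(" gzip "), "gzip");
    assert_eq!(strip_quality("br"), "br");
}
```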
> + if let Ok(method) = encoding.parse() {
> + if method != CompressionMethod::Deflate {
> + // fixme: implement other compressors
> + continue;
> + }
> + let method = Some(method);
> + if method > compression {
> + compression = method;
> + }
> + }
> + }
> + }
> + }
> +
> + return compression;
> +}
> +
> async fn handle_request(
> api: Arc<ApiConfig>,
> req: Request<Body>,
>
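Worth noting why the `method > compression` comparison in this helper does the right thing: the derived `Ord` ranks enum variants in declaration order (later is preferred, per the doc comment), and `Option`'s `Ord` places `None` below any `Some(_)`. A small sketch:

```rust
// same derives and variant order as the patch's CompressionMethod
#[derive(Eq, Ord, PartialEq, PartialOrd, Debug)]
enum CompressionMethod { Deflate, Gzip, Brotli }

fn main() {
    // later-declared variant wins the comparison
    assert!(Some(CompressionMethod::Brotli) > Some(CompressionMethod::Deflate));
    // any Some(_) beats the initial None
    assert!(Some(CompressionMethod::Deflate) > None::<CompressionMethod>);
}
```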
* [pbs-devel] [PATCH proxmox-backup v2 4/5] server/rest: compress api calls
2021-04-01 14:11 [pbs-devel] [PATCH proxmox-backup v2 0/5] add compression to api/static files Dominik Csapak
` (2 preceding siblings ...)
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 3/5] server/rest: add helper to extract compression headers Dominik Csapak
@ 2021-04-01 14:11 ` Dominik Csapak
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] server/rest: compress static files Dominik Csapak
4 siblings, 0 replies; 10+ messages in thread
From: Dominik Csapak @ 2021-04-01 14:11 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/server/rest.rs | 25 +++++++++++++++++++++++--
1 file changed, 23 insertions(+), 2 deletions(-)
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 00ff6844..357d1b81 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -39,7 +39,7 @@ use crate::api2::types::{Authid, Userid};
use crate::auth_helpers::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::tools;
-use crate::tools::compression::CompressionMethod;
+use crate::tools::compression::{CompressionMethod, DeflateEncoder, Level};
use crate::tools::FileLogger;
extern "C" {
@@ -397,6 +397,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
uri_param: HashMap<String, String, S>,
) -> Result<Response<Body>, Error> {
let delay_unauth_time = std::time::Instant::now() + std::time::Duration::from_millis(3000);
+ let compression = extract_compression_method(&parts.headers);
let result = match info.handler {
ApiHandler::AsyncHttp(handler) => {
@@ -417,7 +418,7 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
}
};
- let resp = match result {
+ let mut resp = match result {
Ok(resp) => resp,
Err(err) => {
if let Some(httperr) = err.downcast_ref::<HttpError>() {
@@ -429,6 +430,26 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
}
};
+ let resp = match compression {
+ Some(CompressionMethod::Deflate) => {
+ resp.headers_mut()
+ .insert(header::CONTENT_ENCODING, CompressionMethod::Deflate.content_encoding());
+ resp.map(|body|
+ Body::wrap_stream(DeflateEncoder::with_quality(
+ body.map_err(|err| {
+ proxmox::io_format_err!("error during compression: {}", err)
+ }),
+ Level::Fastest,
+ )),
+ )
+ }
+ Some(_other) => {
+ // fixme: implement other compression algorithms
+ resp
+ }
+ None => resp,
+ };
+
if info.reload_timezone {
unsafe {
tzset();
--
2.20.1
* [pbs-devel] [PATCH proxmox-backup v2 5/5] server/rest: compress static files
2021-04-01 14:11 [pbs-devel] [PATCH proxmox-backup v2 0/5] add compression to api/static files Dominik Csapak
` (3 preceding siblings ...)
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 4/5] server/rest: compress api calls Dominik Csapak
@ 2021-04-01 14:11 ` Dominik Csapak
2021-04-02 12:32 ` Thomas Lamprecht
4 siblings, 1 reply; 10+ messages in thread
From: Dominik Csapak @ 2021-04-01 14:11 UTC (permalink / raw)
To: pbs-devel
compress them on the fly
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/server/rest.rs | 93 ++++++++++++++++++++++++++++++++--------------
1 file changed, 66 insertions(+), 27 deletions(-)
diff --git a/src/server/rest.rs b/src/server/rest.rs
index 357d1b81..6b1bb0cb 100644
--- a/src/server/rest.rs
+++ b/src/server/rest.rs
@@ -40,6 +40,7 @@ use crate::auth_helpers::*;
use crate::config::cached_user_info::CachedUserInfo;
use crate::tools;
use crate::tools::compression::{CompressionMethod, DeflateEncoder, Level};
+use crate::tools::AsyncReaderStream;
use crate::tools::FileLogger;
extern "C" {
@@ -432,16 +433,18 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
let resp = match compression {
Some(CompressionMethod::Deflate) => {
- resp.headers_mut()
- .insert(header::CONTENT_ENCODING, CompressionMethod::Deflate.content_encoding());
- resp.map(|body|
+ resp.headers_mut().insert(
+ header::CONTENT_ENCODING,
+ CompressionMethod::Deflate.content_encoding(),
+ );
+ resp.map(|body| {
Body::wrap_stream(DeflateEncoder::with_quality(
body.map_err(|err| {
proxmox::io_format_err!("error during compression: {}", err)
}),
Level::Fastest,
- )),
- )
+ ))
+ })
}
Some(_other) => {
// fixme: implement other compression algorithms
@@ -546,9 +549,11 @@ fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
("application/octet-stream", false)
}
-async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
- let (content_type, _nocomp) = extension_to_content_type(&filename);
-
+async fn simple_static_file_download(
+ filename: PathBuf,
+ content_type: &'static str,
+ compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
use tokio::io::AsyncReadExt;
let mut file = File::open(filename)
@@ -556,46 +561,79 @@ async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>
.map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
let mut data: Vec<u8> = Vec::new();
- file.read_to_end(&mut data)
- .await
- .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
- let mut response = Response::new(data.into());
+ let mut response = match compression {
+ Some(CompressionMethod::Deflate) => {
+ let mut enc = DeflateEncoder::with_quality(data, Level::Fastest);
+ enc.compress_vec(&mut file, 32 * 1024).await?;
+ let mut response = Response::new(enc.into_inner().into());
+ response.headers_mut().insert(
+ header::CONTENT_ENCODING,
+ CompressionMethod::Deflate.content_encoding(),
+ );
+ response
+ }
+ Some(_) | None => {
+ file.read_to_end(&mut data)
+ .await
+ .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
+ Response::new(data.into())
+ }
+ };
+
response.headers_mut().insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static(content_type),
);
+
Ok(response)
}
-async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
- let (content_type, _nocomp) = extension_to_content_type(&filename);
+async fn chuncked_static_file_download(
+ filename: PathBuf,
+ content_type: &'static str,
+ compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
+ let mut resp = Response::builder()
+ .status(StatusCode::OK)
+ .header(header::CONTENT_TYPE, content_type);
let file = File::open(filename)
.await
.map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
- let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
- .map_ok(|bytes| bytes.freeze());
- let body = Body::wrap_stream(payload);
+ let body = match compression {
+ Some(CompressionMethod::Deflate) => {
+ resp = resp.header(
+ header::CONTENT_ENCODING,
+ CompressionMethod::Deflate.content_encoding(),
+ );
+ Body::wrap_stream(DeflateEncoder::with_quality(
+ AsyncReaderStream::new(file),
+ Level::Fastest,
+ ))
+ }
+ Some(_) | None => Body::wrap_stream(AsyncReaderStream::new(file)),
+ };
- // FIXME: set other headers ?
- Ok(Response::builder()
- .status(StatusCode::OK)
- .header(header::CONTENT_TYPE, content_type)
- .body(body)
- .unwrap())
+ Ok(resp.body(body).unwrap())
}
-async fn handle_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
+async fn handle_static_file_download(
+ filename: PathBuf,
+ compression: Option<CompressionMethod>,
+) -> Result<Response<Body>, Error> {
let metadata = tokio::fs::metadata(filename.clone())
.map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
.await?;
+ let (content_type, nocomp) = extension_to_content_type(&filename);
+ let compression = if nocomp { None } else { compression };
+
if metadata.len() < 1024 * 32 {
- simple_static_file_download(filename).await
+ simple_static_file_download(filename, content_type, compression).await
} else {
- chuncked_static_file_download(filename).await
+ chuncked_static_file_download(filename, content_type, compression).await
}
}
@@ -773,7 +811,8 @@ async fn handle_request(
}
} else {
let filename = api.find_alias(&components);
- return handle_static_file_download(filename).await;
+ let compression = extract_compression_method(&parts.headers);
+ return handle_static_file_download(filename, compression).await;
}
}
--
2.20.1
* Re: [pbs-devel] [PATCH proxmox-backup v2 5/5] server/rest: compress static files
2021-04-01 14:11 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] server/rest: compress static files Dominik Csapak
@ 2021-04-02 12:32 ` Thomas Lamprecht
0 siblings, 0 replies; 10+ messages in thread
From: Thomas Lamprecht @ 2021-04-02 12:32 UTC (permalink / raw)
To: Proxmox Backup Server development discussion, Dominik Csapak
On 01.04.21 16:11, Dominik Csapak wrote:
> compress them on the fly
>
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
> src/server/rest.rs | 93 ++++++++++++++++++++++++++++++++--------------
> 1 file changed, 66 insertions(+), 27 deletions(-)
>
> diff --git a/src/server/rest.rs b/src/server/rest.rs
> index 357d1b81..6b1bb0cb 100644
> --- a/src/server/rest.rs
> +++ b/src/server/rest.rs
> @@ -40,6 +40,7 @@ use crate::auth_helpers::*;
> use crate::config::cached_user_info::CachedUserInfo;
> use crate::tools;
> use crate::tools::compression::{CompressionMethod, DeflateEncoder, Level};
> +use crate::tools::AsyncReaderStream;
> use crate::tools::FileLogger;
>
> extern "C" {
> @@ -432,16 +433,18 @@ pub async fn handle_api_request<Env: RpcEnvironment, S: 'static + BuildHasher +
>
> let resp = match compression {
> Some(CompressionMethod::Deflate) => {
> - resp.headers_mut()
> - .insert(header::CONTENT_ENCODING, CompressionMethod::Deflate.content_encoding());
> - resp.map(|body|
> + resp.headers_mut().insert(
> + header::CONTENT_ENCODING,
> + CompressionMethod::Deflate.content_encoding(),
> + );
> + resp.map(|body| {
> Body::wrap_stream(DeflateEncoder::with_quality(
> body.map_err(|err| {
> proxmox::io_format_err!("error during compression: {}", err)
> }),
> Level::Fastest,
> - )),
> - )
> + ))
> + })
> }
> Some(_other) => {
> // fixme: implement other compression algorithms
> @@ -546,9 +549,11 @@ fn extension_to_content_type(filename: &Path) -> (&'static str, bool) {
> ("application/octet-stream", false)
> }
>
> -async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
> - let (content_type, _nocomp) = extension_to_content_type(&filename);
> -
> +async fn simple_static_file_download(
> + filename: PathBuf,
> + content_type: &'static str,
> + compression: Option<CompressionMethod>,
> +) -> Result<Response<Body>, Error> {
> use tokio::io::AsyncReadExt;
>
> let mut file = File::open(filename)
> @@ -556,46 +561,79 @@ async fn simple_static_file_download(filename: PathBuf) -> Result<Response<Body>
> .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
>
> let mut data: Vec<u8> = Vec::new();
> - file.read_to_end(&mut data)
> - .await
> - .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
>
> - let mut response = Response::new(data.into());
> + let mut response = match compression {
> + Some(CompressionMethod::Deflate) => {
> + let mut enc = DeflateEncoder::with_quality(data, Level::Fastest);
> + enc.compress_vec(&mut file, 32 * 1024).await?;
As discussed off-list, that value should really go into a `const CHUNK_RESPONSE_LIMIT` or the like instead of being hard-coded here.
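For illustration, a minimal sketch of that suggestion; the constant name is just the one proposed above, nothing that exists in the tree yet:

```rust
// Proposed constant for the hard-coded 32 * 1024 used as the
// compression chunk size (and, further down, as the small-file threshold).
const CHUNK_RESPONSE_LIMIT: usize = 32 * 1024;

// Hypothetical helper mirroring the size check in
// handle_static_file_download(): files under the limit get buffered
// and compressed in one go, larger ones are streamed.
fn should_buffer_whole_file(file_len: u64) -> bool {
    file_len < CHUNK_RESPONSE_LIMIT as u64
}

fn main() {
    assert!(should_buffer_whole_file(4 * 1024));
    assert!(!should_buffer_whole_file(64 * 1024));
}
```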
> + let mut response = Response::new(enc.into_inner().into());
> + response.headers_mut().insert(
> + header::CONTENT_ENCODING,
> + CompressionMethod::Deflate.content_encoding(),
> + );
> + response
> + }
> + Some(_) | None => {
> + file.read_to_end(&mut data)
> + .await
> + .map_err(|err| http_err!(BAD_REQUEST, "File read failed: {}", err))?;
> + Response::new(data.into())
> + }
> + };
> +
> response.headers_mut().insert(
> header::CONTENT_TYPE,
> header::HeaderValue::from_static(content_type),
> );
> +
> Ok(response)
> }
>
> -async fn chuncked_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
> - let (content_type, _nocomp) = extension_to_content_type(&filename);
> +async fn chuncked_static_file_download(
> + filename: PathBuf,
> + content_type: &'static str,
> + compression: Option<CompressionMethod>,
> +) -> Result<Response<Body>, Error> {
> + let mut resp = Response::builder()
> + .status(StatusCode::OK)
> + .header(header::CONTENT_TYPE, content_type);
>
> let file = File::open(filename)
> .await
> .map_err(|err| http_err!(BAD_REQUEST, "File open failed: {}", err))?;
>
> - let payload = tokio_util::codec::FramedRead::new(file, tokio_util::codec::BytesCodec::new())
> - .map_ok(|bytes| bytes.freeze());
> - let body = Body::wrap_stream(payload);
> + let body = match compression {
> + Some(CompressionMethod::Deflate) => {
> + resp = resp.header(
> + header::CONTENT_ENCODING,
> + CompressionMethod::Deflate.content_encoding(),
> + );
> + Body::wrap_stream(DeflateEncoder::with_quality(
> + AsyncReaderStream::new(file),
> + Level::Fastest,
> + ))
> + }
> + Some(_) | None => Body::wrap_stream(AsyncReaderStream::new(file)),
> + };
>
> - // FIXME: set other headers ?
> - Ok(Response::builder()
> - .status(StatusCode::OK)
> - .header(header::CONTENT_TYPE, content_type)
> - .body(body)
> - .unwrap())
> + Ok(resp.body(body).unwrap())
> }
>
> -async fn handle_static_file_download(filename: PathBuf) -> Result<Response<Body>, Error> {
> +async fn handle_static_file_download(
> + filename: PathBuf,
> + compression: Option<CompressionMethod>,
> +) -> Result<Response<Body>, Error> {
> let metadata = tokio::fs::metadata(filename.clone())
> .map_err(|err| http_err!(BAD_REQUEST, "File access problems: {}", err))
> .await?;
>
> + let (content_type, nocomp) = extension_to_content_type(&filename);
> + let compression = if nocomp { None } else { compression };
> +
> if metadata.len() < 1024 * 32 {
The const from above could then be used here too.
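With the `CHUNK_RESPONSE_LIMIT` suggested earlier, this check could then read roughly as follows (sketch only, the constant is a proposal and not in the tree yet):

```rust
// Sketch: reuse the proposed constant for the buffered-vs-chunked decision,
// so the threshold and the compression chunk size stay in sync.
const CHUNK_RESPONSE_LIMIT: u64 = 32 * 1024;

fn pick_download_path(file_len: u64) -> &'static str {
    if file_len < CHUNK_RESPONSE_LIMIT {
        "simple"  // read the whole file, compress in memory
    } else {
        "chunked" // stream via AsyncReaderStream + DeflateEncoder
    }
}

fn main() {
    assert_eq!(pick_download_path(1024), "simple");
    assert_eq!(pick_download_path(1024 * 1024), "chunked");
}
```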
> - simple_static_file_download(filename).await
> + simple_static_file_download(filename, content_type, compression).await
> } else {
> - chuncked_static_file_download(filename).await
> + chuncked_static_file_download(filename, content_type, compression).await
> }
> }
>
> @@ -773,7 +811,8 @@ async fn handle_request(
> }
> } else {
> let filename = api.find_alias(&components);
> - return handle_static_file_download(filename).await;
> + let compression = extract_compression_method(&parts.headers);
> + return handle_static_file_download(filename, compression).await;
> }
> }
>
>