* [pbs-devel] [PATCH promox v4 1/5] compression: deflate: move encoder into a mod
2024-06-26 14:57 [pbs-devel] [PATCH promox v4 0/5] Teach HTTP client how to decode deflate Maximiliano Sandoval
@ 2024-06-26 14:57 ` Maximiliano Sandoval
2024-07-03 13:57 ` Max Carrara
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 2/5] compression: deflate: add builder pattern Maximiliano Sandoval
` (3 subsequent siblings)
4 siblings, 1 reply; 8+ messages in thread
From: Maximiliano Sandoval @ 2024-06-26 14:57 UTC (permalink / raw)
To: pbs-devel
This allows adding a decompression mod inside the deflate mod. This does
not touch the public API.
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
proxmox-compression/src/{ => deflate}/compression.rs | 6 ++----
proxmox-compression/src/deflate/mod.rs | 5 +++++
proxmox-compression/src/lib.rs | 4 ++--
proxmox-compression/src/zip.rs | 2 +-
4 files changed, 10 insertions(+), 7 deletions(-)
rename proxmox-compression/src/{ => deflate}/compression.rs (97%)
create mode 100644 proxmox-compression/src/deflate/mod.rs
diff --git a/proxmox-compression/src/compression.rs b/proxmox-compression/src/deflate/compression.rs
similarity index 97%
rename from proxmox-compression/src/compression.rs
rename to proxmox-compression/src/deflate/compression.rs
index 632a5991..6e6a151d 100644
--- a/proxmox-compression/src/compression.rs
+++ b/proxmox-compression/src/deflate/compression.rs
@@ -12,8 +12,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use proxmox_io::ByteBuffer;
use proxmox_lang::io_format_err;
-const BUFFER_SIZE: usize = 8192;
-
pub enum Level {
Fastest,
Best,
@@ -53,7 +51,7 @@ impl<T> DeflateEncoder<T> {
Self {
inner,
compressor: Compress::new(level, false),
- buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
+ buffer: ByteBuffer::with_capacity(super::BUFFER_SIZE),
input_buffer: Bytes::new(),
state: EncoderState::Reading,
}
@@ -109,7 +107,7 @@ impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
where
R: AsyncRead + Unpin,
{
- let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
+ let mut buffer = ByteBuffer::with_capacity(super::BUFFER_SIZE);
let mut eof = false;
loop {
if !eof && !buffer.is_full() {
diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs
new file mode 100644
index 00000000..514ccbdc
--- /dev/null
+++ b/proxmox-compression/src/deflate/mod.rs
@@ -0,0 +1,5 @@
+mod compression;
+
+pub use compression::{DeflateEncoder, Level};
+
+const BUFFER_SIZE: usize = 8192;
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 1fcfb977..70d88afe 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,6 +1,6 @@
-mod compression;
-pub use compression::*;
+pub use deflate::{DeflateEncoder, Level};
+mod deflate;
pub mod tar;
pub mod zip;
pub mod zstd;
diff --git a/proxmox-compression/src/zip.rs b/proxmox-compression/src/zip.rs
index d2d3fd80..3ccece9b 100644
--- a/proxmox-compression/src/zip.rs
+++ b/proxmox-compression/src/zip.rs
@@ -22,7 +22,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use crc32fast::Hasher;
use proxmox_time::gmtime;
-use crate::compression::{DeflateEncoder, Level};
+use crate::deflate::{DeflateEncoder, Level};
const LOCAL_FH_SIG: u32 = 0x04034B50;
const LOCAL_FF_SIG: u32 = 0x08074B50;
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [pbs-devel] [PATCH promox v4 1/5] compression: deflate: move encoder into a mod
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 1/5] compression: deflate: move encoder into a mod Maximiliano Sandoval
@ 2024-07-03 13:57 ` Max Carrara
2024-07-05 13:01 ` Maximiliano Sandoval
0 siblings, 1 reply; 8+ messages in thread
From: Max Carrara @ 2024-07-03 13:57 UTC (permalink / raw)
To: Proxmox Backup Server development discussion
On Wed Jun 26, 2024 at 4:57 PM CEST, Maximiliano Sandoval wrote:
> This allows to add a decompression mod inside the deflate mod. This does
> not touch the public API.
>
> Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
> ---
This patch unfortunately doesn't apply for me. I'm on the latest master.
Could you please rebase this? Thanks!
> proxmox-compression/src/{ => deflate}/compression.rs | 6 ++----
> proxmox-compression/src/deflate/mod.rs | 5 +++++
> proxmox-compression/src/lib.rs | 4 ++--
> proxmox-compression/src/zip.rs | 2 +-
> 4 files changed, 10 insertions(+), 7 deletions(-)
> rename proxmox-compression/src/{ => deflate}/compression.rs (97%)
> create mode 100644 proxmox-compression/src/deflate/mod.rs
>
> diff --git a/proxmox-compression/src/compression.rs b/proxmox-compression/src/deflate/compression.rs
> similarity index 97%
> rename from proxmox-compression/src/compression.rs
> rename to proxmox-compression/src/deflate/compression.rs
> index 632a5991..6e6a151d 100644
> --- a/proxmox-compression/src/compression.rs
> +++ b/proxmox-compression/src/deflate/compression.rs
> @@ -12,8 +12,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
> use proxmox_io::ByteBuffer;
> use proxmox_lang::io_format_err;
>
> -const BUFFER_SIZE: usize = 8192;
> -
> pub enum Level {
> Fastest,
> Best,
> @@ -53,7 +51,7 @@ impl<T> DeflateEncoder<T> {
> Self {
> inner,
> compressor: Compress::new(level, false),
> - buffer: ByteBuffer::with_capacity(BUFFER_SIZE),
> + buffer: ByteBuffer::with_capacity(super::BUFFER_SIZE),
> input_buffer: Bytes::new(),
> state: EncoderState::Reading,
> }
> @@ -109,7 +107,7 @@ impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
> where
> R: AsyncRead + Unpin,
> {
> - let mut buffer = ByteBuffer::with_capacity(BUFFER_SIZE);
> + let mut buffer = ByteBuffer::with_capacity(super::BUFFER_SIZE);
> let mut eof = false;
> loop {
> if !eof && !buffer.is_full() {
> diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs
> new file mode 100644
> index 00000000..514ccbdc
> --- /dev/null
> +++ b/proxmox-compression/src/deflate/mod.rs
> @@ -0,0 +1,5 @@
> +mod compression;
> +
> +pub use compression::{DeflateEncoder, Level};
> +
> +const BUFFER_SIZE: usize = 8192;
> diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
> index 1fcfb977..70d88afe 100644
> --- a/proxmox-compression/src/lib.rs
> +++ b/proxmox-compression/src/lib.rs
> @@ -1,6 +1,6 @@
> -mod compression;
> -pub use compression::*;
> +pub use deflate::{DeflateEncoder, Level};
>
> +mod deflate;
> pub mod tar;
> pub mod zip;
> pub mod zstd;
> diff --git a/proxmox-compression/src/zip.rs b/proxmox-compression/src/zip.rs
> index d2d3fd80..3ccece9b 100644
> --- a/proxmox-compression/src/zip.rs
> +++ b/proxmox-compression/src/zip.rs
> @@ -22,7 +22,7 @@ use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
> use crc32fast::Hasher;
> use proxmox_time::gmtime;
>
> -use crate::compression::{DeflateEncoder, Level};
> +use crate::deflate::{DeflateEncoder, Level};
>
> const LOCAL_FH_SIG: u32 = 0x04034B50;
> const LOCAL_FF_SIG: u32 = 0x08074B50;
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [pbs-devel] [PATCH promox v4 1/5] compression: deflate: move encoder into a mod
2024-07-03 13:57 ` Max Carrara
@ 2024-07-05 13:01 ` Maximiliano Sandoval
0 siblings, 0 replies; 8+ messages in thread
From: Maximiliano Sandoval @ 2024-07-05 13:01 UTC (permalink / raw)
To: Max Carrara; +Cc: Proxmox Backup Server development discussion
"Max Carrara" <m.carrara@proxmox.com> writes:
> On Wed Jun 26, 2024 at 4:57 PM CEST, Maximiliano Sandoval wrote:
>> This allows to add a decompression mod inside the deflate mod. This does
>> not touch the public API.
>>
>> Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
>> ---
>
> This patch unfortunately doesn't apply for me. I'm on the latest master.
> Could you please rebase this? Thanks!
I will post v5 in a bit.
--
Maximiliano
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread
* [pbs-devel] [PATCH promox v4 2/5] compression: deflate: add builder pattern
2024-06-26 14:57 [pbs-devel] [PATCH promox v4 0/5] Teach HTTP client how to decode deflate Maximiliano Sandoval
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 1/5] compression: deflate: move encoder into a mod Maximiliano Sandoval
@ 2024-06-26 14:57 ` Maximiliano Sandoval
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 3/5] compression: deflate: add a decoder Maximiliano Sandoval
` (2 subsequent siblings)
4 siblings, 0 replies; 8+ messages in thread
From: Maximiliano Sandoval @ 2024-06-26 14:57 UTC (permalink / raw)
To: pbs-devel
This allows creating an encoder in a more general way and allows
specifying whether we want to set zlib headers. This is useful to compress
HTTP traffic, as per [1].
[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#directives
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
.../src/deflate/compression.rs | 55 ++++++++++++++++---
1 file changed, 46 insertions(+), 9 deletions(-)
diff --git a/proxmox-compression/src/deflate/compression.rs b/proxmox-compression/src/deflate/compression.rs
index 6e6a151d..24d2bc99 100644
--- a/proxmox-compression/src/deflate/compression.rs
+++ b/proxmox-compression/src/deflate/compression.rs
@@ -35,27 +35,64 @@ pub struct DeflateEncoder<T> {
state: EncoderState,
}
-impl<T> DeflateEncoder<T> {
- pub fn new(inner: T) -> Self {
- Self::with_quality(inner, Level::Default)
+pub struct DeflateEncoderBuilder<T> {
+ inner: T,
+ is_zlib: bool,
+ buffer_size: usize,
+ level: Level,
+}
+
+impl<T> DeflateEncoderBuilder<T> {
+ pub fn zlib(mut self, is_zlib: bool) -> Self {
+ self.is_zlib = is_zlib;
+ self
}
- pub fn with_quality(inner: T, level: Level) -> Self {
- let level = match level {
+ pub fn level(mut self, level: Level) -> Self {
+ self.level = level;
+ self
+ }
+
+ pub fn buffer_size(mut self, buffer_size: usize) -> Self {
+ self.buffer_size = buffer_size;
+ self
+ }
+
+ pub fn build(self) -> DeflateEncoder<T> {
+ let level = match self.level {
Level::Fastest => Compression::fast(),
Level::Best => Compression::best(),
Level::Default => Compression::new(3),
Level::Precise(val) => Compression::new(val),
};
- Self {
- inner,
- compressor: Compress::new(level, false),
- buffer: ByteBuffer::with_capacity(super::BUFFER_SIZE),
+ DeflateEncoder {
+ inner: self.inner,
+ compressor: Compress::new(level, self.is_zlib),
+ buffer: ByteBuffer::with_capacity(self.buffer_size),
input_buffer: Bytes::new(),
state: EncoderState::Reading,
}
}
+}
+
+impl<T> DeflateEncoder<T> {
+ pub fn new(inner: T) -> Self {
+ Self::builder(inner).build()
+ }
+
+ pub fn builder(inner: T) -> DeflateEncoderBuilder<T> {
+ DeflateEncoderBuilder {
+ inner,
+ is_zlib: false,
+ buffer_size: super::BUFFER_SIZE,
+ level: Level::Default,
+ }
+ }
+
+ pub fn with_quality(inner: T, level: Level) -> Self {
+ Self::builder(inner).level(level).build()
+ }
pub fn total_in(&self) -> u64 {
self.compressor.total_in()
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread
* [pbs-devel] [PATCH promox v4 3/5] compression: deflate: add a decoder
2024-06-26 14:57 [pbs-devel] [PATCH promox v4 0/5] Teach HTTP client how to decode deflate Maximiliano Sandoval
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 1/5] compression: deflate: move encoder into a mod Maximiliano Sandoval
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 2/5] compression: deflate: add builder pattern Maximiliano Sandoval
@ 2024-06-26 14:57 ` Maximiliano Sandoval
2024-06-26 14:58 ` [pbs-devel] [PATCH promox v4 4/5] compression: deflate: add test module Maximiliano Sandoval
2024-06-26 14:58 ` [pbs-devel] [PATCH promox v4 5/5] http: teach the Client how to decode deflate content Maximiliano Sandoval
4 siblings, 0 replies; 8+ messages in thread
From: Maximiliano Sandoval @ 2024-06-26 14:57 UTC (permalink / raw)
To: pbs-devel
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
.../src/deflate/decompression.rs | 141 ++++++++++++++++++
proxmox-compression/src/deflate/mod.rs | 2 +
proxmox-compression/src/lib.rs | 2 +-
3 files changed, 144 insertions(+), 1 deletion(-)
create mode 100644 proxmox-compression/src/deflate/decompression.rs
diff --git a/proxmox-compression/src/deflate/decompression.rs b/proxmox-compression/src/deflate/decompression.rs
new file mode 100644
index 00000000..45ed8579
--- /dev/null
+++ b/proxmox-compression/src/deflate/decompression.rs
@@ -0,0 +1,141 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+#[derive(Eq, PartialEq)]
+enum DecoderState {
+ Reading,
+ Writing,
+ Flushing,
+ Finished,
+}
+
+pub struct DeflateDecoder<T> {
+ inner: T,
+ decompressor: Decompress,
+ buffer: ByteBuffer,
+ input_buffer: Bytes,
+ state: DecoderState,
+}
+
+pub struct DeflateDecoderBuilder<T> {
+ inner: T,
+ is_zlib: bool,
+ buffer_size: usize,
+}
+
+impl<T> DeflateDecoderBuilder<T> {
+ pub fn zlib(mut self, is_zlib: bool) -> Self {
+ self.is_zlib = is_zlib;
+ self
+ }
+
+ pub fn buffer_size(mut self, buffer_size: usize) -> Self {
+ self.buffer_size = buffer_size;
+ self
+ }
+
+ pub fn build(self) -> DeflateDecoder<T> {
+ DeflateDecoder {
+ inner: self.inner,
+ decompressor: Decompress::new(self.is_zlib),
+ buffer: ByteBuffer::with_capacity(self.buffer_size),
+ input_buffer: Bytes::new(),
+ state: DecoderState::Reading,
+ }
+ }
+}
+
+impl<T> DeflateDecoder<T> {
+ pub fn new(inner: T) -> Self {
+ Self::builder(inner).build()
+ }
+
+ pub fn builder(inner: T) -> DeflateDecoderBuilder<T> {
+ DeflateDecoderBuilder {
+ inner,
+ is_zlib: false,
+ buffer_size: super::BUFFER_SIZE,
+ }
+ }
+
+ fn decode(
+ &mut self,
+ inbuf: &[u8],
+ flush: FlushDecompress,
+ ) -> Result<(usize, flate2::Status), io::Error> {
+ let old_in = self.decompressor.total_in();
+ let old_out = self.decompressor.total_out();
+ let res = self
+ .decompressor
+ .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+ let new_in = (self.decompressor.total_in() - old_in) as usize;
+ let new_out = (self.decompressor.total_out() - old_out) as usize;
+ self.buffer.add_size(new_out);
+
+ Ok((new_in, res))
+ }
+}
+
+impl<T, O, E> Stream for DeflateDecoder<T>
+where
+ T: Stream<Item = Result<O, E>> + Unpin,
+ O: Into<Bytes>,
+ E: Into<Error>,
+{
+ type Item = Result<Bytes, anyhow::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+
+ loop {
+ match this.state {
+ DecoderState::Reading => {
+ if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+ let buf = res.map_err(Into::into)?;
+ this.input_buffer = buf.into();
+ this.state = DecoderState::Writing;
+ } else {
+ this.state = DecoderState::Flushing;
+ }
+ }
+ DecoderState::Writing => {
+ if this.input_buffer.is_empty() {
+ return Poll::Ready(Some(Err(anyhow::format_err!(
+ "empty input during write"
+ ))));
+ }
+ let mut buf = this.input_buffer.split_off(0);
+ let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
+ this.input_buffer = buf.split_off(read);
+ if this.input_buffer.is_empty() {
+ this.state = DecoderState::Reading;
+ }
+ if this.buffer.is_full() || res == flate2::Status::BufError {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ }
+ DecoderState::Flushing => {
+ let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
+ if !this.buffer.is_empty() {
+ let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+ return Poll::Ready(Some(Ok(bytes.into())));
+ }
+ if res == flate2::Status::StreamEnd {
+ this.state = DecoderState::Finished;
+ }
+ }
+ DecoderState::Finished => return Poll::Ready(None),
+ }
+ }
+ }
+}
diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs
index 514ccbdc..6867176c 100644
--- a/proxmox-compression/src/deflate/mod.rs
+++ b/proxmox-compression/src/deflate/mod.rs
@@ -1,5 +1,7 @@
mod compression;
+mod decompression;
pub use compression::{DeflateEncoder, Level};
+pub use decompression::DeflateDecoder;
const BUFFER_SIZE: usize = 8192;
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 70d88afe..fb76e0c2 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,4 +1,4 @@
-pub use deflate::{DeflateEncoder, Level};
+pub use deflate::{DeflateDecoder, DeflateEncoder, Level};
mod deflate;
pub mod tar;
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread
* [pbs-devel] [PATCH promox v4 4/5] compression: deflate: add test module
2024-06-26 14:57 [pbs-devel] [PATCH promox v4 0/5] Teach HTTP client how to decode deflate Maximiliano Sandoval
` (2 preceding siblings ...)
2024-06-26 14:57 ` [pbs-devel] [PATCH promox v4 3/5] compression: deflate: add a decoder Maximiliano Sandoval
@ 2024-06-26 14:58 ` Maximiliano Sandoval
2024-06-26 14:58 ` [pbs-devel] [PATCH promox v4 5/5] http: teach the Client how to decode deflate content Maximiliano Sandoval
4 siblings, 0 replies; 8+ messages in thread
From: Maximiliano Sandoval @ 2024-06-26 14:58 UTC (permalink / raw)
To: pbs-devel
We test the deflate encoder against the deflate decoder, both with and
without zlib headers, and with different small buffer sizes. We also test
compression and decompression against the flate2 crate.
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
proxmox-compression/Cargo.toml | 3 +-
proxmox-compression/src/deflate/mod.rs | 160 +++++++++++++++++++++++++
2 files changed, 161 insertions(+), 2 deletions(-)
diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
index b41c796f..b217e67f 100644
--- a/proxmox-compression/Cargo.toml
+++ b/proxmox-compression/Cargo.toml
@@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
proxmox-lang.workspace = true
[dev-dependencies]
-tokio = { workspace = true, features = [ "macros" ] }
-
+tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs
index 6867176c..94faabb3 100644
--- a/proxmox-compression/src/deflate/mod.rs
+++ b/proxmox-compression/src/deflate/mod.rs
@@ -5,3 +5,163 @@ pub use compression::{DeflateEncoder, Level};
pub use decompression::DeflateDecoder;
const BUFFER_SIZE: usize = 8192;
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ use std::io::Write;
+
+ use flate2::Compression;
+ use futures::StreamExt;
+
+ const BUFFER_SIZE: usize = 25;
+ const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+ fn chunker(content: &[u8]) -> Vec<Result<Vec<u8>, std::io::Error>> {
+ vec![
+ Ok(content[..10].to_vec()),
+ Ok(content[10..20].to_vec()),
+ Ok(content[20..30].to_vec()),
+ Ok(content[30..40].to_vec()),
+ Ok(content[40..].to_vec()),
+ ]
+ }
+
+ #[tokio::test]
+ async fn test_encoder_against_decoder() {
+ // We use mixed sizes for the buffers, on the next test we invert the
+ // sizes.
+ let stream = futures::stream::iter(chunker(BODY.as_bytes()));
+ let encoder = DeflateEncoder::builder(stream)
+ .buffer_size(BUFFER_SIZE * 2)
+ .build();
+ let mut decoder = DeflateDecoder::builder(encoder)
+ .buffer_size(BUFFER_SIZE)
+ .build();
+
+ let mut buf = Vec::with_capacity(BODY.len());
+ while let Some(Ok(res)) = decoder.next().await {
+ buf.write_all(&res).unwrap();
+ }
+
+ assert_eq!(buf, BODY.as_bytes());
+ }
+
+ #[tokio::test]
+ async fn test_zlib_encoder_against_decoder() {
+ let stream = futures::stream::iter(chunker(BODY.as_bytes()));
+ let encoder = DeflateEncoder::builder(stream)
+ .zlib(true)
+ .buffer_size(BUFFER_SIZE)
+ .build();
+ let mut decoder = DeflateDecoder::builder(encoder)
+ .zlib(true)
+ .buffer_size(BUFFER_SIZE * 2)
+ .build();
+
+ let mut buf = Vec::with_capacity(BODY.len());
+ while let Some(Ok(res)) = decoder.next().await {
+ buf.write_all(&res).unwrap();
+ }
+
+ assert_eq!(buf, BODY.as_bytes());
+ }
+
+ #[tokio::test]
+ async fn test_deflate_decompression_against_flate2() {
+ let encoded = flate2_encode(BODY.as_bytes(), false).unwrap();
+ let decoded = decode(&encoded, false, 7).await.unwrap();
+
+ assert_eq!(decoded, BODY.as_bytes());
+ }
+
+ #[tokio::test]
+ async fn test_zlib_decompression_against_flate2() {
+ let encoded = flate2_encode(BODY.as_bytes(), true).unwrap();
+ let decoded = decode(&encoded, true, 4).await.unwrap();
+
+ assert_eq!(decoded, BODY.as_bytes());
+ }
+
+ #[tokio::test]
+ async fn test_deflate_compression_against_flate2() {
+ let encoded = encode(BODY.as_bytes(), false, 5).await.unwrap();
+ let decoded = flate2_decode(&encoded, false).unwrap();
+
+ assert_eq!(decoded, BODY.as_bytes());
+ }
+
+ #[tokio::test]
+ async fn test_zlib_compression_against_flate2() {
+ let encoded = encode(BODY.as_bytes(), true, 3).await.unwrap();
+ let decoded = flate2_decode(&encoded, true).unwrap();
+
+ assert_eq!(decoded, BODY.as_bytes());
+ }
+
+ fn flate2_encode(bytes: &[u8], is_zlib: bool) -> Result<Vec<u8>, std::io::Error> {
+ if is_zlib {
+ let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
+ e.write_all(bytes).unwrap();
+ e.finish()
+ } else {
+ let mut e = flate2::write::DeflateEncoder::new(Vec::new(), Compression::default());
+ e.write_all(bytes).unwrap();
+ e.finish()
+ }
+ }
+
+ fn flate2_decode(bytes: &[u8], is_zlib: bool) -> Result<Vec<u8>, std::io::Error> {
+ if is_zlib {
+ let mut e = flate2::write::ZlibDecoder::new(Vec::new());
+ e.write_all(bytes).unwrap();
+ e.finish()
+ } else {
+ let mut e = flate2::write::DeflateDecoder::new(Vec::new());
+ e.write_all(bytes).unwrap();
+ e.finish()
+ }
+ }
+
+ async fn decode(
+ content: &[u8],
+ is_zlib: bool,
+ buffer_size: usize,
+ ) -> Result<Vec<u8>, std::io::Error> {
+ let stream = futures::stream::iter(chunker(content));
+ let mut decoder = DeflateDecoder::builder(stream)
+ .zlib(is_zlib)
+ .buffer_size(buffer_size)
+ .build();
+ let mut buf = Vec::new();
+
+ while let Some(Ok(res)) = decoder.next().await {
+ buf.write_all(&res)?;
+ }
+
+ Ok(buf)
+ }
+
+ async fn encode(
+ content: &[u8],
+ is_zlib: bool,
+ buffer_size: usize,
+ ) -> Result<Vec<u8>, std::io::Error> {
+ let stream = futures::stream::iter(chunker(content));
+ let mut encoder = DeflateEncoder::builder(stream)
+ .zlib(is_zlib)
+ .buffer_size(buffer_size)
+ .build();
+ let mut buf = Vec::with_capacity(BODY.len());
+
+ while let Some(Ok(res)) = encoder.next().await {
+ buf.write_all(&res)?;
+ }
+
+ Ok(buf)
+ }
+}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread
* [pbs-devel] [PATCH promox v4 5/5] http: teach the Client how to decode deflate content
2024-06-26 14:57 [pbs-devel] [PATCH promox v4 0/5] Teach HTTP client how to decode deflate Maximiliano Sandoval
` (3 preceding siblings ...)
2024-06-26 14:58 ` [pbs-devel] [PATCH promox v4 4/5] compression: deflate: add test module Maximiliano Sandoval
@ 2024-06-26 14:58 ` Maximiliano Sandoval
4 siblings, 0 replies; 8+ messages in thread
From: Maximiliano Sandoval @ 2024-06-26 14:58 UTC (permalink / raw)
To: pbs-devel; +Cc: Lukas Wagner
The Backup Server can compress the content using deflate so we teach the
client how to decode it.
If a request is sent with the `Accept-Encoding` [2] header set to
`deflate`, and the response's `Content-Encoding` [1] header is equal to
`deflate`, we wrap the Body stream with a stream that can decode `zlib`
on the fly.
Note that from the `Accept-Encoding` docs [2], the `deflate` encoding is
actually `zlib`.
This can also be tested against
http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
following test:
```rust
#[tokio::test]
async fn test_client() {
let client = Client::new();
let headers = HashMap::from([(
hyper::header::ACCEPT_ENCODING.to_string(),
"deflate".to_string(),
)]);
let response = client
.get_string("https://eu.httpbin.org/deflate", Some(&headers))
.await;
assert!(response.is_ok());
}
```
at `proxmox-http/src/client/simple.rs` and running
```
cargo test --features=client,client-trait
```
[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
[2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding
Suggested-by: Lukas Wagner <l.wagner@proxmox.com>
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
proxmox-http/Cargo.toml | 7 ++++
proxmox-http/src/client/simple.rs | 65 ++++++++++++++++++++++++++++++-
2 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 9ece24eb..4455ba85 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
proxmox-sys = { workspace = true, optional = true }
proxmox-io = { workspace = true, optional = true }
proxmox-lang = { workspace = true, optional = true }
+proxmox-compression = { workspace = true, optional = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = [ "macros" ] }
+flate2 = { workspace = true }
[features]
default = []
@@ -42,12 +47,14 @@ client = [
"dep:futures",
"dep:hyper",
"dep:openssl",
+ "dep:proxmox-compression",
"dep:tokio",
"dep:tokio-openssl",
"http-helpers",
"hyper?/client",
"hyper?/http1",
"hyper?/http2",
+ "hyper?/stream",
"hyper?/tcp",
"rate-limited-stream",
"tokio?/io-util",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index e9910802..062889ac 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -78,7 +78,8 @@ impl Client {
self.add_proxy_headers(&mut request)?;
- self.client.request(request).map_err(Error::from).await
+ let encoded_response = self.client.request(request).map_err(Error::from).await?;
+ decode_response(encoded_response).await
}
pub async fn post(
@@ -245,3 +246,65 @@ impl crate::HttpClient<String, String> for Client {
})
}
}
+
+/// Wraps the `Body` stream in a DeflateDecoder stream if the `Content-Encoding`
+/// header of the response is `deflate`, otherwise returns the original
+/// response.
+async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
+ let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
+ return Ok(res);
+ };
+
+ let encodings = content_encoding.to_str()?;
+ if encodings == "deflate" {
+ let (parts, body) = res.into_parts();
+ let decoder = proxmox_compression::DeflateDecoder::builder(body)
+ .zlib(true)
+ .build();
+ let decoded_body = Body::wrap_stream(decoder);
+ Ok(Response::from_parts(parts, decoded_body))
+ } else {
+ bail!("Unknown encoding format: {encodings}");
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ use std::io::Write;
+
+ const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+ #[tokio::test]
+ async fn test_parse_response_deflate() {
+ let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+ let encoded_body = Body::from(encoded);
+ let encoded_response = Response::builder()
+ .header(hyper::header::CONTENT_ENCODING, "deflate")
+ .body(encoded_body)
+ .unwrap();
+
+ let decoded_response = decode_response(encoded_response).await.unwrap();
+
+ assert_eq!(
+ Client::response_body_string(decoded_response)
+ .await
+ .unwrap(),
+ BODY
+ );
+ }
+
+ fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+ use flate2::write::ZlibEncoder;
+ use flate2::Compression;
+
+ let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
+ e.write_all(bytes).unwrap();
+
+ e.finish()
+ }
+}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 8+ messages in thread