From: Maximiliano Sandoval <m.sandoval@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content
Date: Thu, 28 Mar 2024 14:40:31 +0100
Message-ID: <20240328134031.153541-1-m.sandoval@proxmox.com>

The Proxmox Backup Server can compress response bodies using deflate, so we
teach the client how to decode them.

If a request is sent with the `Accept-Encoding` [2] header set to
`deflate`, and the response's `Content-Encoding` [1] header is equal to
`deflate`, we wrap the `Body` stream in a stream that decodes `zlib`
on the fly.
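
The header check that decides how to treat a response can be sketched on its
own, without hyper types. This is a minimal illustration, not the patch's code:
`BodyHandling` and `classify_content_encoding` are hypothetical names. It mirrors
the patch's three outcomes (no header, `deflate`, anything else), but assumes one
extra leniency: it trims whitespace and ignores ASCII case, since RFC 9110 treats
content codings as case-insensitive, whereas the patch compares the value exactly.

```rust
/// How a response body should be handled, based on its `Content-Encoding`.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, PartialEq, Eq)]
enum BodyHandling {
    /// No `Content-Encoding` header: pass the body through untouched.
    PassThrough,
    /// `deflate`: wrap the body in a zlib-decoding stream.
    Deflate,
    /// Any other value (including lists such as "deflate, gzip") is rejected.
    Unsupported(String),
}

/// Classify an optional `Content-Encoding` header value.
/// Assumption: whitespace is trimmed and ASCII case is ignored.
fn classify_content_encoding(header: Option<&str>) -> BodyHandling {
    match header {
        None => BodyHandling::PassThrough,
        Some(value) if value.trim().eq_ignore_ascii_case("deflate") => BodyHandling::Deflate,
        Some(value) => BodyHandling::Unsupported(value.to_string()),
    }
}
```

For example, `classify_content_encoding(Some("gzip"))` yields
`BodyHandling::Unsupported("gzip".to_string())`, which corresponds to the
`bail!` branch of `decode_response` in the patch.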

Note that, according to the `Accept-Encoding` docs [2], the `deflate`
encoding is actually the `zlib` format, i.e. deflate-compressed data
wrapped in a zlib header and checksum.
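
To make the `deflate`/`zlib` distinction concrete: a zlib stream (RFC 1950)
wraps raw deflate data (RFC 1951) in a two-byte header and appends a big-endian
Adler-32 checksum, which is why the decoder is created with
`Decompress::new(true)` (the flag tells flate2 to expect the zlib header). The
helpers below are a hypothetical, std-only sketch of that framing; neither
`has_zlib_header` nor `adler32` exists in proxmox-compression.

```rust
/// Returns true if `data` starts with a valid-looking zlib (RFC 1950) header.
///
/// CMF (first byte): low nibble is the compression method, which must be 8
/// (deflate); high nibble is the window size exponent, at most 7.
/// The two header bytes read as a big-endian u16 must be a multiple of 31.
fn has_zlib_header(data: &[u8]) -> bool {
    if data.len() < 2 {
        return false;
    }
    let (cmf, flg) = (data[0], data[1]);
    let method_is_deflate = (cmf & 0x0f) == 8;
    let window_ok = (cmf >> 4) <= 7;
    let check_ok = ((u16::from(cmf) << 8) | u16::from(flg)) % 31 == 0;
    method_is_deflate && window_ok && check_ok
}

/// Adler-32 checksum, which zlib appends after the deflate data
/// and which raw deflate streams lack.
fn adler32(data: &[u8]) -> u32 {
    const MOD_ADLER: u32 = 65521;
    let (mut a, mut b) = (1u32, 0u32);
    for &byte in data {
        a = (a + u32::from(byte)) % MOD_ADLER;
        b = (b + a) % MOD_ADLER;
    }
    (b << 16) | a
}
```

For instance, `[0x78, 0x9c]` (the header flate2 and zlib emit at the default
compression level) passes `has_zlib_header`, while the gzip magic bytes
`[0x1f, 0x8b]` do not.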

The new `ZlibDecoder` stream is largely modeled on the existing
`DeflateEncoder` struct in the proxmox-compression crate.
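
The four states of `ZlibDecoder` (`Reading`, `Writing`, `Flushing`, `Finished`)
can be illustrated without `futures` or `flate2`. The following is a simplified,
synchronous sketch under stated assumptions: it is an `Iterator` rather than a
`Stream`, and a hypothetical per-byte `transform` closure stands in for
`flate2::Decompress::decompress`. `ChunkedTransform` and `State` are
illustrative names only.

```rust
/// States mirroring the ones used by `ZlibDecoder`.
#[derive(Debug)]
enum State {
    Reading,
    Writing,
    Flushing,
    Finished,
}

/// Pulls input chunks from `inner`, maps each byte through `transform`
/// (a stand-in for a real decompressor) and yields output in pieces of at
/// most `capacity` bytes. Hypothetical type, for illustration only.
struct ChunkedTransform<I, F> {
    inner: I,
    transform: F,
    input: Vec<u8>,  // unconsumed input, like `input_buffer`
    output: Vec<u8>, // pending output, like `buffer`
    capacity: usize,
    state: State,
}

impl<I, F> ChunkedTransform<I, F> {
    fn new(inner: I, transform: F, capacity: usize) -> Self {
        // A zero capacity could never make progress.
        assert!(capacity > 0, "capacity must be non-zero");
        Self {
            inner,
            transform,
            input: Vec::new(),
            output: Vec::with_capacity(capacity),
            capacity,
            state: State::Reading,
        }
    }
}

impl<I, F> Iterator for ChunkedTransform<I, F>
where
    I: Iterator<Item = Vec<u8>>,
    F: FnMut(u8) -> u8,
{
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        loop {
            match self.state {
                State::Reading => match self.inner.next() {
                    Some(chunk) => {
                        self.input = chunk;
                        self.state = State::Writing;
                    }
                    None => self.state = State::Flushing,
                },
                State::Writing => {
                    // Consume only as much input as fits in the output buffer;
                    // keep the rest for the next round.
                    let free = self.capacity - self.output.len();
                    let n = free.min(self.input.len());
                    let rest = self.input.split_off(n);
                    self.output.extend(self.input.drain(..).map(&mut self.transform));
                    self.input = rest;
                    if self.input.is_empty() {
                        self.state = State::Reading;
                    }
                    if self.output.len() == self.capacity {
                        return Some(std::mem::take(&mut self.output));
                    }
                }
                State::Flushing => {
                    // Emit whatever is still pending, then finish.
                    if !self.output.is_empty() {
                        return Some(std::mem::take(&mut self.output));
                    }
                    self.state = State::Finished;
                }
                State::Finished => return None,
            }
        }
    }
}
```

Like `ZlibDecoder`, the sketch only consumes as much input as fits in the output
buffer and keeps the remainder for the next call. Unlike a real decoder, its
transform carries no internal state, so `Flushing` only drains pending output
instead of calling `decompress` with `FlushDecompress::Finish` until it reports
`Status::StreamEnd`.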

This can also be tested against
http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
following test:

```rust
    #[tokio::test]
    async fn test_client() {
        let client = Client::new();
        let headers = HashMap::from([(
            hyper::header::ACCEPT_ENCODING.to_string(),
            "deflate".to_string(),
        )]);
        let response = client
            .get_string("https://eu.httpbin.org/deflate", Some(&headers))
            .await;
        assert!(response.is_ok());
    }
```

to the `test` module in `proxmox-http/src/client/simple.rs` and running

```
cargo test --features=client,client-trait
```

[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
[2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding

Suggested-by: Lukas Wagner <l.wagner@proxmox.com>
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
Differences from v1:
 - Only implement deflate for the moment
 - Turn decoder into a stream
 - Do not set Accept-Encoding

The API for setting the `Accept-Encoding` header still needs to be worked out
in a follow-up.

 proxmox-compression/Cargo.toml          |   3 +-
 proxmox-compression/src/lib.rs          |   3 +
 proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++
 proxmox-http/Cargo.toml                 |   7 ++
 proxmox-http/src/client/simple.rs       |  63 +++++++++-
 5 files changed, 234 insertions(+), 3 deletions(-)
 create mode 100644 proxmox-compression/src/zlib_decoder.rs

diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
index 49735cbe..3879ed16 100644
--- a/proxmox-compression/Cargo.toml
+++ b/proxmox-compression/Cargo.toml
@@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
 proxmox-lang.workspace = true
 
 [dev-dependencies]
-tokio = { workspace = true, features = [ "macros" ] }
-
+tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 1fcfb977..f5d6e269 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,6 +1,9 @@
 mod compression;
 pub use compression::*;
 
+mod zlib_decoder;
+pub use zlib_decoder::*;
+
 pub mod tar;
 pub mod zip;
 pub mod zstd;
diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs
new file mode 100644
index 00000000..fdf9682b
--- /dev/null
+++ b/proxmox-compression/src/zlib_decoder.rs
@@ -0,0 +1,161 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+const BUFFER_SIZE: usize = 8192;
+
+#[derive(Eq, PartialEq)]
+enum EncoderState {
+    Reading,
+    Writing,
+    Flushing,
+    Finished,
+}
+
+pub struct ZlibDecoder<T> {
+    inner: T,
+    decompressor: Decompress,
+    buffer: ByteBuffer,
+    input_buffer: Bytes,
+    state: EncoderState,
+}
+
+impl<T> ZlibDecoder<T> {
+    pub fn new(inner: T) -> Self {
+        Self::with_buffer_size(inner, BUFFER_SIZE)
+    }
+
+    fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
+        Self {
+            inner,
+            decompressor: Decompress::new(true),
+            buffer: ByteBuffer::with_capacity(buffer_size),
+            input_buffer: Bytes::new(),
+            state: EncoderState::Reading,
+        }
+    }
+
+    fn decode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushDecompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        let old_in = self.decompressor.total_in();
+        let old_out = self.decompressor.total_out();
+        let res = self
+            .decompressor
+            .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+        let new_in = (self.decompressor.total_in() - old_in) as usize;
+        let new_out = (self.decompressor.total_out() - old_out) as usize;
+        self.buffer.add_size(new_out);
+
+        Ok((new_in, res))
+    }
+}
+
+impl<T, O, E> Stream for ZlibDecoder<T>
+where
+    T: Stream<Item = Result<O, E>> + Unpin,
+    O: Into<Bytes>,
+    E: Into<Error>,
+{
+    type Item = Result<Bytes, anyhow::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                EncoderState::Reading => {
+                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                        let buf = res.map_err(Into::into)?;
+                        this.input_buffer = buf.into();
+                        this.state = EncoderState::Writing;
+                    } else {
+                        this.state = EncoderState::Flushing;
+                    }
+                }
+                EncoderState::Writing => {
+                    if this.input_buffer.is_empty() {
+                        return Poll::Ready(Some(Err(anyhow::format_err!(
+                            "empty input during write"
+                        ))));
+                    }
+                    let mut buf = this.input_buffer.split_off(0);
+                    let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
+                    this.input_buffer = buf.split_off(read);
+                    if this.input_buffer.is_empty() {
+                        this.state = EncoderState::Reading;
+                    }
+                    if this.buffer.is_full() || res == flate2::Status::BufError {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                EncoderState::Flushing => {
+                    let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
+                    if !this.buffer.is_empty() {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                    if res == flate2::Status::StreamEnd {
+                        this.state = EncoderState::Finished;
+                    }
+                }
+                EncoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use futures::StreamExt;
+    use std::io::Write;
+
+    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        use flate2::Compression;
+        use std::io::Write;
+
+        let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
+        e.write_all(bytes).unwrap();
+
+        e.finish()
+    }
+
+    #[tokio::test]
+    async fn test_decompression() {
+        const BUFFER_SIZE: usize = 5;
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let chunks: Vec<Result<_, std::io::Error>> = vec![
+            Ok(encoded[..10].to_vec()),
+            Ok(encoded[10..20].to_vec()),
+            Ok(encoded[20..30].to_vec()),
+            Ok(encoded[30..40].to_vec()),
+            Ok(encoded[40..].to_vec()),
+        ];
+        let stream = futures::stream::iter(chunks);
+        let mut decoder = ZlibDecoder::with_buffer_size(stream, BUFFER_SIZE);
+        let mut buf = Vec::with_capacity(BODY.len());
+
+        while let Some(Ok(res)) = decoder.next().await {
+            buf.write_all(&res).unwrap();
+        }
+        assert_eq!(buf, BODY.as_bytes());
+    }
+}
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 9ece24eb..4455ba85 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
 proxmox-sys = { workspace = true, optional = true }
 proxmox-io = { workspace = true, optional = true }
 proxmox-lang = { workspace = true, optional = true }
+proxmox-compression = { workspace = true, optional = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = [ "macros" ] }
+flate2 = { workspace = true }
 
 [features]
 default = []
@@ -42,12 +47,14 @@ client = [
     "dep:futures",
     "dep:hyper",
     "dep:openssl",
+    "dep:proxmox-compression",
     "dep:tokio",
     "dep:tokio-openssl",
     "http-helpers",
     "hyper?/client",
     "hyper?/http1",
     "hyper?/http2",
+    "hyper?/stream",
     "hyper?/tcp",
     "rate-limited-stream",
     "tokio?/io-util",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index e9910802..b9e240d2 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -78,7 +78,8 @@ impl Client {
 
         self.add_proxy_headers(&mut request)?;
 
-        self.client.request(request).map_err(Error::from).await
+        let encoded_response = self.client.request(request).map_err(Error::from).await?;
+        decode_response(encoded_response).await
     }
 
     pub async fn post(
@@ -245,3 +246,63 @@ impl crate::HttpClient<String, String> for Client {
         })
     }
 }
+
+/// Wraps the `Body` stream in a `ZlibDecoder` stream if the `Content-Encoding`
+/// header of the response is `deflate`. Returns the response unchanged if the
+/// header is absent and an error for any other encoding.
+async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
+    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
+        return Ok(res);
+    };
+
+    let encodings = content_encoding.to_str()?;
+    if encodings == "deflate" {
+        let (parts, body) = res.into_parts();
+        let decoder = proxmox_compression::ZlibDecoder::new(body);
+        let decoded_body = Body::wrap_stream(decoder);
+        Ok(Response::from_parts(parts, decoded_body))
+    } else {
+        bail!("Unknown encoding format: {encodings}");
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use std::io::Write;
+
+    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+    #[tokio::test]
+    async fn test_parse_response_deflate() {
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let encoded_body = Body::from(encoded);
+        let encoded_response = Response::builder()
+            .header(hyper::header::CONTENT_ENCODING, "deflate")
+            .body(encoded_body)
+            .unwrap();
+
+        let decoded_response = decode_response(encoded_response).await.unwrap();
+
+        assert_eq!(
+            Client::response_body_string(decoded_response)
+                .await
+                .unwrap(),
+            BODY
+        );
+    }
+
+    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        use flate2::write::ZlibEncoder;
+        use flate2::Compression;
+
+        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
+        e.write_all(bytes).unwrap();
+
+        e.finish()
+    }
+}
-- 
2.39.2




