all lists on lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content
@ 2024-03-28 13:40 Maximiliano Sandoval
  2024-04-03 16:44 ` Max Carrara
  0 siblings, 1 reply; 2+ messages in thread
From: Maximiliano Sandoval @ 2024-03-28 13:40 UTC (permalink / raw)
  To: pbs-devel

The Proxmox Backup Server can compress response content using deflate, so
teach the client how to decode it.

If the response's `Content-Encoding` [1] header is `deflate` (e.g. because
the request's `Accept-Encoding` [2] header listed `deflate`), we wrap the
body stream in a stream that decodes `zlib` data on the fly.

Note that from the `Accept-Encoding` docs [2], the `deflate` encoding is
actually `zlib`.

The new `ZlibDecoder` stream is modeled on the `DeflateEncoder` struct in
the proxmox-compression crate.

This can also be tested against
http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
following test:

```rust
    #[tokio::test]
    async fn test_client() {
        let client = Client::new();
        // Ask httpbin for a deflate-encoded response.
        let headers = HashMap::from([(
            hyper::header::ACCEPT_ENCODING.to_string(),
            "deflate".to_string(),
        )]);
        let body = client
            .get_string("https://eu.httpbin.org/deflate", Some(&headers))
            .await
            .expect("deflate-encoded request failed");
        // httpbin's /deflate endpoint answers with a JSON object containing a
        // "deflated" key, which is only readable if decoding worked.
        assert!(body.contains("\"deflated\""));
    }
```

to `proxmox-http/src/client/simple.rs` and running

```
cargo test --features=client,client-trait
```

[1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
[2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding

Suggested-by: Lukas Wagner <l.wagner@proxmox.com>
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
Differences from v1:
 - Only implement deflate for the moment
 - Turn decoder into a stream
 - Do not set Accept-Encoding

The API for setting the Accept-Encoding needs to be figured out in a follow-up.

 proxmox-compression/Cargo.toml          |   3 +-
 proxmox-compression/src/lib.rs          |   3 +
 proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++
 proxmox-http/Cargo.toml                 |   7 ++
 proxmox-http/src/client/simple.rs       |  63 +++++++++-
 5 files changed, 234 insertions(+), 3 deletions(-)
 create mode 100644 proxmox-compression/src/zlib_decoder.rs

diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
index 49735cbe..3879ed16 100644
--- a/proxmox-compression/Cargo.toml
+++ b/proxmox-compression/Cargo.toml
@@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
 proxmox-lang.workspace = true
 
 [dev-dependencies]
-tokio = { workspace = true, features = [ "macros" ] }
-
+tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 1fcfb977..f5d6e269 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,6 +1,9 @@
 mod compression;
 pub use compression::*;
 
+mod zlib_decoder;
+pub use zlib_decoder::*;
+
 pub mod tar;
 pub mod zip;
 pub mod zstd;
diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs
new file mode 100644
index 00000000..fdf9682b
--- /dev/null
+++ b/proxmox-compression/src/zlib_decoder.rs
@@ -0,0 +1,227 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+/// Default capacity of the output buffer that decoded data is collected in
+/// before it is yielded.
+const BUFFER_SIZE: usize = 8192;
+
+/// States of the [`ZlibDecoder`] state machine.
+#[derive(Eq, PartialEq)]
+enum DecoderState {
+    /// Waiting for the next chunk of compressed data from the inner stream.
+    Reading,
+    /// Feeding the current (non-empty) input chunk to the decompressor.
+    Writing,
+    /// No more input is read; drain the output the decompressor still holds.
+    Flushing,
+    /// The zlib stream ended (or failed) and nothing more will be yielded.
+    Finished,
+}
+
+/// A [`Stream`] adapter decompressing zlib data.
+///
+/// This is the format HTTP calls the `deflate` content coding. The inner
+/// stream yields chunks of compressed data, this stream yields the
+/// decompressed data. Empty input chunks are skipped, data following the end
+/// of the zlib stream is ignored, and an inner stream that ends before the
+/// zlib stream is complete results in an error.
+pub struct ZlibDecoder<T> {
+    inner: T,
+    decompressor: Decompress,
+    /// Decoded data that has not been yielded yet.
+    buffer: ByteBuffer,
+    /// The part of the current input chunk not yet consumed by the decompressor.
+    input_buffer: Bytes,
+    state: DecoderState,
+}
+
+impl<T> ZlibDecoder<T> {
+    /// Creates a decoder reading zlib compressed data from `inner`.
+    pub fn new(inner: T) -> Self {
+        Self::with_buffer_size(inner, BUFFER_SIZE)
+    }
+
+    fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
+        Self {
+            inner,
+            // `true`: the data is wrapped in a zlib header and trailer
+            decompressor: Decompress::new(true),
+            buffer: ByteBuffer::with_capacity(buffer_size),
+            input_buffer: Bytes::new(),
+            state: DecoderState::Reading,
+        }
+    }
+
+    /// Decompresses as much of `inbuf` as fits into the free part of the output buffer.
+    ///
+    /// Returns the number of consumed input bytes and the decompressor's status.
+    fn decode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushDecompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        let old_in = self.decompressor.total_in();
+        let old_out = self.decompressor.total_out();
+        let res = self
+            .decompressor
+            .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+        // The differences are bounded by the lengths of the slices passed in.
+        let new_in = (self.decompressor.total_in() - old_in) as usize;
+        let new_out = (self.decompressor.total_out() - old_out) as usize;
+        self.buffer.add_size(new_out);
+
+        Ok((new_in, res))
+    }
+
+    /// Moves all buffered decoded data out of the output buffer.
+    fn take_output(&mut self) -> Bytes {
+        let len = self.buffer.len();
+        self.buffer.remove_data(len).to_vec().into()
+    }
+}
+
+impl<T, O, E> Stream for ZlibDecoder<T>
+where
+    T: Stream<Item = Result<O, E>> + Unpin,
+    O: Into<Bytes>,
+    E: Into<Error>,
+{
+    type Item = Result<Bytes, anyhow::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                DecoderState::Reading => match ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                    Some(Ok(data)) => {
+                        let chunk: Bytes = data.into();
+                        // An empty chunk carries no data, just wait for the next one.
+                        if !chunk.is_empty() {
+                            this.input_buffer = chunk;
+                            this.state = DecoderState::Writing;
+                        }
+                    }
+                    Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
+                    None => this.state = DecoderState::Flushing,
+                },
+                DecoderState::Writing => {
+                    let mut input = std::mem::take(&mut this.input_buffer);
+                    let (consumed, status) = this.decode(&input[..], FlushDecompress::None)?;
+                    // Once the zlib stream is complete, remaining input is trailing
+                    // data, which is dropped. The inner stream is still read to its end.
+                    if status != flate2::Status::StreamEnd {
+                        this.input_buffer = input.split_off(consumed);
+                    }
+                    if this.input_buffer.is_empty() {
+                        this.state = DecoderState::Reading;
+                    }
+                    if this.buffer.is_full() || status == flate2::Status::BufError {
+                        return Poll::Ready(Some(Ok(this.take_output())));
+                    }
+                }
+                DecoderState::Flushing => {
+                    let (_consumed, status) = this.decode(&[][..], FlushDecompress::Finish)?;
+                    let stream_end = status == flate2::Status::StreamEnd;
+                    if stream_end {
+                        this.state = DecoderState::Finished;
+                    }
+                    if !this.buffer.is_empty() {
+                        return Poll::Ready(Some(Ok(this.take_output())));
+                    }
+                    if !stream_end {
+                        // All input is consumed and no more output can be produced,
+                        // yet the zlib stream did not end: the data was truncated.
+                        // Fail instead of retrying forever.
+                        this.state = DecoderState::Finished;
+                        return Poll::Ready(Some(Err(anyhow::format_err!(
+                            "unexpected end of zlib stream"
+                        ))));
+                    }
+                }
+                DecoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use futures::StreamExt;
+
+    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        use flate2::Compression;
+        use std::io::Write;
+
+        let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
+        e.write_all(bytes)?;
+        e.finish()
+    }
+
+    /// Decodes `chunks` using a tiny output buffer and collects the output.
+    async fn decode_chunks(chunks: Vec<Vec<u8>>) -> Result<Vec<u8>, Error> {
+        let stream = futures::stream::iter(chunks.into_iter().map(Ok::<_, std::io::Error>));
+        let mut decoder = ZlibDecoder::with_buffer_size(stream, 5);
+        let mut out = Vec::new();
+        while let Some(chunk) = decoder.next().await {
+            out.extend_from_slice(&chunk?);
+        }
+        Ok(out)
+    }
+
+    #[tokio::test]
+    async fn test_decompression() {
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let chunks = vec![
+            encoded[..10].to_vec(),
+            encoded[10..20].to_vec(),
+            encoded[20..30].to_vec(),
+            encoded[30..40].to_vec(),
+            encoded[40..].to_vec(),
+        ];
+        assert_eq!(decode_chunks(chunks).await.unwrap(), BODY.as_bytes());
+    }
+
+    #[tokio::test]
+    async fn test_empty_chunks_are_skipped() {
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let chunks = vec![
+            Vec::new(),
+            encoded[..10].to_vec(),
+            Vec::new(),
+            encoded[10..].to_vec(),
+            Vec::new(),
+        ];
+        assert_eq!(decode_chunks(chunks).await.unwrap(), BODY.as_bytes());
+    }
+
+    #[tokio::test]
+    async fn test_trailing_data_is_ignored() {
+        let mut encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        encoded.extend_from_slice(b"trailing data");
+        assert_eq!(decode_chunks(vec![encoded]).await.unwrap(), BODY.as_bytes());
+    }
+
+    #[tokio::test]
+    async fn test_truncated_input_is_an_error() {
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let truncated = encoded[..encoded.len() / 2].to_vec();
+        assert!(decode_chunks(vec![truncated]).await.is_err());
+    }
+}
diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
index 9ece24eb..4455ba85 100644
--- a/proxmox-http/Cargo.toml
+++ b/proxmox-http/Cargo.toml
@@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
 proxmox-sys = { workspace = true, optional = true }
 proxmox-io = { workspace = true, optional = true }
 proxmox-lang = { workspace = true, optional = true }
+proxmox-compression = { workspace = true, optional = true }
+
+[dev-dependencies]
+tokio = { workspace = true, features = [ "macros" ] }
+flate2 = { workspace = true }
 
 [features]
 default = []
@@ -42,12 +47,14 @@ client = [
     "dep:futures",
     "dep:hyper",
     "dep:openssl",
+    "dep:proxmox-compression",
     "dep:tokio",
     "dep:tokio-openssl",
     "http-helpers",
     "hyper?/client",
     "hyper?/http1",
     "hyper?/http2",
+    "hyper?/stream",
     "hyper?/tcp",
     "rate-limited-stream",
     "tokio?/io-util",
diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
index e9910802..b9e240d2 100644
--- a/proxmox-http/src/client/simple.rs
+++ b/proxmox-http/src/client/simple.rs
@@ -78,7 +78,8 @@ impl Client {
 
         self.add_proxy_headers(&mut request)?;
 
-        self.client.request(request).map_err(Error::from).await
+        let encoded_response = self.client.request(request).map_err(Error::from).await?;
+        decode_response(encoded_response).await
     }
 
     pub async fn post(
@@ -245,3 +246,100 @@ impl crate::HttpClient<String, String> for Client {
         })
     }
 }
+
+/// Transparently decodes the body of `res` according to its `Content-Encoding`.
+///
+/// * Without a `Content-Encoding` header the response is returned unchanged, an
+///   `identity` encoding is accepted and its header dropped.
+/// * For `deflate` (zlib-wrapped deflate data, RFC 9110, section 8.4.1.2) the
+///   body is wrapped in a [`proxmox_compression::ZlibDecoder`] stream, and the
+///   `Content-Encoding` and `Content-Length` headers are removed, as they
+///   describe the encoded body.
+/// * Any other encoding results in an error.
+async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
+    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
+        return Ok(res);
+    };
+
+    // Content codings are case-insensitive tokens (RFC 9110, section 8.4.1).
+    let encoding = content_encoding.to_str()?.trim();
+    if encoding.eq_ignore_ascii_case("identity") {
+        return Ok(res);
+    }
+    if !encoding.eq_ignore_ascii_case("deflate") {
+        bail!("Unknown encoding format: {encoding}");
+    }
+
+    // The length of the decoded body is not known in advance, and passing on
+    // the encoded length would mislead callers relying on it.
+    res.headers_mut().remove(&hyper::header::CONTENT_LENGTH);
+
+    let (parts, body) = res.into_parts();
+    let decoded_body = Body::wrap_stream(proxmox_compression::ZlibDecoder::new(body));
+    Ok(Response::from_parts(parts, decoded_body))
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use std::io::Write;
+
+    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
+eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
+enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
+si aliquod aeternum et infinitum impendere."#;
+
+    #[tokio::test]
+    async fn test_parse_response_deflate() {
+        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
+        let encoded_response = Response::builder()
+            .header(hyper::header::CONTENT_ENCODING, "deflate")
+            .header(hyper::header::CONTENT_LENGTH, encoded.len().to_string())
+            .body(Body::from(encoded))
+            .unwrap();
+
+        let decoded_response = decode_response(encoded_response).await.unwrap();
+
+        let headers = decoded_response.headers();
+        assert!(!headers.contains_key(hyper::header::CONTENT_ENCODING));
+        assert!(!headers.contains_key(hyper::header::CONTENT_LENGTH));
+        assert_eq!(
+            Client::response_body_string(decoded_response)
+                .await
+                .unwrap(),
+            BODY
+        );
+    }
+
+    #[tokio::test]
+    async fn test_parse_response_identity() {
+        let response = Response::builder()
+            .header(hyper::header::CONTENT_ENCODING, "identity")
+            .body(Body::from(BODY))
+            .unwrap();
+
+        let response = decode_response(response).await.unwrap();
+        assert_eq!(Client::response_body_string(response).await.unwrap(), BODY);
+    }
+
+    #[tokio::test]
+    async fn test_parse_response_unknown_encoding() {
+        let response = Response::builder()
+            .header(hyper::header::CONTENT_ENCODING, "br")
+            .body(Body::from(BODY))
+            .unwrap();
+
+        assert!(decode_response(response).await.is_err());
+    }
+
+    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        use flate2::write::ZlibEncoder;
+        use flate2::Compression;
+
+        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
+        e.write_all(bytes)?;
+
+        e.finish()
+    }
+}
-- 
2.39.2





^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2024-04-03 16:44 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-03-28 13:40 [pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content Maximiliano Sandoval
2024-04-03 16:44 ` Max Carrara

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal