From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 8A14EBC46C for ; Thu, 28 Mar 2024 14:41:05 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 69F53C24A for ; Thu, 28 Mar 2024 14:40:35 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 28 Mar 2024 14:40:33 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id A41BD428DB for ; Thu, 28 Mar 2024 14:40:33 +0100 (CET) From: Maximiliano Sandoval To: pbs-devel@lists.proxmox.com Date: Thu, 28 Mar 2024 14:40:31 +0100 Message-Id: <20240328134031.153541-1-m.sandoval@proxmox.com> X-Mailer: git-send-email 2.39.2 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.013 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[mozilla.org, httpbin.org, simple.rs, lib.rs] Subject: [pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 28 Mar 2024 13:41:05 -0000 The Backup Server can compress the content using deflate, so we teach the client how to decode it. If a request is sent with the `Accept-Encoding` [2] header set to `deflate`, and the response's `Content-Encoding` [1] header is equal to `deflate`, we wrap the Body stream with a stream that can decode `zlib` on the fly. Note that from the `Accept-Encoding` docs [2], the `deflate` encoding is actually `zlib`. The new `ZlibDecoder` Stream is basically the same as the `DeflateEncoder` struct in the proxmox-compression crate. This can also be tested against http://eu.httpbin.org/#/Response_formats/get_deflate by adding the following test: ```rust #[tokio::test] async fn test_client() { let client = Client::new(); let headers = HashMap::from([( hyper::header::ACCEPT_ENCODING.to_string(), "deflate".to_string(), )]); let response = client .get_string("https://eu.httpbin.org/deflate", Some(&headers)) .await; assert!(response.is_ok()); } ``` at `proxmox-http/src/client/simple.rs` and running ``` cargo test --features=client,client-trait ``` [1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding [2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding Suggested-by: Lukas Wagner Signed-off-by: Maximiliano Sandoval --- Differences from v1: - Only implement deflate for the moment - Turn decoder into a stream - Do not set Accept-Encoding The API for setting the Accept-Encoding needs to be figured out in a follow-up. 
proxmox-compression/Cargo.toml | 3 +- proxmox-compression/src/lib.rs | 3 + proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++ proxmox-http/Cargo.toml | 7 ++ proxmox-http/src/client/simple.rs | 63 +++++++++- 5 files changed, 234 insertions(+), 3 deletions(-) create mode 100644 proxmox-compression/src/zlib_decoder.rs diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml index 49735cbe..3879ed16 100644 --- a/proxmox-compression/Cargo.toml +++ b/proxmox-compression/Cargo.toml @@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] } proxmox-lang.workspace = true [dev-dependencies] -tokio = { workspace = true, features = [ "macros" ] } - +tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] } diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs index 1fcfb977..f5d6e269 100644 --- a/proxmox-compression/src/lib.rs +++ b/proxmox-compression/src/lib.rs @@ -1,6 +1,9 @@ mod compression; pub use compression::*; +mod zlib_decoder; +pub use zlib_decoder::*; + pub mod tar; pub mod zip; pub mod zstd; diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs new file mode 100644 index 00000000..fdf9682b --- /dev/null +++ b/proxmox-compression/src/zlib_decoder.rs @@ -0,0 +1,161 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Error; +use bytes::Bytes; +use flate2::{Decompress, FlushDecompress}; +use futures::ready; +use futures::stream::Stream; + +use proxmox_io::ByteBuffer; + +const BUFFER_SIZE: usize = 8192; + +#[derive(Eq, PartialEq)] +enum EncoderState { + Reading, + Writing, + Flushing, + Finished, +} + +pub struct ZlibDecoder { + inner: T, + decompressor: Decompress, + buffer: ByteBuffer, + input_buffer: Bytes, + state: EncoderState, +} + +impl ZlibDecoder { + pub fn new(inner: T) -> Self { + Self::with_buffer_size(inner, BUFFER_SIZE) + } + + fn with_buffer_size(inner: T, buffer_size: usize) -> Self { + 
Self { + inner, + decompressor: Decompress::new(true), + buffer: ByteBuffer::with_capacity(buffer_size), + input_buffer: Bytes::new(), + state: EncoderState::Reading, + } + } + + fn decode( + &mut self, + inbuf: &[u8], + flush: FlushDecompress, + ) -> Result<(usize, flate2::Status), io::Error> { + let old_in = self.decompressor.total_in(); + let old_out = self.decompressor.total_out(); + let res = self + .decompressor + .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?; + let new_in = (self.decompressor.total_in() - old_in) as usize; + let new_out = (self.decompressor.total_out() - old_out) as usize; + self.buffer.add_size(new_out); + + Ok((new_in, res)) + } +} + +impl Stream for ZlibDecoder +where + T: Stream> + Unpin, + O: Into, + E: Into, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match this.state { + EncoderState::Reading => { + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) { + let buf = res.map_err(Into::into)?; + this.input_buffer = buf.into(); + this.state = EncoderState::Writing; + } else { + this.state = EncoderState::Flushing; + } + } + EncoderState::Writing => { + if this.input_buffer.is_empty() { + return Poll::Ready(Some(Err(anyhow::format_err!( + "empty input during write" + )))); + } + let mut buf = this.input_buffer.split_off(0); + let (read, res) = this.decode(&buf[..], FlushDecompress::None)?; + this.input_buffer = buf.split_off(read); + if this.input_buffer.is_empty() { + this.state = EncoderState::Reading; + } + if this.buffer.is_full() || res == flate2::Status::BufError { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + } + EncoderState::Flushing => { + let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?; + if !this.buffer.is_empty() { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return 
Poll::Ready(Some(Ok(bytes.into()))); + } + if res == flate2::Status::StreamEnd { + this.state = EncoderState::Finished; + } + } + EncoderState::Finished => return Poll::Ready(None), + } + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use futures::StreamExt; + use std::io::Write; + + const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do +eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut +enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest, +si aliquod aeternum et infinitum impendere."#; + + fn encode_deflate(bytes: &[u8]) -> Result, std::io::Error> { + use flate2::Compression; + use std::io::Write; + + let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default()); + e.write_all(bytes).unwrap(); + + e.finish() + } + + #[tokio::test] + async fn test_decompression() { + const BUFFER_SIZE: usize = 5; + let encoded = encode_deflate(BODY.as_bytes()).unwrap(); + let chunks: Vec> = vec![ + Ok(encoded[..10].to_vec()), + Ok(encoded[10..20].to_vec()), + Ok(encoded[20..30].to_vec()), + Ok(encoded[30..40].to_vec()), + Ok(encoded[40..].to_vec()), + ]; + let stream = futures::stream::iter(chunks); + let mut decoder = ZlibDecoder::with_buffer_size(stream, BUFFER_SIZE); + let mut buf = Vec::with_capacity(BODY.len()); + + while let Some(Ok(res)) = decoder.next().await { + buf.write_all(&res).unwrap(); + } + assert_eq!(buf, BODY.as_bytes()); + } +} diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml index 9ece24eb..4455ba85 100644 --- a/proxmox-http/Cargo.toml +++ b/proxmox-http/Cargo.toml @@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true } proxmox-sys = { workspace = true, optional = true } proxmox-io = { workspace = true, optional = true } proxmox-lang = { workspace = true, optional = true } +proxmox-compression = { workspace = true, optional = true } + +[dev-dependencies] +tokio = { workspace = true, features = [ "macros" ] 
} +flate2 = { workspace = true } [features] default = [] @@ -42,12 +47,14 @@ client = [ "dep:futures", "dep:hyper", "dep:openssl", + "dep:proxmox-compression", "dep:tokio", "dep:tokio-openssl", "http-helpers", "hyper?/client", "hyper?/http1", "hyper?/http2", + "hyper?/stream", "hyper?/tcp", "rate-limited-stream", "tokio?/io-util", diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs index e9910802..b9e240d2 100644 --- a/proxmox-http/src/client/simple.rs +++ b/proxmox-http/src/client/simple.rs @@ -78,7 +78,8 @@ impl Client { self.add_proxy_headers(&mut request)?; - self.client.request(request).map_err(Error::from).await + let encoded_response = self.client.request(request).map_err(Error::from).await?; + decode_response(encoded_response).await } pub async fn post( @@ -245,3 +246,63 @@ impl crate::HttpClient for Client { }) } } + +/// Wraps the `Body` stream in a ZlibDecoder stream if the `Content-Encoding` +/// header of the response is `deflate`, otherwise returns the original +/// response. +async fn decode_response(mut res: Response) -> Result, Error> { + let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else { + return Ok(res); + }; + + let encodings = content_encoding.to_str()?; + if encodings == "deflate" { + let (parts, body) = res.into_parts(); + let decoder = proxmox_compression::ZlibDecoder::new(body); + let decoded_body = Body::wrap_stream(decoder); + Ok(Response::from_parts(parts, decoded_body)) + } else { + bail!("Unknown encoding format: {encodings}"); + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::io::Write; + + const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do +eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. 
Ut +enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest, +si aliquod aeternum et infinitum impendere."#; + + #[tokio::test] + async fn test_parse_response_deflate() { + let encoded = encode_deflate(BODY.as_bytes()).unwrap(); + let encoded_body = Body::from(encoded); + let encoded_response = Response::builder() + .header(hyper::header::CONTENT_ENCODING, "deflate") + .body(encoded_body) + .unwrap(); + + let decoded_response = decode_response(encoded_response).await.unwrap(); + + assert_eq!( + Client::response_body_string(decoded_response) + .await + .unwrap(), + BODY + ); + } + + fn encode_deflate(bytes: &[u8]) -> Result, std::io::Error> { + use flate2::write::ZlibEncoder; + use flate2::Compression; + + let mut e = ZlibEncoder::new(Vec::new(), Compression::default()); + e.write_all(bytes).unwrap(); + + e.finish() + } +} -- 2.39.2