From: "Max Carrara"
To: "Proxmox Backup Server development discussion"
Date: Wed, 03 Apr 2024 18:44:39 +0200
In-Reply-To: <20240328134031.153541-1-m.sandoval@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox v2] http: teach the Client how to decode deflate content

On Thu Mar 28, 2024 at 2:40 PM CET, Maximiliano Sandoval wrote:
> The Backup Server can compress the content using deflate, so we teach the
> client how to decode it.
>
> If a request is sent with the `Accept-Encoding` [2] header set to
> `deflate`, and the response's `Content-Encoding` [1] header is equal to
> `deflate`, we wrap the Body stream with a stream that can decode `zlib`
> on the fly.
>
> Note that from the `Accept-Encoding` docs [2], the `deflate` encoding is
> actually `zlib`.
>
> The new `ZlibDecoder` Stream is basically the same as the
> `DeflateEncoder` struct in the proxmox-compression crate.
>
> This can also be tested against
> http://eu.httpbin.org/#/Response_formats/get_deflate by adding the
> following test:
>
> ```rust
> #[tokio::test]
> async fn test_client() {
>     let client = Client::new();
>     let headers = HashMap::from([(
>         hyper::header::ACCEPT_ENCODING.to_string(),
>         "deflate".to_string(),
>     )]);
>     let response = client
>         .get_string("https://eu.httpbin.org/deflate", Some(&headers))
>         .await;
>     assert!(response.is_ok());
> }
> ```
>
> at `proxmox-http/src/client/simple.rs` and running
>
> ```
> cargo test --features=client,client-trait
> ```

Note that the tests you added (in both crates) also run if you invoke
`cargo test --all-features` in the workspace root.
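On the note that HTTP's `deflate` is "actually `zlib`": the `deflate` content coding is a zlib (RFC 1950) stream, i.e. DEFLATE (RFC 1951) data preceded by a 2-byte header and followed by an Adler-32 checksum, which is why the decoder has to expect a zlib header. The std-only sketch below checks that 2-byte header; `looks_like_zlib_header` is a hypothetical helper for illustration and is neither part of the patch nor of `flate2`.

```rust
/// Hypothetical helper: returns `true` if `data` starts with a plausible
/// zlib (RFC 1950) header, i.e. the CMF and FLG bytes.
pub fn looks_like_zlib_header(data: &[u8]) -> bool {
    let (cmf, flg) = match data {
        [cmf, flg, ..] => (*cmf, *flg),
        _ => return false,
    };

    // CM (low nibble of CMF) must be 8, which means DEFLATE.
    let is_deflate = (cmf & 0x0f) == 8;
    // CINFO (high nibble of CMF) encodes the window size; values above 7 are invalid.
    let window_ok = (cmf >> 4) <= 7;
    // FCHECK: CMF * 256 + FLG must be a multiple of 31.
    let check_ok = ((u16::from(cmf) << 8) | u16::from(flg)) % 31 == 0;

    is_deflate && window_ok && check_ok
}
```

For example, `0x78 0x9c` (the header zlib typically writes at the default compression level) passes, while a raw DEFLATE stream usually fails the CM or FCHECK test.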
Overall, I'd prefer if this patch were split up, because you're touching two
crates at once: one gains a new public API, the other extends the capability
of an existing one. Splitting it would make it easier to track which parts
are actually affected, as the changes to `proxmox-compression` could be
applied separately first (and those to `proxmox-http` at a later point, if
necessary).

Apart from that, it seems you haven't checked whether we always compress our
responses server-side (if the client allows it), as I mentioned in my
response to v1 (or at least you didn't mention here whether we do). Since
you've now omitted sending `Accept-Encoding` on requests, it won't make a
difference for now, but perhaps this is something we should look into in a
later patch (series) once this one's done? IMO we shouldn't just compress
data everywhere simply because the client accepts it (for the reasons, see
my response to v1).

Otherwise, looks pretty good to me! See some more comments inline.

>
> [1] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
> [2] https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Encoding
>
> Suggested-by: Lukas Wagner
> Signed-off-by: Maximiliano Sandoval
> ---
> Differences from v1:
> - Only implement deflate for the moment
> - Turn decoder into a stream
> - Do not set Accept-Encoding
>
> The API for setting the Accept-Encoding needs to be figured out in a follow-up.
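Whatever the follow-up API looks like, "if the client allows it" ultimately comes down to reading `Accept-Encoding`. Below is a rough, std-only sketch of that check, based on a simplified reading of RFC 9110: codings are compared case-insensitively, `*` covers codings not listed explicitly, and `q=0` means "not acceptable". `accepts_deflate` is a hypothetical name, it treats unparsable weights as acceptable, and it ignores that a *missing* header means any coding is acceptable. It also only answers whether the client accepts `deflate`, not whether we should compress a given response.

```rust
/// Hypothetical helper: returns `true` if an `Accept-Encoding` header value
/// allows the `deflate` content coding.
pub fn accepts_deflate(accept_encoding: &str) -> bool {
    // Verdict for `*`, if the client sent one.
    let mut wildcard: Option<bool> = None;

    for item in accept_encoding.split(',') {
        let mut parts = item.split(';');
        let coding = parts.next().unwrap_or("").trim();
        if coding.is_empty() {
            continue;
        }

        // A missing `q` parameter means q=1; an unparsable one is treated
        // as acceptable (simplifying assumption).
        let acceptable = parts
            .filter_map(|param| {
                let (name, value) = param.split_once('=')?;
                if name.trim().eq_ignore_ascii_case("q") {
                    Some(value.trim())
                } else {
                    None
                }
            })
            .next()
            .map_or(true, |q| q.parse::<f32>().map_or(true, |q| q > 0.0));

        if coding.eq_ignore_ascii_case("deflate") {
            // An explicit entry takes precedence over the wildcard.
            return acceptable;
        }
        if coding == "*" {
            wildcard = Some(acceptable);
        }
    }

    wildcard.unwrap_or(false)
}
```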
>
>  proxmox-compression/Cargo.toml          |   3 +-
>  proxmox-compression/src/lib.rs          |   3 +
>  proxmox-compression/src/zlib_decoder.rs | 161 ++++++++++++++++++++++++
>  proxmox-http/Cargo.toml                 |   7 ++
>  proxmox-http/src/client/simple.rs       |  63 +++++++++-
>  5 files changed, 234 insertions(+), 3 deletions(-)
>  create mode 100644 proxmox-compression/src/zlib_decoder.rs
>
> diff --git a/proxmox-compression/Cargo.toml b/proxmox-compression/Cargo.toml
> index 49735cbe..3879ed16 100644
> --- a/proxmox-compression/Cargo.toml
> +++ b/proxmox-compression/Cargo.toml
> @@ -27,5 +27,4 @@ proxmox-io = { workspace = true, features = [ "tokio" ] }
>  proxmox-lang.workspace = true
>
>  [dev-dependencies]
> -tokio = { workspace = true, features = [ "macros" ] }
> -
> +tokio = { workspace = true, features = [ "macros", "rt-multi-thread" ] }
> diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
> index 1fcfb977..f5d6e269 100644
> --- a/proxmox-compression/src/lib.rs
> +++ b/proxmox-compression/src/lib.rs
> @@ -1,6 +1,9 @@
>  mod compression;
>  pub use compression::*;
>
> +mod zlib_decoder;
> +pub use zlib_decoder::*;
> +
>  pub mod tar;
>  pub mod zip;
>  pub mod zstd;
> diff --git a/proxmox-compression/src/zlib_decoder.rs b/proxmox-compression/src/zlib_decoder.rs
> new file mode 100644
> index 00000000..fdf9682b
> --- /dev/null
> +++ b/proxmox-compression/src/zlib_decoder.rs
> @@ -0,0 +1,161 @@
> +use std::io;
> +use std::pin::Pin;
> +use std::task::{Context, Poll};
> +
> +use anyhow::Error;
> +use bytes::Bytes;
> +use flate2::{Decompress, FlushDecompress};
> +use futures::ready;
> +use futures::stream::Stream;
> +
> +use proxmox_io::ByteBuffer;
> +
> +const BUFFER_SIZE: usize = 8192;
> +
> +#[derive(Eq, PartialEq)]
> +enum EncoderState {
> +    Reading,
> +    Writing,
> +    Flushing,
> +    Finished,
> +}
> +
> +pub struct ZlibDecoder<T> {
> +    inner: T,
> +    decompressor: Decompress,
> +    buffer: ByteBuffer,
> +    input_buffer: Bytes,
> +    state: EncoderState,
> +}

The decoder is pretty fine as it is, though as we had discussed off-list a
while ago, I wonder if there's a way to make this (and the encoder) a little
more generic.

> +
> +impl<T> ZlibDecoder<T> {
> +    pub fn new(inner: T) -> Self {
> +        Self::with_buffer_size(inner, BUFFER_SIZE)
> +    }
> +
> +    fn with_buffer_size(inner: T, buffer_size: usize) -> Self {
> +        Self {
> +            inner,
> +            decompressor: Decompress::new(true),

The encoder apparently passes `false` for the `zlib_header` argument, i.e. it
produces raw DEFLATE output without a zlib header. Theoretically, you could
first extend the encoder with two more constructors that let you specify
that the output should include a zlib header, e.g. (in
proxmox-compression/src/compression.rs):

    pub fn new_zlib(inner: T) -> Self { ... }

    fn with_buffer_size_zlib(inner: T, buffer_size: usize) -> Self { ... }

Even though this feels a little clunky (I'd prefer to have a builder here,
but it's not [yet] worth it), it doesn't break the API. I'm of course open
to other suggestions as well.

You could then provide matching constructor `fn`s for your `ZlibDecoder`;
you might as well call it `DeflateDecoder` in that case, too. That way we'd
avoid having to introduce a separate en-/decoder `struct` with its own
`En`-/`DecoderState` `enum`.

To elaborate, instead of:

  * `DeflateEncoder`
  * `DeflateDecoder` (doesn't exist yet)
  * `ZlibEncoder` (doesn't exist yet)
  * `ZlibDecoder`

... we'd end up with:

  * `DeflateEncoder` (now supports both DEFLATE and zlib)
  * `DeflateDecoder` (also supports both)

So, we'd avoid a lot of duplicated code.

Additionally, you could rename the `compression.rs` module to `flate.rs` or
something like that. Because the encoder is re-exported in `lib.rs` anyway,
this shouldn't break any existing usages (and if it does, you could provide
a small compatibility module in `lib.rs` for the time being, if that's
preferable).
Either way, even though this turned into a little wall of text, don't fret:
I have nothing to bicker about with regard to the code here ;) LGTM!

> +            buffer: ByteBuffer::with_capacity(buffer_size),
> +            input_buffer: Bytes::new(),
> +            state: EncoderState::Reading,
> +        }
> +    }
> +
> +    fn decode(
> +        &mut self,
> +        inbuf: &[u8],
> +        flush: FlushDecompress,
> +    ) -> Result<(usize, flate2::Status), io::Error> {
> +        let old_in = self.decompressor.total_in();
> +        let old_out = self.decompressor.total_out();
> +        let res = self
> +            .decompressor
> +            .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
> +        let new_in = (self.decompressor.total_in() - old_in) as usize;
> +        let new_out = (self.decompressor.total_out() - old_out) as usize;
> +        self.buffer.add_size(new_out);
> +
> +        Ok((new_in, res))
> +    }
> +}
> +
> +impl<T, O, E> Stream for ZlibDecoder<T>
> +where
> +    T: Stream<Item = Result<O, E>> + Unpin,
> +    O: Into<Bytes>,
> +    E: Into<Error>,
> +{
> +    type Item = Result<Bytes, Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.state {
> +                EncoderState::Reading => {
> +                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
> +                        let buf = res.map_err(Into::into)?;
> +                        this.input_buffer = buf.into();
> +                        this.state = EncoderState::Writing;
> +                    } else {
> +                        this.state = EncoderState::Flushing;
> +                    }
> +                }
> +                EncoderState::Writing => {
> +                    if this.input_buffer.is_empty() {
> +                        return Poll::Ready(Some(Err(anyhow::format_err!(
> +                            "empty input during write"
> +                        ))));
> +                    }
> +                    let mut buf = this.input_buffer.split_off(0);
> +                    let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
> +                    this.input_buffer = buf.split_off(read);
> +                    if this.input_buffer.is_empty() {
> +                        this.state = EncoderState::Reading;
> +                    }
> +                    if this.buffer.is_full() || res == flate2::Status::BufError {
> +                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> +                        return Poll::Ready(Some(Ok(bytes.into())));
> +                    }
> +                }
> +                EncoderState::Flushing => {
> +                    let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
> +                    if !this.buffer.is_empty() {
> +                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
> +                        return Poll::Ready(Some(Ok(bytes.into())));
> +                    }
> +                    if res == flate2::Status::StreamEnd {
> +                        this.state = EncoderState::Finished;
> +                    }
> +                }
> +                EncoderState::Finished => return Poll::Ready(None),
> +            }
> +        }
> +    }
> +}
> +
> +#[cfg(test)]
> +mod test {
> +    use super::*;
> +
> +    use futures::StreamExt;
> +    use std::io::Write;
> +
> +    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
> +eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
> +enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
> +si aliquod aeternum et infinitum impendere."#;
> +
> +    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
> +        use flate2::Compression;
> +        use std::io::Write;
> +
> +        let mut e = flate2::write::ZlibEncoder::new(Vec::new(), Compression::default());
> +        e.write_all(bytes).unwrap();
> +
> +        e.finish()
> +    }
> +
> +    #[tokio::test]
> +    async fn test_decompression() {
> +        const BUFFER_SIZE: usize = 5;
> +        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
> +        let chunks: Vec<Result<_, std::io::Error>> = vec![
> +            Ok(encoded[..10].to_vec()),
> +            Ok(encoded[10..20].to_vec()),
> +            Ok(encoded[20..30].to_vec()),
> +            Ok(encoded[30..40].to_vec()),
> +            Ok(encoded[40..].to_vec()),
> +        ];
> +        let stream = futures::stream::iter(chunks);
> +        let mut decoder = ZlibDecoder::with_buffer_size(stream, BUFFER_SIZE);
> +        let mut buf = Vec::with_capacity(BODY.len());
> +
> +        while let Some(Ok(res)) = decoder.next().await {
> +            buf.write_all(&res).unwrap();
> +        }
> +        assert_eq!(buf, BODY.as_bytes());
> +    }

Once you've incorporated the changes I suggested above, you could then do
some more elaborate testing for both the en- and decoder, e.g. test whether
the content compressed by the encoder gets correctly decompressed by
`flate2`'s decompressor, and vice versa for the decoder (for both DEFLATE
and zlib modes). While I trust that the encoder works as-is (not like we've
been using it all the time, duh), it doesn't hurt to have tests there too,
just in case.

> +}

As mentioned above, the changes below should be put into a separate patch.

> diff --git a/proxmox-http/Cargo.toml b/proxmox-http/Cargo.toml
> index 9ece24eb..4455ba85 100644
> --- a/proxmox-http/Cargo.toml
> +++ b/proxmox-http/Cargo.toml
> @@ -26,6 +26,11 @@ proxmox-async = { workspace = true, optional = true }
>  proxmox-sys = { workspace = true, optional = true }
>  proxmox-io = { workspace = true, optional = true }
>  proxmox-lang = { workspace = true, optional = true }
> +proxmox-compression = { workspace = true, optional = true }
> +
> +[dev-dependencies]
> +tokio = { workspace = true, features = [ "macros" ] }
> +flate2 = { workspace = true }
>
>  [features]
>  default = []
> @@ -42,12 +47,14 @@ client = [
>      "dep:futures",
>      "dep:hyper",
>      "dep:openssl",
> +    "dep:proxmox-compression",
>      "dep:tokio",
>      "dep:tokio-openssl",
>      "http-helpers",
>      "hyper?/client",
>      "hyper?/http1",
>      "hyper?/http2",
> +    "hyper?/stream",
>      "hyper?/tcp",
>      "rate-limited-stream",
>      "tokio?/io-util",
> diff --git a/proxmox-http/src/client/simple.rs b/proxmox-http/src/client/simple.rs
> index e9910802..b9e240d2 100644
> --- a/proxmox-http/src/client/simple.rs
> +++ b/proxmox-http/src/client/simple.rs
> @@ -78,7 +78,8 @@ impl Client {
>
>          self.add_proxy_headers(&mut request)?;
>
> -        self.client.request(request).map_err(Error::from).await
> +        let encoded_response = self.client.request(request).map_err(Error::from).await?;
> +        decode_response(encoded_response).await
>      }
>
>      pub async fn post(
> @@ -245,3 +246,63 @@ impl crate::HttpClient for Client {
>          })
>      }
>  }
> +
> +/// Wraps the `Body` stream in a ZlibDecoder stream if the `Content-Encoding`
> +/// header of the response is `deflate`, otherwise returns the original
> +/// response.
> +async fn decode_response(mut res: Response<Body>) -> Result<Response<Body>, Error> {
> +    let Some(content_encoding) = res.headers_mut().remove(&hyper::header::CONTENT_ENCODING) else {
> +        return Ok(res);
> +    };
> +
> +    let encodings = content_encoding.to_str()?;
> +    if encodings == "deflate" {
> +        let (parts, body) = res.into_parts();
> +        let decoder = proxmox_compression::ZlibDecoder::new(body);
> +        let decoded_body = Body::wrap_stream(decoder);
> +        Ok(Response::from_parts(parts, decoded_body))
> +    } else {
> +        bail!("Unknown encoding format: {encodings}");
> +    }
> +}
> +
> +#[cfg(test)]
> +mod test {
> +    use super::*;
> +
> +    use std::io::Write;
> +
> +    const BODY: &str = r#"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
> +eiusmod tempor incididunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut
> +enim aeque doleamus animo, cum corpore dolemus, fieri tamen permagna accessio potest,
> +si aliquod aeternum et infinitum impendere."#;
> +
> +    #[tokio::test]
> +    async fn test_parse_response_deflate() {
> +        let encoded = encode_deflate(BODY.as_bytes()).unwrap();
> +        let encoded_body = Body::from(encoded);
> +        let encoded_response = Response::builder()
> +            .header(hyper::header::CONTENT_ENCODING, "deflate")
> +            .body(encoded_body)
> +            .unwrap();
> +
> +        let decoded_response = decode_response(encoded_response).await.unwrap();
> +
> +        assert_eq!(
> +            Client::response_body_string(decoded_response)
> +                .await
> +                .unwrap(),
> +            BODY
> +        );
> +    }
> +
> +    fn encode_deflate(bytes: &[u8]) -> Result<Vec<u8>, std::io::Error> {
> +        use flate2::write::ZlibEncoder;
> +        use flate2::Compression;
> +
> +        let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
> +        e.write_all(bytes).unwrap();
> +
> +        e.finish()
> +    }
> +}
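A possible hardening of `decode_response`, sketched under assumptions: the patch compares the header value against the exact string `deflate`, but RFC 9110 defines content codings as case-insensitive, and `Content-Encoding` may list several codings in the order they were applied. The std-only sketch below normalizes the value before deciding; `BodyDecoding` and `body_decoding` are hypothetical names, treating a present-but-empty header as "no coding" is an assumption, and, like the patch, it still rejects anything it cannot decode.

```rust
/// Hypothetical result of inspecting a `Content-Encoding` header.
#[derive(Debug, PartialEq, Eq)]
pub enum BodyDecoding {
    /// No content coding was applied; pass the body through as-is.
    Passthrough,
    /// HTTP's `deflate` coding, i.e. zlib-wrapped DEFLATE.
    Deflate,
}

/// Hypothetical helper: decides how a response body has to be decoded.
pub fn body_decoding(content_encoding: Option<&str>) -> Result<BodyDecoding, String> {
    let Some(value) = content_encoding else {
        return Ok(BodyDecoding::Passthrough);
    };

    // Comma-separated list of codings, in the order they were applied.
    let codings: Vec<&str> = value
        .split(',')
        .map(str::trim)
        .filter(|coding| !coding.is_empty())
        .collect();

    match codings.as_slice() {
        // Header present but empty: treated like "no coding" (assumption).
        [] => Ok(BodyDecoding::Passthrough),
        // Content codings are compared case-insensitively.
        [coding] if coding.eq_ignore_ascii_case("deflate") => Ok(BodyDecoding::Deflate),
        // Stacked or unknown codings are rejected, as in the patch.
        _ => Err(format!("unsupported content encoding: {value}")),
    }
}
```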