From: Maximiliano Sandoval <m.sandoval@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox v3 3/5] compression: deflate: add a decoder
Date: Wed, 10 Apr 2024 16:30:34 +0200 [thread overview]
Message-ID: <20240410143036.81807-4-m.sandoval@proxmox.com> (raw)
In-Reply-To: <20240410143036.81807-1-m.sandoval@proxmox.com>
Signed-off-by: Maximiliano Sandoval <m.sandoval@proxmox.com>
---
.../src/deflate/decompression.rs | 141 ++++++++++++++++++
proxmox-compression/src/deflate/mod.rs | 2 +
proxmox-compression/src/lib.rs | 2 +-
3 files changed, 144 insertions(+), 1 deletion(-)
create mode 100644 proxmox-compression/src/deflate/decompression.rs
diff --git a/proxmox-compression/src/deflate/decompression.rs b/proxmox-compression/src/deflate/decompression.rs
new file mode 100644
index 00000000..45ed8579
--- /dev/null
+++ b/proxmox-compression/src/deflate/decompression.rs
@@ -0,0 +1,141 @@
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use anyhow::Error;
+use bytes::Bytes;
+use flate2::{Decompress, FlushDecompress};
+use futures::ready;
+use futures::stream::Stream;
+
+use proxmox_io::ByteBuffer;
+
+/// Phase of the `DeflateDecoder` state machine that `poll_next` steps through.
+#[derive(Eq, PartialEq)]
+enum DecoderState {
+ /// Poll the inner stream for the next chunk of compressed input.
+ Reading,
+ /// Feed the pending input chunk to the decompressor.
+ Writing,
+ /// The inner stream has ended; drain what the decompressor still holds.
+ Flushing,
+ /// Terminal state: `poll_next` yields `None`.
+ Finished,
+}
+
+/// Stream adapter that decompresses the chunks produced by the inner stream `T`.
+///
+/// Create one with [`DeflateDecoder::new`], or use [`DeflateDecoder::builder`]
+/// to toggle the zlib flag or change the output buffer size.
+pub struct DeflateDecoder<T> {
+ /// Source of compressed chunks.
+ inner: T,
+ /// flate2 decompression state.
+ decompressor: Decompress,
+ /// Staging area for decompressed bytes not yet handed to the consumer.
+ buffer: ByteBuffer,
+ /// Compressed input received from `inner` that has not been consumed yet.
+ input_buffer: Bytes,
+ /// Current phase of the `poll_next` state machine.
+ state: DecoderState,
+}
+
+/// Builder for [`DeflateDecoder`], obtained from [`DeflateDecoder::builder`].
+pub struct DeflateDecoderBuilder<T> {
+ /// Stream that will supply the compressed chunks.
+ inner: T,
+ /// Passed to `Decompress::new` when the decoder is built.
+ is_zlib: bool,
+ /// Capacity of the decoder's output staging buffer.
+ buffer_size: usize,
+}
+
+impl<T> DeflateDecoderBuilder<T> {
+    /// Sets the flag handed to `Decompress::new`, i.e. whether the input is
+    /// expected to carry a zlib header instead of being a raw deflate stream.
+    pub fn zlib(self, is_zlib: bool) -> Self {
+        Self { is_zlib, ..self }
+    }
+
+    /// Sets the capacity of the buffer that stages decompressed output.
+    pub fn buffer_size(self, buffer_size: usize) -> Self {
+        Self {
+            buffer_size,
+            ..self
+        }
+    }
+
+    /// Consumes the builder and returns a decoder that starts out in the
+    /// `Reading` state with no pending input.
+    pub fn build(self) -> DeflateDecoder<T> {
+        let Self {
+            inner,
+            is_zlib,
+            buffer_size,
+        } = self;
+
+        DeflateDecoder {
+            inner,
+            decompressor: Decompress::new(is_zlib),
+            buffer: ByteBuffer::with_capacity(buffer_size),
+            input_buffer: Bytes::new(),
+            state: DecoderState::Reading,
+        }
+    }
+}
+
+impl<T> DeflateDecoder<T> {
+    /// Wraps `inner` in a decoder built with the builder's defaults.
+    pub fn new(inner: T) -> Self {
+        DeflateDecoder::builder(inner).build()
+    }
+
+    /// Starts a builder for a decoder reading from `inner`.
+    ///
+    /// Defaults: zlib flag off, output buffer of `super::BUFFER_SIZE` bytes.
+    pub fn builder(inner: T) -> DeflateDecoderBuilder<T> {
+        DeflateDecoderBuilder {
+            buffer_size: super::BUFFER_SIZE,
+            is_zlib: false,
+            inner,
+        }
+    }
+
+    /// Runs one decompression step from `inbuf` into the free part of the
+    /// output buffer.
+    ///
+    /// Returns the number of input bytes consumed together with the status
+    /// reported by the decompressor. The output buffer's fill level is
+    /// advanced by the number of bytes produced.
+    fn decode(
+        &mut self,
+        inbuf: &[u8],
+        flush: FlushDecompress,
+    ) -> Result<(usize, flate2::Status), io::Error> {
+        // The decompressor only exposes running totals, so the per-call
+        // amounts are derived from before/after snapshots.
+        let in_before = self.decompressor.total_in();
+        let out_before = self.decompressor.total_out();
+
+        let status =
+            self.decompressor
+                .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?;
+
+        let consumed = (self.decompressor.total_in() - in_before) as usize;
+        let produced = (self.decompressor.total_out() - out_before) as usize;
+        self.buffer.add_size(produced);
+
+        Ok((consumed, status))
+    }
+}
+
+/// Decompressing stream: pulls compressed chunks from the inner stream and
+/// yields the decompressed bytes.
+///
+/// Items are `Err` when the inner stream fails, when the decompressor reports
+/// an error, or when the input ends before the compressed stream is complete.
+/// Any input following the end of the compressed stream is discarded.
+impl<T, O, E> Stream for DeflateDecoder<T>
+where
+    T: Stream<Item = Result<O, E>> + Unpin,
+    O: Into<Bytes>,
+    E: Into<Error>,
+{
+    type Item = Result<Bytes, anyhow::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.state {
+                DecoderState::Reading => {
+                    if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
+                        let buf = res.map_err(Into::into)?;
+                        this.input_buffer = buf.into();
+                        // An empty chunk carries nothing to decode; stay in
+                        // `Reading` and ask the inner stream for the next one.
+                        if !this.input_buffer.is_empty() {
+                            this.state = DecoderState::Writing;
+                        }
+                    } else {
+                        this.state = DecoderState::Flushing;
+                    }
+                }
+                DecoderState::Writing => {
+                    // Defensive: `Reading` only enters this state with input.
+                    if this.input_buffer.is_empty() {
+                        return Poll::Ready(Some(Err(anyhow::format_err!(
+                            "empty input during write"
+                        ))));
+                    }
+                    // Move the pending input out of `this` so `decode` can
+                    // borrow `this` mutably, then keep the unconsumed tail.
+                    let mut buf = this.input_buffer.split_off(0);
+                    let (read, res) = this.decode(&buf[..], FlushDecompress::None)?;
+                    this.input_buffer = buf.split_off(read);
+
+                    if res == flate2::Status::StreamEnd {
+                        // The compressed stream is complete. Input left over
+                        // would never be consumed, so retrying `decode` on it
+                        // would loop forever; drop it and finish.
+                        this.input_buffer = Bytes::new();
+                        this.state = DecoderState::Finished;
+                        if !this.buffer.is_empty() {
+                            let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                            return Poll::Ready(Some(Ok(bytes.into())));
+                        }
+                        continue;
+                    }
+
+                    if this.input_buffer.is_empty() {
+                        this.state = DecoderState::Reading;
+                    }
+                    if this.buffer.is_full() || res == flate2::Status::BufError {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                }
+                DecoderState::Flushing => {
+                    let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?;
+                    let stream_end = res == flate2::Status::StreamEnd;
+                    if stream_end {
+                        // Switch before returning the last chunk so the
+                        // decompressor is not called again after the end.
+                        this.state = DecoderState::Finished;
+                    }
+                    if !this.buffer.is_empty() {
+                        let bytes = this.buffer.remove_data(this.buffer.len()).to_vec();
+                        return Poll::Ready(Some(Ok(bytes.into())));
+                    }
+                    if !stream_end {
+                        // No input is left, nothing was produced and the end
+                        // of the stream was not reached: the data is
+                        // truncated. Looping again could not make progress.
+                        this.state = DecoderState::Finished;
+                        return Poll::Ready(Some(Err(anyhow::format_err!(
+                            "unexpected end of compressed input"
+                        ))));
+                    }
+                }
+                DecoderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs
index 514ccbdc..6867176c 100644
--- a/proxmox-compression/src/deflate/mod.rs
+++ b/proxmox-compression/src/deflate/mod.rs
@@ -1,5 +1,7 @@
mod compression;
+mod decompression;
pub use compression::{DeflateEncoder, Level};
+pub use decompression::DeflateDecoder;
const BUFFER_SIZE: usize = 8192;
diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs
index 239de623..f33212e5 100644
--- a/proxmox-compression/src/lib.rs
+++ b/proxmox-compression/src/lib.rs
@@ -1,4 +1,4 @@
-pub use deflate::{DeflateEncoder, Level};
+pub use deflate::{DeflateDecoder, DeflateEncoder, Level};
pub(crate) mod deflate;
pub mod tar;
--
2.39.2
next prev parent reply other threads:[~2024-04-10 14:31 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-04-10 14:30 [pbs-devel] [PATCH proxmox v3 0/5] Teach HTTP client how to decode deflate Maximiliano Sandoval
2024-04-10 14:30 ` [pbs-devel] [PATCH proxmox v3 1/5] compression: deflate: Move encoder into a mod Maximiliano Sandoval
2024-04-12 12:41 ` Gabriel Goller
2024-04-10 14:30 ` [pbs-devel] [PATCH proxmox v3 2/5] compression: deflate: add builder pattern Maximiliano Sandoval
2024-04-10 14:30 ` Maximiliano Sandoval [this message]
2024-04-10 14:30 ` [pbs-devel] [PATCH proxmox v3 4/5] compression: deflate: add test module Maximiliano Sandoval
2024-04-10 14:30 ` [pbs-devel] [PATCH proxmox v3 5/5] http: teach the Client how to decode deflate content Maximiliano Sandoval
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20240410143036.81807-4-m.sandoval@proxmox.com \
--to=m.sandoval@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox