public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup v3 7/7] tools/zip: compress zips with deflate
Date: Tue,  6 Apr 2021 11:03:47 +0200	[thread overview]
Message-ID: <20210406090347.27579-8-d.csapak@proxmox.com> (raw)
In-Reply-To: <20210406090347.27579-1-d.csapak@proxmox.com>

by using our DeflateEncoder

For this to work, we have to create a wrapper reader that computes the
crc32 checksum while reading.

We also need to put the target writer in an Option, so that we can take
it out of self and move it into the DeflateEncoder while writing
compressed data.

We can then drop the internal buffer, since it is now managed by the
deflate encoder.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools/zip.rs | 132 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 94 insertions(+), 38 deletions(-)

diff --git a/src/tools/zip.rs b/src/tools/zip.rs
index afacfbb9..37483d4b 100644
--- a/src/tools/zip.rs
+++ b/src/tools/zip.rs
@@ -10,15 +10,19 @@ use std::io;
 use std::mem::size_of;
 use std::os::unix::ffi::OsStrExt;
 use std::path::{Component, Path, PathBuf};
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
-use anyhow::{Error, Result};
+use anyhow::{format_err, Error, Result};
 use endian_trait::Endian;
-use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use futures::ready;
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
 
 use crc32fast::Hasher;
-use proxmox::tools::byte_buffer::ByteBuffer;
 use proxmox::tools::time::gmtime;
 
+use crate::tools::compression::{DeflateEncoder, Level};
+
 const LOCAL_FH_SIG: u32 = 0x04034B50;
 const LOCAL_FF_SIG: u32 = 0x08074B50;
 const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50;
@@ -245,7 +249,7 @@ impl ZipEntry {
                 signature: LOCAL_FH_SIG,
                 version_needed: 0x2d,
                 flags: 1 << 3,
-                compression: 0,
+                compression: 0x8,
                 time,
                 date,
                 crc32: 0,
@@ -328,7 +332,7 @@ impl ZipEntry {
                 version_made_by: VERSION_MADE_BY,
                 version_needed: VERSION_NEEDED,
                 flags: 1 << 3,
-                compression: 0,
+                compression: 0x8,
                 time,
                 date,
                 crc32: self.crc32,
@@ -366,6 +370,47 @@ impl ZipEntry {
     }
 }
 
+// Wraps an AsyncRead and feeds every byte read through it into a crc32 Hasher.
+struct HashWrapper<R> {
+    inner: R,
+    hasher: Hasher,
+}
+
+impl<R> HashWrapper<R> {
+    fn new(inner: R) -> Self {
+        Self {
+            inner,
+            hasher: Hasher::new(),
+        }
+    }
+
+    // Consumes the wrapper, returning the crc32 of all bytes read so far and the inner reader.
+    fn finish(self) -> (u32, R) {
+        let crc32 = self.hasher.finalize();
+        (crc32, self.inner)
+    }
+}
+
+impl<R> AsyncRead for HashWrapper<R>
+where
+    R: AsyncRead + Unpin, // needed for get_mut() and Pin::new() in poll_read
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        let this = self.get_mut();
+        let old_len = buf.filled().len(); // bytes already filled before this read
+        ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; // propagates Pending and errors
+        let new_len = buf.filled().len();
+        if new_len > old_len { // hash only the bytes appended by this read
+            this.hasher.update(&buf.filled()[old_len..new_len]);
+        }
+        Poll::Ready(Ok(()))
+    }
+}
+
 /// Wraps a writer that implements AsyncWrite for creating a ZIP archive
 ///
 /// This will create a ZIP archive on the fly with files added with
@@ -400,8 +445,7 @@ where
 {
     byte_count: usize,
     files: Vec<ZipEntry>,
-    target: W,
-    buf: ByteBuffer,
+    target: Option<W>,
 }
 
 impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
@@ -409,8 +453,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         Self {
             byte_count: 0,
             files: Vec::new(),
-            target,
-            buf: ByteBuffer::with_capacity(1024 * 1024),
+            target: Some(target),
         }
     }
 
@@ -419,31 +462,31 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         mut entry: ZipEntry,
         content: Option<R>,
     ) -> Result<(), Error> {
+        let mut target = self
+            .target
+            .take()
+            .ok_or_else(|| format_err!("had no target during add entry"))?;
         entry.offset = self.byte_count.try_into()?;
-        self.byte_count += entry.write_local_header(&mut self.target).await?;
-        if let Some(mut content) = content {
-            let mut hasher = Hasher::new();
-            let mut size = 0;
-            loop {
-                let count = self.buf.read_from_async(&mut content).await?;
-
-                // end of file
-                if count == 0 {
-                    break;
-                }
-
-                size += count;
-                hasher.update(&self.buf);
-                self.target.write_all(&self.buf).await?;
-                self.buf.consume(count);
-            }
+        self.byte_count += entry.write_local_header(&mut target).await?;
+        if let Some(content) = content {
+            let mut reader = HashWrapper::new(content);
+            let mut enc = DeflateEncoder::with_quality(target, Level::Fastest);
 
-            self.byte_count += size;
-            entry.compressed_size = size.try_into()?;
-            entry.uncompressed_size = size.try_into()?;
-            entry.crc32 = hasher.finalize();
+            enc.compress(&mut reader).await?;
+            let total_in = enc.total_in();
+            let total_out = enc.total_out();
+            target = enc.into_inner();
+
+            let (crc32, _reader) = reader.finish();
+
+            self.byte_count += total_out as usize;
+            entry.compressed_size = total_out;
+            entry.uncompressed_size = total_in;
+
+            entry.crc32 = crc32;
         }
-        self.byte_count += entry.write_data_descriptor(&mut self.target).await?;
+        self.byte_count += entry.write_data_descriptor(&mut target).await?;
+        self.target = Some(target);
 
         self.files.push(entry);
 
@@ -456,6 +499,10 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         central_dir_offset: usize,
     ) -> Result<(), Error> {
         let entrycount = self.files.len();
+        let mut target = self
+            .target
+            .take()
+            .ok_or_else(|| format_err!("had no target during write_eocd"))?;
 
         let mut count = entrycount as u16;
         let mut directory_size = central_dir_size as u32;
@@ -470,7 +517,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             directory_offset = 0xFFFFFFFF;
 
             write_struct(
-                &mut self.target,
+                &mut target,
                 Zip64EOCDRecord {
                     signature: ZIP64_EOCD_RECORD,
                     field_size: 44,
@@ -489,7 +536,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             let locator_offset = central_dir_offset + central_dir_size;
 
             write_struct(
-                &mut self.target,
+                &mut target,
                 Zip64EOCDLocator {
                     signature: ZIP64_EOCD_LOCATOR,
                     disk_number: 0,
@@ -501,7 +548,7 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         }
 
         write_struct(
-            &mut self.target,
+            &mut target,
             EndOfCentralDir {
                 signature: END_OF_CENTRAL_DIR,
                 disk_number: 0,
@@ -515,23 +562,32 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         )
         .await?;
 
+        self.target = Some(target);
+
         Ok(())
     }
 
     pub async fn finish(&mut self) -> Result<(), Error> {
+        let mut target = self
+            .target
+            .take()
+            .ok_or_else(|| format_err!("had no target during finish"))?;
         let central_dir_offset = self.byte_count;
         let mut central_dir_size = 0;
 
         for file in &self.files {
-            central_dir_size += file
-                .write_central_directory_header(&mut self.target)
-                .await?;
+            central_dir_size += file.write_central_directory_header(&mut target).await?;
         }
 
+        self.target = Some(target);
         self.write_eocd(central_dir_size, central_dir_offset)
             .await?;
 
-        self.target.flush().await?;
+        self.target
+            .take()
+            .ok_or_else(|| format_err!("had no target for flush"))?
+            .flush()
+            .await?;
 
         Ok(())
     }
-- 
2.20.1





  parent reply	other threads:[~2021-04-06  9:04 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-04-06  9:03 [pbs-devel] [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Dominik Csapak
2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 1/7] tools: add compression module Dominik Csapak
2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 2/7] tools/compression: add DeflateEncoder and helpers Dominik Csapak
2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 3/7] server/rest: add helper to extract compression headers Dominik Csapak
2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 4/7] server/rest: compress api calls Dominik Csapak
2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 5/7] server/rest: compress static files Dominik Csapak
2021-04-06  9:03 ` [pbs-devel] [PATCH proxmox-backup v3 6/7] tools/zip: run rustfmt Dominik Csapak
2021-04-06  9:03 ` Dominik Csapak [this message]
2021-04-07 16:00 ` [pbs-devel] applied-series: [PATCH proxmox-backup v3 0/7] add compression to api/static files and zip Thomas Lamprecht

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210406090347.27579-8-d.csapak@proxmox.com \
    --to=d.csapak@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal