From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <d.csapak@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id E243D6A732
 for <pbs-devel@lists.proxmox.com>; Mon, 15 Mar 2021 12:21:20 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id CF11D202DA
 for <pbs-devel@lists.proxmox.com>; Mon, 15 Mar 2021 12:21:20 +0100 (CET)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [212.186.127.180])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS id 13989202D2
 for <pbs-devel@lists.proxmox.com>; Mon, 15 Mar 2021 12:21:20 +0100 (CET)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D1B2F426CE
 for <pbs-devel@lists.proxmox.com>; Mon, 15 Mar 2021 12:21:19 +0100 (CET)
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Mon, 15 Mar 2021 12:21:18 +0100
Message-Id: <20210315112118.13641-3-d.csapak@proxmox.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20210315112118.13641-1-d.csapak@proxmox.com>
References: <20210315112118.13641-1-d.csapak@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.188 Adjusted score from AWL reputation of From: address
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_DNSWL_MED        -2.3 Sender listed at https://www.dnswl.org/,
 medium trust
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [RFC PATCH proxmox-backup 3/3] tools/zip: compress zips
 with deflate
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Mon, 15 Mar 2021 11:21:20 -0000

to get smaller zip files

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
@Wolfgang, could you please look at this? I am not sure about using
the Compress in an async function. It is only in memory, but does it
'block'? i am not sure how we could do this differently in an
async context though...

 Cargo.toml       |  1 +
 src/tools/zip.rs | 75 +++++++++++++++++++++++++++++++++++++++---------
 2 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 79945312..06967c20 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ crc32fast = "1"
 endian_trait = { version = "0.6", features = ["arrays"] }
 anyhow = "1.0"
 futures = "0.3"
+flate2 = "1.0"
 h2 = { version = "0.3", features = [ "stream" ] }
 handlebars = "3.0"
 http = "0.2"
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
index 55f2a24a..237b8a1f 100644
--- a/src/tools/zip.rs
+++ b/src/tools/zip.rs
@@ -11,9 +11,10 @@ use std::mem::size_of;
 use std::os::unix::ffi::OsStrExt;
 use std::path::{Component, Path, PathBuf};
 
-use anyhow::{Error, Result};
+use anyhow::{bail, Error, Result};
 use endian_trait::Endian;
 use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use flate2::{Compress, Compression, FlushCompress};
 
 use crc32fast::Hasher;
 use proxmox::tools::time::gmtime;
@@ -245,7 +246,7 @@ impl ZipEntry {
                 signature: LOCAL_FH_SIG,
                 version_needed: 0x2d,
                 flags: 1 << 3,
-                compression: 0,
+                compression: 0x8,
                 time,
                 date,
                 crc32: 0,
@@ -328,7 +329,7 @@ impl ZipEntry {
                 version_made_by: VERSION_MADE_BY,
                 version_needed: VERSION_NEEDED,
                 flags: 1 << 3,
-                compression: 0,
+                compression: 0x8,
                 time,
                 date,
                 crc32: self.crc32,
@@ -402,6 +403,7 @@ where
     files: Vec<ZipEntry>,
     target: W,
     buf: ByteBuffer,
+    outbuf: ByteBuffer,
 }
 
 impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
@@ -410,7 +412,8 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
             byte_count: 0,
             files: Vec::new(),
             target,
-            buf: ByteBuffer::with_capacity(1024*1024),
+            buf: ByteBuffer::with_capacity(1024 * 1024),
+            outbuf: ByteBuffer::with_capacity(1024 * 1024),
         }
     }
 
@@ -423,25 +426,71 @@ impl<W: AsyncWrite + Unpin> ZipEncoder<W> {
         self.byte_count += entry.write_local_header(&mut self.target).await?;
         if let Some(mut content) = content {
             let mut hasher = Hasher::new();
-            let mut size = 0;
+            let mut deflate_encoder = Compress::new(Compression::fast(), false);
+
             loop {
 
+                let syncmode = if self.buf.is_full() {
+                    FlushCompress::Sync
+                } else {
+                    FlushCompress::None
+                };
+
+                let old_pos = self.buf.len();
                 let count = self.buf.read_from_async(&mut content).await?;
 
                 // end of file
-                if count == 0 {
+                if count == 0 && syncmode == FlushCompress::None {
                     break;
                 }
 
-                size += count;
-                hasher.update(&self.buf);
-                self.target.write_all(&self.buf).await?;
-                self.buf.consume(count);
+                hasher.update(&self.buf[old_pos..]);
+
+                let old_read = deflate_encoder.total_in();
+                let old_write = deflate_encoder.total_out();
+                deflate_encoder.compress(
+                    &self.buf,
+                    &mut self.outbuf.get_free_mut_slice(),
+                    syncmode,
+                )?;
+                let read = (deflate_encoder.total_in() - old_read) as usize;
+                let write = (deflate_encoder.total_out() - old_write) as usize;
+
+                self.outbuf.add_size(write);
+
+                if read == 0 {
+                    bail!("did not consume any data!");
+                }
+
+                self.target.write_all(&self.outbuf).await?;
+                self.buf.consume(read);
+                self.outbuf.clear();
             }
 
-            self.byte_count += size;
-            entry.compressed_size = size.try_into()?;
-            entry.uncompressed_size = size.try_into()?;
+            let old_read = deflate_encoder.total_in();
+            let old_write = deflate_encoder.total_out();
+            deflate_encoder.compress(
+                &self.buf,
+                &mut self.outbuf.get_free_mut_slice(),
+                FlushCompress::Finish,
+            )?;
+            let read = (deflate_encoder.total_in() - old_read) as usize;
+            let write = (deflate_encoder.total_out() - old_write) as usize;
+
+            self.outbuf.add_size(write);
+
+            if read != self.buf.len() {
+                bail!("deflate did not use all input bytes!");
+            }
+
+            self.target.write_all(&self.outbuf).await?;
+            self.buf.clear();
+            self.outbuf.clear();
+
+            self.byte_count += deflate_encoder.total_out() as usize;
+            entry.compressed_size = deflate_encoder.total_out();
+            entry.uncompressed_size = deflate_encoder.total_in();
+
             entry.crc32 = hasher.finalize();
         }
         self.byte_count += entry.write_data_descriptor(&mut self.target).await?;
-- 
2.20.1