From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id E243D6A732 for ; Mon, 15 Mar 2021 12:21:20 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id CF11D202DA for ; Mon, 15 Mar 2021 12:21:20 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 13989202D2 for ; Mon, 15 Mar 2021 12:21:20 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D1B2F426CE for ; Mon, 15 Mar 2021 12:21:19 +0100 (CET) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Mon, 15 Mar 2021 12:21:18 +0100 Message-Id: <20210315112118.13641-3-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210315112118.13641-1-d.csapak@proxmox.com> References: <20210315112118.13641-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.188 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC PATCH proxmox-backup 3/3] tools/zip: compress zips with deflate X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup 
Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 15 Mar 2021 11:21:20 -0000 to get smaller zip files Signed-off-by: Dominik Csapak --- @Wolfgang, could you please look at this? I am not sure about using the Compress in an async function. It is only in memory, but does it 'block'? I am not sure how we could do this differently in an async context though... Cargo.toml | 1 + src/tools/zip.rs | 75 +++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 79945312..06967c20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ crc32fast = "1" endian_trait = { version = "0.6", features = ["arrays"] } anyhow = "1.0" futures = "0.3" +flate2 = "1.0" h2 = { version = "0.3", features = [ "stream" ] } handlebars = "3.0" http = "0.2" diff --git a/src/tools/zip.rs b/src/tools/zip.rs index 55f2a24a..237b8a1f 100644 --- a/src/tools/zip.rs +++ b/src/tools/zip.rs @@ -11,9 +11,10 @@ use std::mem::size_of; use std::os::unix::ffi::OsStrExt; use std::path::{Component, Path, PathBuf}; -use anyhow::{Error, Result}; +use anyhow::{bail, Error, Result}; use endian_trait::Endian; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use flate2::{Compress, Compression, FlushCompress}; use crc32fast::Hasher; use proxmox::tools::time::gmtime; @@ -245,7 +246,7 @@ impl ZipEntry { signature: LOCAL_FH_SIG, version_needed: 0x2d, flags: 1 << 3, - compression: 0, + compression: 0x8, time, date, crc32: 0, @@ -328,7 +329,7 @@ impl ZipEntry { version_made_by: VERSION_MADE_BY, version_needed: VERSION_NEEDED, flags: 1 << 3, - compression: 0, + compression: 0x8, time, date, crc32: self.crc32, @@ -402,6 +403,7 @@ where files: Vec, target: W, buf: ByteBuffer, + outbuf: ByteBuffer, } impl ZipEncoder { @@ -410,7 +412,8 @@ impl ZipEncoder { byte_count: 0, files: Vec::new(), target, - buf: ByteBuffer::with_capacity(1024*1024), + buf: 
ByteBuffer::with_capacity(1024 * 1024), + outbuf: ByteBuffer::with_capacity(1024 * 1024), } } @@ -423,25 +426,71 @@ impl ZipEncoder { self.byte_count += entry.write_local_header(&mut self.target).await?; if let Some(mut content) = content { let mut hasher = Hasher::new(); - let mut size = 0; + let mut deflate_encoder = Compress::new(Compression::fast(), false); + loop { + let syncmode = if self.buf.is_full() { + FlushCompress::Sync + } else { + FlushCompress::None + }; + + let old_pos = self.buf.len(); let count = self.buf.read_from_async(&mut content).await?; // end of file - if count == 0 { + if count == 0 && syncmode == FlushCompress::None { break; } - size += count; - hasher.update(&self.buf); - self.target.write_all(&self.buf).await?; - self.buf.consume(count); + hasher.update(&self.buf[old_pos..]); + + let old_read = deflate_encoder.total_in(); + let old_write = deflate_encoder.total_out(); + deflate_encoder.compress( + &self.buf, + &mut self.outbuf.get_free_mut_slice(), + syncmode, + )?; + let read = (deflate_encoder.total_in() - old_read) as usize; + let write = (deflate_encoder.total_out() - old_write) as usize; + + self.outbuf.add_size(write); + + if read == 0 { + bail!("did not consume any data!"); + } + + self.target.write_all(&self.outbuf).await?; + self.buf.consume(read); + self.outbuf.clear(); } - self.byte_count += size; - entry.compressed_size = size.try_into()?; - entry.uncompressed_size = size.try_into()?; + let old_read = deflate_encoder.total_in(); + let old_write = deflate_encoder.total_out(); + deflate_encoder.compress( + &self.buf, + &mut self.outbuf.get_free_mut_slice(), + FlushCompress::Finish, + )?; + let read = (deflate_encoder.total_in() - old_read) as usize; + let write = (deflate_encoder.total_out() - old_write) as usize; + + self.outbuf.add_size(write); + + if read != self.buf.len() { + bail!("deflate did not use all input bytes!"); + } + + self.target.write_all(&self.outbuf).await?; + self.buf.clear(); + self.outbuf.clear(); + + 
self.byte_count += deflate_encoder.total_out() as usize; + entry.compressed_size = deflate_encoder.total_out(); + entry.uncompressed_size = deflate_encoder.total_in(); + entry.crc32 = hasher.finalize(); } self.byte_count += entry.write_data_descriptor(&mut self.target).await?; -- 2.20.1