From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 2813C70E72 for ; Tue, 6 Apr 2021 11:04:25 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1EB2C2B6A6 for ; Tue, 6 Apr 2021 11:03:55 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 941E42B62D for ; Tue, 6 Apr 2021 11:03:49 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 60F2B41F21 for ; Tue, 6 Apr 2021 11:03:49 +0200 (CEST) From: Dominik Csapak To: pbs-devel@lists.proxmox.com Date: Tue, 6 Apr 2021 11:03:47 +0200 Message-Id: <20210406090347.27579-8-d.csapak@proxmox.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20210406090347.27579-1-d.csapak@proxmox.com> References: <20210406090347.27579-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.169 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup v3 7/7] tools/zip: compress zips with deflate X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup 
Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 06 Apr 2021 09:04:25 -0000 by using our DeflateEncoder. For this to work, we have to create a wrapper reader that generates the crc32 checksum while reading. Also, we need to put the target writer in an Option, so that we can take it out of self and move it into the DeflateEncoder while writing compressed data. We can then drop the internal buffer, since it is now managed by the deflate encoder. Signed-off-by: Dominik Csapak --- src/tools/zip.rs | 132 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 94 insertions(+), 38 deletions(-) diff --git a/src/tools/zip.rs b/src/tools/zip.rs index afacfbb9..37483d4b 100644 --- a/src/tools/zip.rs +++ b/src/tools/zip.rs @@ -10,15 +10,19 @@ use std::io; use std::mem::size_of; use std::os::unix::ffi::OsStrExt; use std::path::{Component, Path, PathBuf}; +use std::pin::Pin; +use std::task::{Context, Poll}; -use anyhow::{Error, Result}; +use anyhow::{format_err, Error, Result}; use endian_trait::Endian; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use futures::ready; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use crc32fast::Hasher; -use proxmox::tools::byte_buffer::ByteBuffer; use proxmox::tools::time::gmtime; +use crate::tools::compression::{DeflateEncoder, Level}; + const LOCAL_FH_SIG: u32 = 0x04034B50; const LOCAL_FF_SIG: u32 = 0x08074B50; const CENTRAL_DIRECTORY_FH_SIG: u32 = 0x02014B50; @@ -245,7 +249,7 @@ impl ZipEntry { signature: LOCAL_FH_SIG, version_needed: 0x2d, flags: 1 << 3, - compression: 0, + compression: 0x8, time, date, crc32: 0, @@ -328,7 +332,7 @@ impl ZipEntry { version_made_by: VERSION_MADE_BY, version_needed: VERSION_NEEDED, flags: 1 << 3, - compression: 0, + compression: 0x8, time, date, crc32: self.crc32, @@ -366,6 +370,47 @@ impl ZipEntry { } } +// wraps an asyncreader and calculates the hash +struct HashWrapper { + inner: R, + hasher: 
Hasher, +} + +impl HashWrapper { + fn new(inner: R) -> Self { + Self { + inner, + hasher: Hasher::new(), + } + } + + // consumes self and returns the hash and the reader + fn finish(self) -> (u32, R) { + let crc32 = self.hasher.finalize(); + (crc32, self.inner) + } +} + +impl AsyncRead for HashWrapper +where + R: AsyncRead + Unpin, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.get_mut(); + let old_len = buf.filled().len(); + ready!(Pin::new(&mut this.inner).poll_read(cx, buf))?; + let new_len = buf.filled().len(); + if new_len > old_len { + this.hasher.update(&buf.filled()[old_len..new_len]); + } + Poll::Ready(Ok(())) + } +} + /// Wraps a writer that implements AsyncWrite for creating a ZIP archive /// /// This will create a ZIP archive on the fly with files added with @@ -400,8 +445,7 @@ where { byte_count: usize, files: Vec, - target: W, - buf: ByteBuffer, + target: Option, } impl ZipEncoder { @@ -409,8 +453,7 @@ impl ZipEncoder { Self { byte_count: 0, files: Vec::new(), - target, - buf: ByteBuffer::with_capacity(1024 * 1024), + target: Some(target), } } @@ -419,31 +462,31 @@ impl ZipEncoder { mut entry: ZipEntry, content: Option, ) -> Result<(), Error> { + let mut target = self + .target + .take() + .ok_or_else(|| format_err!("had no target during add entry"))?; entry.offset = self.byte_count.try_into()?; - self.byte_count += entry.write_local_header(&mut self.target).await?; - if let Some(mut content) = content { - let mut hasher = Hasher::new(); - let mut size = 0; - loop { - let count = self.buf.read_from_async(&mut content).await?; - - // end of file - if count == 0 { - break; - } - - size += count; - hasher.update(&self.buf); - self.target.write_all(&self.buf).await?; - self.buf.consume(count); - } + self.byte_count += entry.write_local_header(&mut target).await?; + if let Some(content) = content { + let mut reader = HashWrapper::new(content); + let mut enc = 
DeflateEncoder::with_quality(target, Level::Fastest); - self.byte_count += size; - entry.compressed_size = size.try_into()?; - entry.uncompressed_size = size.try_into()?; - entry.crc32 = hasher.finalize(); + enc.compress(&mut reader).await?; + let total_in = enc.total_in(); + let total_out = enc.total_out(); + target = enc.into_inner(); + + let (crc32, _reader) = reader.finish(); + + self.byte_count += total_out as usize; + entry.compressed_size = total_out; + entry.uncompressed_size = total_in; + + entry.crc32 = crc32; } - self.byte_count += entry.write_data_descriptor(&mut self.target).await?; + self.byte_count += entry.write_data_descriptor(&mut target).await?; + self.target = Some(target); self.files.push(entry); @@ -456,6 +499,10 @@ impl ZipEncoder { central_dir_offset: usize, ) -> Result<(), Error> { let entrycount = self.files.len(); + let mut target = self + .target + .take() + .ok_or_else(|| format_err!("had no target during write_eocd"))?; let mut count = entrycount as u16; let mut directory_size = central_dir_size as u32; @@ -470,7 +517,7 @@ impl ZipEncoder { directory_offset = 0xFFFFFFFF; write_struct( - &mut self.target, + &mut target, Zip64EOCDRecord { signature: ZIP64_EOCD_RECORD, field_size: 44, @@ -489,7 +536,7 @@ impl ZipEncoder { let locator_offset = central_dir_offset + central_dir_size; write_struct( - &mut self.target, + &mut target, Zip64EOCDLocator { signature: ZIP64_EOCD_LOCATOR, disk_number: 0, @@ -501,7 +548,7 @@ impl ZipEncoder { } write_struct( - &mut self.target, + &mut target, EndOfCentralDir { signature: END_OF_CENTRAL_DIR, disk_number: 0, @@ -515,23 +562,32 @@ impl ZipEncoder { ) .await?; + self.target = Some(target); + Ok(()) } pub async fn finish(&mut self) -> Result<(), Error> { + let mut target = self + .target + .take() + .ok_or_else(|| format_err!("had no target during finish"))?; let central_dir_offset = self.byte_count; let mut central_dir_size = 0; for file in &self.files { - central_dir_size += file - 
.write_central_directory_header(&mut self.target) - .await?; + central_dir_size += file.write_central_directory_header(&mut target).await?; } + self.target = Some(target); self.write_eocd(central_dir_size, central_dir_offset) .await?; - self.target.flush().await?; + self.target + .take() + .ok_or_else(|| format_err!("had no target for flush"))? + .flush() + .await?; Ok(()) } -- 2.20.1