From mboxrd@z Thu Jan 1 00:00:00 1970
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue, 13 Oct 2020 11:50:41 +0200
Message-Id: <20201013095042.31337-1-d.csapak@proxmox.com>
X-Mailer: git-send-email 2.20.1
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module

this is a module to stream a zip file in an api call

the user has to give the Encoder a stream of files
(Path, mtime, mode, Reader); the resulting object itself
implements Stream for easy use in a hyper::Body

for now, this does not implement compression (uses ZIP's STORE mode),
and does not support empty directories or hardlinks
(or any other special files)

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools.rs     |   1 +
 src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 469 insertions(+)
 create mode 100644 src/tools/zip.rs
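Usage sketch (not part of the patch, only to illustrate how the encoder is
meant to be consumed from an api call; the handler name, file path, channel
size and header are made up):

use std::path::PathBuf;

use hyper::{Body, Response};
use tokio::fs::File;

use proxmox_backup::tools::zip::ZipEncoderStream;

// hypothetical api handler, only meant to show the wiring
async fn zip_download() -> Result<Response<Body>, anyhow::Error> {
    // channel of (Path, mtime, mode, Reader) tuples, as expected by the encoder
    let (mut sender, receiver) = tokio::sync::mpsc::channel(100);

    // feed the files that should end up in the archive
    tokio::spawn(async move {
        if let Ok(file) = File::open("/etc/hostname").await {
            let _ = sender
                .send((PathBuf::from("etc/hostname"), 0, 0o100644, file))
                .await;
        }
    });

    // the encoder itself is a Stream of Vec<u8> chunks, so it can be
    // handed to hyper directly
    let zip = ZipEncoderStream::new(receiver);

    Ok(Response::builder()
        .header("Content-Type", "application/zip")
        .body(Body::wrap_stream(zip))?)
}
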
diff --git a/src/tools.rs b/src/tools.rs
index 5b244c02..28f0338b 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -35,6 +35,7 @@ pub mod nom;
 pub mod logrotate;
 pub mod loopdev;
 pub mod fuse_loop;
+pub mod zip;
 
 mod parallel_handler;
 pub use parallel_handler::*;
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
new file mode 100644
index 00000000..acb193cb
--- /dev/null
+++ b/src/tools/zip.rs
@@ -0,0 +1,468 @@
+use std::ffi::OsString;
+use std::collections::VecDeque;
+use std::os::unix::ffi::OsStrExt;
+use std::pin::Pin;
+use std::path::{Component, Path, PathBuf};
+
+use futures::task::{Context, Poll};
+use tokio::io::AsyncRead;
+use tokio::stream::Stream;
+use anyhow::Result;
+
+use proxmox::io_format_err;
+use proxmox::tools::{
+    byte_buffer::ByteBuffer,
+    time::gmtime,
+};
+use crc32fast::Hasher;
+
+const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
+const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
+const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
+const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
+const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
+const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
+
+const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
+const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
+const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
+
+fn epoch_to_dos(epoch: i64) -> (u16, u16) {
+    let gmtime = match gmtime(epoch) {
+        Ok(gmtime) => gmtime,
+        Err(_) => return (0, 0),
+    };
+
+    let seconds = gmtime.tm_sec / 2 & 0b11111;
+    let minutes = gmtime.tm_min & 0b111111;
+    let hours = gmtime.tm_hour & 0b11111;
+    let time: u16 = ((hours << 11) | (minutes << 5) | (seconds)) as u16;
+
+    let date: u16 = if gmtime.tm_year > (2108 - 1900) || gmtime.tm_year < (1980 - 1900) {
+        0
+    } else {
+        let day = gmtime.tm_mday & 0b11111;
+        let month = (gmtime.tm_mon + 1) & 0b1111;
+        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
+        ((year << 9) | (month << 5) | (day)) as u16
+    };
+
+    (date, time)
+}
+
+struct Zip64Field {
+    size: u64,
+    compressed_size: u64,
+    offset: u64,
+}
+
+impl Zip64Field {
+    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
+        let size: u16 = if include_offset { 24 } else { 16 };
+        buf[0..2].copy_from_slice(ZIP64_TYPE);
+        buf[2..4].copy_from_slice(&size.to_le_bytes());
+        buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
+        buf[12..20].copy_from_slice(&self.size.to_le_bytes());
+        if include_offset {
+            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
+        }
+    }
+}
+
+struct FileEntry {
+    filename: OsString,
+    mtime: i64,
+    mode: u16,
+    zip64: Zip64Field,
+    crc32: [u8; 4],
+}
+
+impl FileEntry {
+    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
+        let mut relpath = PathBuf::new();
+
+        for comp in path.as_ref().components() {
+            match comp {
+                Component::Normal(_) => {
+                    relpath.push(comp);
+                },
+                _ => {},
+            }
+        }
+
+        Self {
+            filename: relpath.into(),
+            crc32: [0, 0, 0, 0],
+            mtime,
+            mode,
+            zip64: Zip64Field {
+                size: 0,
+                compressed_size: 0,
+                offset: 0,
+            },
+        }
+    }
+
+    fn local_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size: usize = 30 + (filename_len as usize) + 20;
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_NEEDED);
+        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
+        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
+        buf[14..26].copy_from_slice(&[
+            0x00, 0x00, 0x00, 0x00, // crc32
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
+
+        buf[28..30].copy_from_slice(&[20, 00]); // extra field len
+
+        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
+
+        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
+
+        size
+    }
+
+    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
+        let size = 24;
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
+        buf[4..8].copy_from_slice(&self.crc32);
+        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
+        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
+
+        size
+    }
+
+    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size = 46 + 28 + (filename_len as usize);
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_MADE_BY);
+        buf[6..8].copy_from_slice(VERSION_NEEDED);
+
+        buf[8..12].copy_from_slice(&[
+            0x08, 0x00, // general purpose flags
+            0x00, 0x00, // compression
+        ]);
+
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[12..14].copy_from_slice(&time.to_le_bytes());
+        buf[14..16].copy_from_slice(&date.to_le_bytes());
+        buf[16..20].copy_from_slice(&self.crc32);
+
+        buf[20..28].copy_from_slice(&[
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+
+        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
+        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
+
+        buf[32..38].copy_from_slice(&[
+            0x00, 0x00, // comment len
+            0x00, 0x00, // start disk
+            0x00, 0x00, // internal flags
+        ]);
+
+        buf[38..40].copy_from_slice(&[
+            0x00, 0x00, // upper part of external flags
+        ]);
+
+        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
+        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
+
+        buf[46..46+filename_len as usize].copy_from_slice(&filename);
+        self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
+
+        size
+    }
+}
+
+enum ZipStreamPos {
+    // index of file and if the header is already finished
+    FileHeader(usize),
+    File(usize),
+    CentralIndex,
+    End,
+}
+
+/// Shorthand for the necessary metadata for a file inside a ZIP
+pub type File<R> = (PathBuf, i64, u16, R);
+
+/// Represents a ZIP file that can be streamed
+///
+/// This will create a ZIP file on the fly by getting File entries from the
+/// given stream, and finishes it when the stream is done
+/// Example:
+/// ```no_run
+/// use std::path::PathBuf;
+///
+/// use tokio::stream::StreamExt;
+///
+/// use proxmox_backup::tools::zip;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let (mut sender, receiver) = tokio::sync::mpsc::channel(100);
+///     let mut zip = zip::ZipEncoderStream::new(receiver);
+///
+///     tokio::spawn(async move {
+///         sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
+///     });
+///
+///     zip.next().await; // until done
+/// }
+/// ```
+pub struct ZipEncoderStream<R, S>
+where
+    R: AsyncRead + Unpin,
+    S: Stream<Item = File<R>> + Unpin,
+{
+    buf: ByteBuffer,
+    bytes: usize,
+    central_dir_offset: usize,
+    central_dir_size: usize,
+    cur_reader: Option<R>,
+    entrycount: usize,
+    files: VecDeque<FileEntry>,
+    filestream: Option<S>,
+    hasher: Option<Hasher>,
+    pos: ZipStreamPos,
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
+    pub fn new(stream: S) -> Self {
+        Self {
+            buf: ByteBuffer::with_capacity(4*1024*1024),
+            bytes: 0,
+            central_dir_offset: 0,
+            central_dir_size: 0,
+            cur_reader: None,
+            entrycount: 0,
+            files: VecDeque::new(),
+            filestream: Some(stream),
+            hasher: None,
+            pos: ZipStreamPos::FileHeader(0),
+        }
+    }
+
+    fn eocd(&mut self) -> usize {
+        let size = if self.central_dir_size > u32::MAX as usize
+            || self.central_dir_offset > u32::MAX as usize
+            || self.entrycount > u16::MAX as usize
+        {
+            56+20+22
+        } else {
+            22
+        };
+
+        if self.buf.free_size() < size { return 0; }
+
+        let eocd_start = size - 22;
+        let buf = self.buf.get_free_mut_slice();
+
+        let mut count = self.entrycount as u16;
+        let mut dir_size = self.central_dir_size as u32;
+        let mut offset = self.central_dir_offset as u32;
+
+        if size > 22 {
+            count = 0xFFFF;
+            dir_size = 0xFFFFFFFF;
+            offset = 0xFFFFFFFF;
+
+            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
+            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
+            buf[12..14].copy_from_slice(VERSION_MADE_BY);
+            buf[14..16].copy_from_slice(VERSION_NEEDED);
+            buf[16..24].copy_from_slice(&[
+                0x00, 0x00, 0x00, 0x00, // number of disk
+                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
+            ]);
+            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
+            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
+            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
+            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
+
+            let locator_offset = self.central_dir_offset + self.central_dir_size;
+            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
+            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
+            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
+            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
+        }
+
+        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
+        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
+            0x00, 0x00, // disk number
+            0x00, 0x00, // start disk
+        ]);
+
+        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
+        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
+
+        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // central dir size
+        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // central dir offset
+        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
+
+        size
+    }
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
+    type Item = Result<Vec<u8>, std::io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.pos {
+                ZipStreamPos::FileHeader(idx) => {
+                    if this.files.is_empty() || idx > this.files.len() - 1 {
+                        if let Some(mut stream) = this.filestream.take() {
+                            match Pin::new(&mut stream).poll_next(cx) {
+                                Poll::Ready(Some((path, mtime, mode, file))) => {
+                                    this.filestream = Some(stream);
+                                    let entry = FileEntry::new(path, mtime, mode);
+                                    this.files.push_back(entry);
+                                    this.cur_reader = Some(file);
+                                    continue;
+                                }
+                                Poll::Pending => {
+                                    this.filestream = Some(stream);
+                                    if this.buf.is_empty() {
+                                        return Poll::Pending;
+                                    }
+                                    break;
+                                }
+                                Poll::Ready(None) => {},
+                            }
+                        }
+                        this.pos = ZipStreamPos::CentralIndex;
+                        this.central_dir_offset = this.bytes;
+                        this.entrycount = this.files.len();
+                        continue;
+                    }
+
+                    let mut entry = &mut this.files[idx];
+                    entry.zip64.offset = this.bytes as u64;
+                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
+                    this.bytes += size;
+                    this.buf.add_size(size as usize);
+
+                    if size == 0 {
+                        break;
+                    }
+
+                    if this.cur_reader.is_some() {
+                        this.pos = ZipStreamPos::File(idx);
+                    } else {
+                        this.pos = ZipStreamPos::FileHeader(idx + 1);
+                    }
+
+                    if this.buf.is_full() {
+                        break;
+                    }
+                },
+                ZipStreamPos::File(idx) => {
+                    let mut entry = &mut this.files[idx];
+                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got no file data"))?;
+                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
+                        Poll::Ready(Ok(n)) => {
+                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
+                            this.buf.add_size(n);
+                            if n == 0 {
+                                entry.crc32 = hasher.finalize().to_le_bytes();
+                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
+                                this.buf.add_size(size);
+                                this.bytes += size;
+
+                                if size == 0 {
+                                    break;
+                                }
+
+                                this.pos = ZipStreamPos::FileHeader(idx + 1);
+
+                                if this.buf.is_full() {
+                                    break;
+                                }
+
+                                continue;
+                            }
+
+                            this.bytes += n;
+                            entry.zip64.size += n as u64;
+                            entry.zip64.compressed_size += n as u64;
+
+                            hasher.update(&this.buf[this.buf.len() - n..]);
+                            this.hasher = Some(hasher);
+                            this.cur_reader = Some(reader);
+
+                            if this.buf.is_full() {
+                                break;
+                            }
+                        }
+                        Poll::Pending => {
+                            this.cur_reader = Some(reader);
+                            if this.buf.is_empty() {
+                                return Poll::Pending;
+                            }
+                            break;
+                        }
+                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
+                    }
+                },
+                ZipStreamPos::CentralIndex => {
+                    let mut finished_central_directory = true;
+                    while this.files.len() > 0 {
+                        let file = this.files.pop_front().unwrap();
+                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
+                        this.buf.add_size(size);
+                        if size == 0 {
+                            this.files.push_front(file);
+                            finished_central_directory = false;
+                            break;
+                        }
+
+                        this.bytes += size;
+                        this.central_dir_size += size;
+
+                        if this.buf.is_full() {
+                            finished_central_directory = false;
+                            break;
+                        }
+                    }
+
+                    if !finished_central_directory {
+                        break;
+                    }
+
+                    let size = this.eocd();
+                    this.buf.add_size(size);
+                    if size == 0 {
+                        break;
+                    }
+
+                    this.pos = ZipStreamPos::End;
+                    break;
+                }
+                ZipStreamPos::End => return Poll::Ready(None),
+            }
+        }
+
+        return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
+    }
+}
-- 
2.20.1