From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 8F4056157D for ; Tue, 13 Oct 2020 13:03:25 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 62794E4C5 for ; Tue, 13 Oct 2020 13:02:55 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [212.186.127.180]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS id 65C5BE4BA for ; Tue, 13 Oct 2020 13:02:53 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 2D1D045D26 for ; Tue, 13 Oct 2020 13:02:53 +0200 (CEST) Date: Tue, 13 Oct 2020 13:02:51 +0200 From: Wolfgang Bumiller To: Dominik Csapak Cc: pbs-devel@lists.proxmox.com Message-ID: <20201013110251.kbcbuqeqptb3kimr@wobu-vie.proxmox.com> References: <20201013095042.31337-1-d.csapak@proxmox.com> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20201013095042.31337-1-d.csapak@proxmox.com> User-Agent: NeoMutt/20180716 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.017 Adjusted score from AWL reputation of From: address KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_DNSWL_MED -2.3 Sender listed at https://www.dnswl.org/, medium trust SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [zip.rs, tools.rs] Subject: Re: [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 13 Oct 2020 11:03:25 -0000 partial review only for now: On Tue, Oct 13, 2020 at 11:50:41AM +0200, Dominik Csapak wrote: > this is a module to to stream a zip file in an api call > the user has to give the Encoder a stream of files (Path, mtime, mode, Reader) > the resulting object itself implements stream for easy use in a > hyper::Body > > for now, this does not implement compression (uses ZIPs STORE mode), and > does not support empty directories or hardlinks (or any other special > files) > > Signed-off-by: Dominik Csapak > --- > src/tools.rs | 1 + > src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++ > 2 files changed, 469 insertions(+) > create mode 100644 src/tools/zip.rs > > diff --git a/src/tools.rs b/src/tools.rs > index 5b244c02..28f0338b 100644 > --- a/src/tools.rs > +++ b/src/tools.rs > @@ -35,6 +35,7 @@ pub mod nom; > pub mod logrotate; > pub mod loopdev; > pub mod fuse_loop; > +pub mod zip; > > mod parallel_handler; > pub use parallel_handler::*; > diff --git a/src/tools/zip.rs b/src/tools/zip.rs > new file mode 100644 > index 00000000..acb193cb > --- /dev/null > +++ b/src/tools/zip.rs > @@ -0,0 +1,468 @@ > +use std::ffi::OsString; > +use std::collections::VecDeque; > +use std::os::unix::ffi::OsStrExt; > +use std::pin::Pin; > +use std::path::{Component, Path, PathBuf}; > + > +use futures::task::{Context, Poll}; > +use tokio::io::AsyncRead; > +use tokio::stream::Stream; > +use anyhow::Result; > + > +use proxmox::io_format_err; > +use proxmox::tools::{ > + byte_buffer::ByteBuffer, > + time::gmtime, > +}; > +use 
crc32fast::Hasher; > + > +const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04]; > +const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08]; > +const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02]; > +const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06]; > +const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D) > +const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03) > + > +const ZIP64_TYPE: &[u8] = &[0x01, 0x00]; > +const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06]; > +const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07]; > + > +fn epoch_to_dos(epoch: i64) -> (u16, u16) { > + let gmtime = match gmtime(epoch) { > + Ok(gmtime) => gmtime, > + Err(_) => return (0,0), > + }; > + > + let seconds = gmtime.tm_sec/2 & 0b11111; Instead of stripping spaces, please add parentheses for grouping (clippy hint). Also, for binary literals please use underscores for readability (group them in nibbles, that way a group always represents a hex digit): let seconds = (gmtime.tm_sec / 2) & 0b1_1111; same for all the ones below > + let minutes = gmtime.tm_min & 0xb111111; > + let hours = gmtime.tm_hour & 0b11111; > + let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16; > + > + let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) { > + 0 > + } else { > + let day = gmtime.tm_mday & 0b11111; > + let month = (gmtime.tm_mon + 1) & 0b1111; > + let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111; > + ((year<<9)|(month<<5)|(day)) as u16 > + }; > + > + (date, time) > +} > + > +struct Zip64Field { > + size: u64, > + compressed_size: u64, > + offset: u64, > +} > + > +impl Zip64Field { > + fn to_bytes(&self, buf: &mut [u8], include_offset: bool) { > + let size :u16 = if include_offset { 24 } else { 16 }; > + buf[0..2].copy_from_slice(ZIP64_TYPE); > + buf[2..4].copy_from_slice(&size.to_le_bytes()); > + buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes()); > + 
buf[12..20].copy_from_slice(&self.size.to_le_bytes()); > + if include_offset { > + buf[20..28].copy_from_slice(&self.offset.to_le_bytes()); > + } > + } > +} > + > +struct FileEntry { > + filename: OsString, > + mtime: i64, > + mode: u16, > + zip64: Zip64Field, > + crc32: [u8; 4], > +} > + > +impl FileEntry { > + fn new>(path: P, mtime: i64, mode: u16) -> Self { > + let mut relpath = PathBuf::new(); > + > + for comp in path.as_ref().components() { > + match comp { > + Component::Normal(_) => { > + relpath.push(comp); > + }, > + _ => {}, > + } clippy hint: `if let` would be shorter here: if let Component::Normal(_) = comp { relpath.push(comp); } > + } > + > + Self { > + filename: relpath.into(), > + crc32: [0,0,0,0], > + mtime, > + mode, > + zip64: Zip64Field { > + size: 0, > + compressed_size: 0, > + offset: 0, > + }, > + } > + } > + > + fn local_file_header(&self, buf: &mut [u8]) -> usize { method name & signature doesn't really make the purpose of this function clear > + let filename = self.filename.as_bytes(); > + let filename_len: u16 = filename.len() as u16; > + let size: usize = 30 + (filename_len as usize) + 20; > + > + if size > buf.len() { return 0; } > + > + buf[0..4].copy_from_slice(LOCAL_FH_SIG); > + buf[4..6].copy_from_slice(VERSION_NEEDED); > + buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression > + let (date, time) = epoch_to_dos(self.mtime); > + > + buf[10..12].copy_from_slice(&time.to_le_bytes()); // time > + buf[12..14].copy_from_slice(&date.to_le_bytes()); // date > + buf[14..26].copy_from_slice(&[ > + 0x00, 0x00, 0x00, 0x00, // crc32 > + 0xFF, 0xFF, 0xFF, 0xFF, // compressed size > + 0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size > + ]); > + buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len > + > + buf[28..30].copy_from_slice(&[20, 00]); // extra field len ^ Is there a reason you chose to do it this way rather than, say, define a #[derive(Endian)] #[repr(C)] struct LocalFileHeader { } you could do this for
all structs and have a helper like: fn write_struct(output: &mut T, data: E) -> io::Result<()> where T: Write + ?Sized, E: Endian, { let data = data.to_le(); output.write_all(unsafe { std::slice::from_raw_parts( &data as *const E as *const u8, size_of_val(&data), ) }) } and use `write_struct(&mut buffer, LocalFileHeader { ... })?` > + > + buf[30..(30 + filename_len) as usize].copy_from_slice(filename); > + > + self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false); > + > + size > + } > + > + fn local_file_footer(&self, buf: &mut [u8]) -> usize { > + let size = 24; > + if size > buf.len() { return 0; } > + > + buf[0..4].copy_from_slice(LOCAL_FF_SIG); > + buf[4..8].copy_from_slice(&self.crc32); > + buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes()); > + buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes()); > + > + size > + } > + > + fn central_directory_file_header(&self, buf: &mut [u8]) -> usize { > + let filename = self.filename.as_bytes(); > + let filename_len: u16 = filename.len() as u16; > + let size = 46 + 28 + (filename_len as usize); > + > + if size > buf.len() { return 0; } > + > + buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG); > + buf[4..6].copy_from_slice(VERSION_MADE_BY); > + buf[6..8].copy_from_slice(VERSION_NEEDED); > + > + buf[8..12].copy_from_slice(&[ > + 0x08, 0x00, // general purpose flags > + 0x00, 0x00, // compression > + ]); > + > + let (date, time) = epoch_to_dos(self.mtime); > + > + buf[12..14].copy_from_slice(&time.to_le_bytes()); > + buf[14..16].copy_from_slice(&date.to_le_bytes()); > + buf[16..20].copy_from_slice(&self.crc32); > + > + buf[20..28].copy_from_slice(&[ > + 0xFF, 0xFF, 0xFF, 0xFF, // compressed size > + 0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size > + ]); > + > + buf[28..30].copy_from_slice(&filename_len.to_le_bytes()); > + buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len > + > + buf[32..38].copy_from_slice(&[ > + 0x00, 0x00, // comment len > + 0x00, 0x00, // 
start disk > + 0x00, 0x00, // internal flags > + ]); > + > + buf[38..40].copy_from_slice(&[ > + 0x00, 0x00, // upper part of external flags > + ]); > + > + buf[40..42].copy_from_slice(&self.mode.to_le_bytes()); > + buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset > + > + buf[46..46+filename_len as usize].copy_from_slice(&filename); > + self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true); > + > + size > + } > +} > + > +enum ZipStreamPos { > + // index of file and if the header is already finished > + FileHeader(usize), > + File(usize), > + CentralIndex, > + End, > +} > + > +/// Shorthand for the necessary metadata for a file inside a ZIP > +pub type File = (PathBuf, i64, u16, R); > + > +/// Represents a ZIP file that can be streamed > +/// > +/// This will create a ZIP file on the fly by getting File entries from the > +/// given stream, and finishes it when the stream is done > +/// Example: > +/// ```no_run > +/// use proxmox_backup::tools::zip; > +/// > +/// #[tokio::async] > +/// async fn main() { > +/// let (sender, receiver) = tokio::sync::mpsc::channel(); > +/// let zip = zip::ZipEncoderStream::new(receiver); > +/// > +/// tokio::spawn(async move { > +/// sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await; > +/// }); > +/// > +/// zip.next().await; // until done > +/// > +/// } > +/// ``` > +pub struct ZipEncoderStream > +where > + R: AsyncRead + Unpin, > + S: Stream> + Unpin, > +{ > + buf: ByteBuffer, > + bytes: usize, > + central_dir_offset: usize, > + central_dir_size: usize, > + cur_reader: Option, > + entrycount: usize, > + files: VecDeque, > + filestream: Option, > + hasher: Option, > + pos: ZipStreamPos, > +} > + > +impl> + Unpin> ZipEncoderStream { > + pub fn new(stream: S) -> Self { > + Self { > + buf: ByteBuffer::with_capacity(4*1024*1024), > + bytes: 0, > + central_dir_offset: 0, > + central_dir_size: 0, > + cur_reader: None, > + entrycount: 0, > + files: VecDeque::new(), > + 
filestream: Some(stream), > + hasher: None, > + pos: ZipStreamPos::FileHeader(0), > + } > + } > + > + fn eocd(&mut self) -> usize { > + let size = if self.central_dir_size > u32::MAX as usize > + || self.central_dir_offset > u32::MAX as usize > + || self.entrycount > u16::MAX as usize > + { > + 56+20+22 > + } else { > + 22 > + }; > + > + > + if self.buf.free_size() < size { return 0; } > + > + let eocd_start = size - 22; > + let buf = self.buf.get_free_mut_slice(); > + > + let mut count = self.entrycount as u16; > + let mut dir_size = self.central_dir_size as u32; > + let mut offset = self.central_dir_offset as u32; > + > + if size > 22 { > + count = 0xFFFF; > + dir_size = 0xFFFFFFFF; > + offset = 0xFFFFFFFF; > + > + buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD); > + buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size > + buf[12..14].copy_from_slice(VERSION_MADE_BY); > + buf[14..16].copy_from_slice(VERSION_NEEDED); > + buf[16..24].copy_from_slice(&[ > + 0x00, 0x00, 0x00, 0x00, // number of disk > + 0x00, 0x00, 0x00, 0x00, // number of disk of central directory > + ]); > + buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk > + buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total > + buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes()); > + buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes()); > + > + let locator_offset = self.central_dir_offset + self.central_dir_size; > + buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR); > + buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number > + buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes()); > + buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1) > + } > + > + buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR); > + buf[eocd_start+4..eocd_start+8].copy_from_slice(&[ > + 0x00, 0x00, // disk number > 
+ 0x00, 0x00, // start disk > + ]); > + > + buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes()); > + buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes()); > + > + buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // entry count > + buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // entry count > + buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len > + > + size > + } > +} > + > +impl> + Unpin> Stream for ZipEncoderStream { > + type Item = Result, std::io::Error>; > + > + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { > + let this = self.get_mut(); > + > + loop { > + match this.pos { > + ZipStreamPos::FileHeader(idx) => { > + if this.files.is_empty() || idx > this.files.len() - 1 { > + if let Some(mut stream) = this.filestream.take() { > + match Pin::new(&mut stream).poll_next(cx) { > + Poll::Ready(Some((path, mtime, mode, file))) => { > + this.filestream = Some(stream); > + let entry = FileEntry::new(path, mtime, mode); > + this.files.push_back(entry); > + this.cur_reader = Some(file); > + continue; > + } > + Poll::Pending => { > + this.filestream = Some(stream); > + if this.buf.is_empty() { > + return Poll::Pending; > + } > + break; > + } > + Poll::Ready(None) => {}, > + } > + } > + this.pos = ZipStreamPos::CentralIndex; > + this.central_dir_offset = this.bytes; > + this.entrycount = this.files.len(); > + continue; > + } > + > + let mut entry = &mut this.files[idx]; > + entry.zip64.offset = this.bytes as u64; > + let size = entry.local_file_header(this.buf.get_free_mut_slice()); > + this.bytes += size; > + this.buf.add_size(size as usize); > + > + if size == 0 { > + break; > + } > + > + if this.cur_reader.is_some() { > + this.pos = ZipStreamPos::File(idx); > + } else { > + this.pos = ZipStreamPos::FileHeader(idx + 1); > + } > + > + if this.buf.is_full() { > + break; > + } > + }, > + ZipStreamPos::File(idx) => { > + let mut entry = 
&mut this.files[idx]; > + let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got not file data"))?; > + match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) { > + Poll::Ready(Ok(n)) => { > + let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new); > + this.buf.add_size(n); > + if n == 0 { > + entry.crc32 = hasher.finalize().to_le_bytes(); > + let size = entry.local_file_footer(this.buf.get_free_mut_slice()); > + this.buf.add_size(size); > + this.bytes += size; > + > + if size == 0 { > + break; > + } > + > + this.pos = ZipStreamPos::FileHeader(idx + 1); > + > + if this.buf.is_full() { > + break; > + } > + > + continue; > + } > + > + this.bytes += n; > + entry.zip64.size += n as u64; > + entry.zip64.compressed_size += n as u64; > + > + hasher.update(&this.buf[this.buf.len() - n..]); > + this.hasher = Some(hasher); > + this.cur_reader = Some(reader); > + > + if this.buf.is_full() { > + break; > + } > + } > + Poll::Pending => { > + this.cur_reader = Some(reader); > + if this.buf.is_empty() { > + return Poll::Pending; > + } > + break; > + } > + Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))), > + } > + }, > + ZipStreamPos::CentralIndex => { > + let mut finished_central_directory = true; > + while this.files.len() > 0 { > + let file = this.files.pop_front().unwrap(); > + let size = file.central_directory_file_header(this.buf.get_free_mut_slice()); > + this.buf.add_size(size); > + if size == 0 { > + this.files.push_front(file); > + finished_central_directory = false; > + break; > + } > + > + this.bytes += size; > + this.central_dir_size += size; > + > + if this.buf.is_full() { > + finished_central_directory = false; > + break; > + } > + } > + > + if !finished_central_directory { > + break; > + } > + > + let size = this.eocd(); > + this.buf.add_size(size); > + if size == 0 { > + break; > + } > + > + this.pos = ZipStreamPos::End; > + break; > + } > + ZipStreamPos::End => return Poll::Ready(None) > + } > + } > + > + 
return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec()))); > + } > +} > -- > 2.20.1