public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module
@ 2020-10-13  9:50 Dominik Csapak
  2020-10-13  9:50 ` [pbs-devel] [PATCH proxmox-backup 2/2] api2/admin/datastore/pxar_file_download: download directory as zip Dominik Csapak
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Dominik Csapak @ 2020-10-13  9:50 UTC (permalink / raw)
  To: pbs-devel

this is a module to to stream a zip file in an api call
the user has to give the Encoder a stream of files (Path, mtime, mode, Reader)
the resulting object itself implements stream for easy use in a
hyper::Body

for now, this does not implement compression (uses ZIPs STORE mode), and
does not support empty directories or hardlinks (or any other special
files)

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools.rs     |   1 +
 src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 469 insertions(+)
 create mode 100644 src/tools/zip.rs

diff --git a/src/tools.rs b/src/tools.rs
index 5b244c02..28f0338b 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -35,6 +35,7 @@ pub mod nom;
 pub mod logrotate;
 pub mod loopdev;
 pub mod fuse_loop;
+pub mod zip;
 
 mod parallel_handler;
 pub use parallel_handler::*;
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
new file mode 100644
index 00000000..acb193cb
--- /dev/null
+++ b/src/tools/zip.rs
@@ -0,0 +1,468 @@
+use std::ffi::OsString;
+use std::collections::VecDeque;
+use std::os::unix::ffi::OsStrExt;
+use std::pin::Pin;
+use std::path::{Component, Path, PathBuf};
+
+use futures::task::{Context, Poll};
+use tokio::io::AsyncRead;
+use tokio::stream::Stream;
+use anyhow::Result;
+
+use proxmox::io_format_err;
+use proxmox::tools::{
+    byte_buffer::ByteBuffer,
+    time::gmtime,
+};
+use crc32fast::Hasher;
+
+const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
+const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
+const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
+const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
+const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
+const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
+
+const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
+const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
+const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
+
+fn epoch_to_dos(epoch: i64) -> (u16, u16) {
+    let gmtime = match gmtime(epoch) {
+        Ok(gmtime) => gmtime,
+        Err(_) => return (0,0),
+    };
+
+    let seconds = gmtime.tm_sec/2 & 0b11111;
+    let minutes = gmtime.tm_min & 0xb111111;
+    let hours = gmtime.tm_hour & 0b11111;
+    let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16;
+
+    let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) {
+        0
+    } else {
+        let day = gmtime.tm_mday & 0b11111;
+        let month = (gmtime.tm_mon + 1) & 0b1111;
+        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
+        ((year<<9)|(month<<5)|(day)) as u16
+    };
+
+    (date, time)
+}
+
+struct Zip64Field {
+    size: u64,
+    compressed_size: u64,
+    offset: u64,
+}
+
+impl Zip64Field {
+    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
+        let size :u16 = if include_offset { 24 }  else { 16 };
+        buf[0..2].copy_from_slice(ZIP64_TYPE);
+        buf[2..4].copy_from_slice(&size.to_le_bytes());
+        buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
+        buf[12..20].copy_from_slice(&self.size.to_le_bytes());
+        if include_offset {
+            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
+        }
+    }
+}
+
+struct FileEntry {
+    filename: OsString,
+    mtime: i64,
+    mode: u16,
+    zip64: Zip64Field,
+    crc32: [u8; 4],
+}
+
+impl FileEntry {
+    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
+        let mut relpath = PathBuf::new();
+
+        for comp in path.as_ref().components() {
+            match comp {
+                Component::Normal(_) => {
+                    relpath.push(comp);
+                },
+                _ => {},
+            }
+        }
+
+        Self {
+            filename: relpath.into(),
+            crc32: [0,0,0,0],
+            mtime,
+            mode,
+            zip64: Zip64Field {
+                size: 0,
+                compressed_size: 0,
+                offset: 0,
+            },
+        }
+    }
+
+    fn local_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size: usize = 30 + (filename_len as usize) + 20;
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_NEEDED);
+        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
+        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
+        buf[14..26].copy_from_slice(&[
+            0x00, 0x00, 0x00, 0x00, // crc32
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
+
+        buf[28..30].copy_from_slice(&[20, 00]); // extra field len
+
+        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
+
+        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
+
+        size
+    }
+
+    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
+        let size = 24;
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
+        buf[4..8].copy_from_slice(&self.crc32);
+        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
+        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
+
+        size
+    }
+
+    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size = 46 + 28 + (filename_len as usize);
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_MADE_BY);
+        buf[6..8].copy_from_slice(VERSION_NEEDED);
+
+        buf[8..12].copy_from_slice(&[
+            0x08, 0x00, // general purpose flags
+            0x00, 0x00, // compression
+        ]);
+
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[12..14].copy_from_slice(&time.to_le_bytes());
+        buf[14..16].copy_from_slice(&date.to_le_bytes());
+        buf[16..20].copy_from_slice(&self.crc32);
+
+        buf[20..28].copy_from_slice(&[
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+
+        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
+        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
+
+        buf[32..38].copy_from_slice(&[
+            0x00, 0x00, // comment len
+            0x00, 0x00, // start disk
+            0x00, 0x00, // internal flags
+        ]);
+
+        buf[38..40].copy_from_slice(&[
+            0x00, 0x00, // upper part of external flags
+        ]);
+
+        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
+        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
+
+        buf[46..46+filename_len as usize].copy_from_slice(&filename);
+        self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
+
+        size
+    }
+}
+
+enum ZipStreamPos {
+    // index of file and if the header is already finished
+    FileHeader(usize),
+    File(usize),
+    CentralIndex,
+    End,
+}
+
+/// Shorthand for the necessary metadata for a file inside a ZIP
+pub type File<R> = (PathBuf, i64, u16, R);
+
+/// Represents a ZIP file that can be streamed
+///
+/// This will create a ZIP file on the fly by getting File entries from the
+/// given stream, and finishes it when the stream is done
+/// Example:
+/// ```no_run
+/// use proxmox_backup::tools::zip;
+///
+/// #[tokio::async]
+/// async fn main() {
+///     let (sender, receiver) = tokio::sync::mpsc::channel();
+///     let zip = zip::ZipEncoderStream::new(receiver);
+///
+///     tokio::spawn(async move {
+///         sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
+///     });
+///
+///     zip.next().await; // until done
+///
+/// }
+/// ```
+pub struct ZipEncoderStream<R, S>
+where
+    R: AsyncRead + Unpin,
+    S: Stream<Item = File<R>> + Unpin,
+{
+    buf: ByteBuffer,
+    bytes: usize,
+    central_dir_offset: usize,
+    central_dir_size: usize,
+    cur_reader: Option<R>,
+    entrycount: usize,
+    files: VecDeque<FileEntry>,
+    filestream: Option<S>,
+    hasher: Option<Hasher>,
+    pos: ZipStreamPos,
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
+    pub fn new(stream: S) -> Self {
+        Self {
+            buf: ByteBuffer::with_capacity(4*1024*1024),
+            bytes: 0,
+            central_dir_offset: 0,
+            central_dir_size: 0,
+            cur_reader: None,
+            entrycount: 0,
+            files: VecDeque::new(),
+            filestream: Some(stream),
+            hasher: None,
+            pos: ZipStreamPos::FileHeader(0),
+        }
+    }
+
+    fn eocd(&mut self) -> usize {
+        let size = if self.central_dir_size > u32::MAX as usize
+            || self.central_dir_offset > u32::MAX as  usize
+            || self.entrycount > u16::MAX as usize
+        {
+            56+20+22
+        } else {
+            22
+        };
+
+
+        if self.buf.free_size() < size { return 0; }
+
+        let eocd_start = size - 22;
+        let buf = self.buf.get_free_mut_slice();
+
+        let mut count = self.entrycount as u16;
+        let mut dir_size = self.central_dir_size as u32;
+        let mut offset = self.central_dir_offset as u32;
+
+        if size > 22 {
+            count = 0xFFFF;
+            dir_size = 0xFFFFFFFF;
+            offset = 0xFFFFFFFF;
+
+            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
+            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
+            buf[12..14].copy_from_slice(VERSION_MADE_BY);
+            buf[14..16].copy_from_slice(VERSION_NEEDED);
+            buf[16..24].copy_from_slice(&[
+                0x00, 0x00, 0x00, 0x00, // number of disk
+                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
+            ]);
+            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
+            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
+            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
+            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
+
+            let locator_offset = self.central_dir_offset + self.central_dir_size;
+            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
+            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
+            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
+            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
+        }
+
+        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
+        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
+            0x00, 0x00, // disk number
+            0x00, 0x00, // start disk
+        ]);
+
+        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
+        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
+
+        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // entry count
+        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // entry count
+        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
+
+        size
+    }
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
+    type Item = Result<Vec<u8>, std::io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.pos {
+                ZipStreamPos::FileHeader(idx) => {
+                    if this.files.is_empty() || idx > this.files.len() - 1 {
+                        if let Some(mut stream) = this.filestream.take() {
+                            match Pin::new(&mut stream).poll_next(cx) {
+                                Poll::Ready(Some((path, mtime, mode, file))) => {
+                                    this.filestream = Some(stream);
+                                    let entry = FileEntry::new(path, mtime, mode);
+                                    this.files.push_back(entry);
+                                    this.cur_reader = Some(file);
+                                    continue;
+                                }
+                                Poll::Pending => {
+                                    this.filestream = Some(stream);
+                                    if this.buf.is_empty() {
+                                        return Poll::Pending;
+                                    }
+                                    break;
+                                }
+                                Poll::Ready(None) => {},
+                            }
+                        }
+                        this.pos = ZipStreamPos::CentralIndex;
+                        this.central_dir_offset = this.bytes;
+                        this.entrycount = this.files.len();
+                        continue;
+                    }
+
+                    let mut entry = &mut this.files[idx];
+                    entry.zip64.offset = this.bytes as u64;
+                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
+                    this.bytes += size;
+                    this.buf.add_size(size as usize);
+
+                    if size == 0 {
+                        break;
+                    }
+
+                    if this.cur_reader.is_some() {
+                        this.pos = ZipStreamPos::File(idx);
+                    } else {
+                        this.pos = ZipStreamPos::FileHeader(idx + 1);
+                    }
+
+                    if this.buf.is_full() {
+                        break;
+                    }
+                },
+                ZipStreamPos::File(idx) => {
+                    let mut entry = &mut this.files[idx];
+                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got not file data"))?;
+                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
+                        Poll::Ready(Ok(n)) => {
+                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
+                            this.buf.add_size(n);
+                            if n == 0 {
+                                entry.crc32 = hasher.finalize().to_le_bytes();
+                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
+                                this.buf.add_size(size);
+                                this.bytes += size;
+
+                                if size == 0 {
+                                    break;
+                                }
+
+                                this.pos = ZipStreamPos::FileHeader(idx + 1);
+
+                                if this.buf.is_full() {
+                                    break;
+                                }
+
+                                continue;
+                            }
+
+                            this.bytes += n;
+                            entry.zip64.size += n as u64;
+                            entry.zip64.compressed_size += n as u64;
+
+                            hasher.update(&this.buf[this.buf.len() - n..]);
+                            this.hasher = Some(hasher);
+                            this.cur_reader = Some(reader);
+
+                            if this.buf.is_full() {
+                                break;
+                            }
+                        }
+                        Poll::Pending => {
+                            this.cur_reader = Some(reader);
+                            if this.buf.is_empty() {
+                                return Poll::Pending;
+                            }
+                            break;
+                        }
+                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
+                    }
+                },
+                ZipStreamPos::CentralIndex => {
+                    let mut finished_central_directory = true;
+                    while this.files.len() > 0 {
+                        let file = this.files.pop_front().unwrap();
+                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
+                        this.buf.add_size(size);
+                        if size == 0 {
+                            this.files.push_front(file);
+                            finished_central_directory = false;
+                            break;
+                        }
+
+                        this.bytes += size;
+                        this.central_dir_size += size;
+
+                        if this.buf.is_full() {
+                            finished_central_directory = false;
+                            break;
+                        }
+                    }
+
+                    if !finished_central_directory {
+                        break;
+                    }
+
+                    let size = this.eocd();
+                    this.buf.add_size(size);
+                    if size == 0 {
+                        break;
+                    }
+
+                    this.pos = ZipStreamPos::End;
+                    break;
+                }
+                ZipStreamPos::End => return Poll::Ready(None)
+            }
+        }
+
+        return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
+    }
+}
-- 
2.20.1





^ permalink raw reply	[flat|nested] 4+ messages in thread

* [pbs-devel] [PATCH proxmox-backup 2/2] api2/admin/datastore/pxar_file_download: download directory as zip
  2020-10-13  9:50 [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Dominik Csapak
@ 2020-10-13  9:50 ` Dominik Csapak
  2020-10-13 11:02 ` [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Wolfgang Bumiller
  2020-10-13 15:04 ` Wolfgang Bumiller
  2 siblings, 0 replies; 4+ messages in thread
From: Dominik Csapak @ 2020-10-13  9:50 UTC (permalink / raw)
  To: pbs-devel

by using the new ZipEncoderStream and recursively add files to it
the zip only contains normal files and hardlinks (by simply copying the
content), no empty directories/symlinks/etc

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/api2/admin/datastore.rs | 132 +++++++++++++++++++++++++++++++-----
 www/window/FileBrowser.js   |   8 +++
 2 files changed, 123 insertions(+), 17 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index c260b62d..3f22ffe7 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -2,12 +2,15 @@ use std::collections::{HashSet, HashMap};
 use std::ffi::OsStr;
 use std::os::unix::ffi::OsStrExt;
 use std::sync::{Arc, Mutex};
+use std::pin::Pin;
+use std::path::PathBuf;
 
 use anyhow::{bail, format_err, Error};
 use futures::*;
 use hyper::http::request::Parts;
 use hyper::{header, Body, Response, StatusCode};
 use serde_json::{json, Value};
+use tokio::sync::mpsc::Sender;
 
 use proxmox::api::{
     api, ApiResponseFuture, ApiHandler, ApiMethod, Router,
@@ -19,7 +22,7 @@ use proxmox::tools::fs::{replace_file, CreateOptions};
 use proxmox::try_block;
 use proxmox::{http_err, identity, list_subdirs_api_method, sortable};
 
-use pxar::accessor::aio::Accessor;
+use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
 use pxar::EntryKind;
 
 use crate::api2::types::*;
@@ -29,7 +32,15 @@ use crate::config::datastore;
 use crate::config::cached_user_info::CachedUserInfo;
 
 use crate::server::WorkerTask;
-use crate::tools::{self, AsyncReaderStream, WrappedReaderStream};
+use crate::tools::{
+    self,
+    AsyncReaderStream,
+    WrappedReaderStream,
+    zip::{
+        self,
+        ZipEncoderStream,
+    }
+};
 use crate::config::acl::{
     PRIV_DATASTORE_AUDIT,
     PRIV_DATASTORE_MODIFY,
@@ -1243,6 +1254,68 @@ fn catalog(
     Ok(res.into())
 }
 
+fn recurse_files<T: 'static>(
+    mut sender: Sender<zip::File<FileContents<T>>>,
+    mut decoder: Accessor<T>,
+    prefix: PathBuf,
+    file: FileEntry<T>,
+) -> Pin<
+    Box<
+        dyn Future<Output = Result<(Sender<zip::File<FileContents<T>>>, Accessor<T>), Error>>
+            + Send
+            + 'static,
+    >,
+>
+where
+    T: Clone + pxar::accessor::ReadAt + Unpin + Send + Sync,
+{
+    Box::pin(async move {
+        let metadata = file.entry().metadata();
+        let path = file.entry().path().strip_prefix(&prefix)?.to_path_buf();
+
+        match file.kind() {
+            EntryKind::File { .. } => {
+                let entry = (
+                    path,
+                    metadata.stat.mtime.secs,
+                    metadata.stat.mode as u16,
+                    file.contents().await?,
+                );
+                sender
+                    .send(entry)
+                    .await
+                    .map_err(|err| format_err!("could not send file entry: {}", err))?;
+            }
+            EntryKind::Hardlink(_) => {
+                let realfile = decoder.follow_hardlink(&file).await?;
+                let entry = (
+                    path,
+                    metadata.stat.mtime.secs,
+                    metadata.stat.mode as u16,
+                    realfile.contents().await?,
+                );
+                sender
+                    .send(entry)
+                    .await
+                    .map_err(|err| format_err!("could not send file entry: {}", err))?;
+            }
+            EntryKind::Directory => {
+                let dir = file.enter_directory().await?;
+                let mut readdir = dir.read_dir();
+                while let Some(entry) = readdir.next().await {
+                    let entry = entry?.decode_entry().await?;
+                    let (sender_tmp, decoder_tmp) = recurse_files(sender, decoder, prefix.clone(), entry).await?;
+                    sender = sender_tmp;
+                    decoder = decoder_tmp;
+                }
+            }
+            _ => {} // ignore all else
+        };
+
+        Ok((sender, decoder))
+    })
+}
+
 #[sortable]
 pub const API_METHOD_PXAR_FILE_DOWNLOAD: ApiMethod = ApiMethod::new(
     &ApiHandler::AsyncHttp(&pxar_file_download),
@@ -1325,22 +1398,47 @@ fn pxar_file_download(
             .lookup(OsStr::from_bytes(file_path)).await?
             .ok_or(format_err!("error opening '{:?}'", file_path))?;
 
-        let file = match file.kind() {
-            EntryKind::File { .. } => file,
-            EntryKind::Hardlink(_) => {
-                decoder.follow_hardlink(&file).await?
-            },
-            // TODO symlink
-            other => bail!("cannot download file of type {:?}", other),
-        };
+        let body = match file.kind() {
+            EntryKind::File { .. }  =>
+                Body::wrap_stream(
+                    AsyncReaderStream::new(file.contents().await?)
+                    .map_err(move |err| {
+                        eprintln!("error during streaming of file '{:?}' - {}", filepath, err);
+                        err
+                    })
+                ),
+            EntryKind::Hardlink(_) =>
+                Body::wrap_stream(
+                    AsyncReaderStream::new(decoder.follow_hardlink(&file).await?.contents().await?)
+                    .map_err(move |err| {
+                        eprintln!("error during streaming of hardlink '{:?}' - {}", filepath, err);
+                        err
+                    })
+                ),
+            EntryKind::Directory => {
+                let (sender, receiver) = tokio::sync::mpsc::channel(100);
+                let mut prefix = PathBuf::new();
+                let mut components = file.entry().path().components();
+                components.next_back();  // discar last
+                for comp in components {
+                    prefix.push(comp);
+                }
 
-        let body = Body::wrap_stream(
-            AsyncReaderStream::new(file.contents().await?)
-                .map_err(move |err| {
-                    eprintln!("error during streaming of '{:?}' - {}", filepath, err);
-                    err
-                })
-        );
+                crate::server::spawn_internal_task(async move {
+                    let _ = recurse_files(sender, decoder, prefix, file).await?;
+                    Ok::<(), Error>(())
+                });
+
+                Body::wrap_stream(
+                    ZipEncoderStream::new(receiver)
+                    .map_err(move |err| {
+                        eprintln!("error during streaming of zip '{:?}' - {}", filepath, err);
+                        err
+                    })
+                )
+            }
+            other => bail!("cannot download file of type {:?}", other)
+        };
 
         // fixme: set other headers ?
         Ok(Response::builder()
diff --git a/www/window/FileBrowser.js b/www/window/FileBrowser.js
index 2ac50e1a..01b5d79b 100644
--- a/www/window/FileBrowser.js
+++ b/www/window/FileBrowser.js
@@ -87,6 +87,9 @@ Ext.define("PBS.window.FileBrowser", {
 	    };
 	    params.filepath = data.filepath;
 	    atag.download = data.text;
+	    if (data.type === 'd') {
+		atag.download += ".zip";
+	    }
 	    atag.href = me
 	        .buildUrl(`/api2/json/admin/datastore/${view.datastore}/pxar-file-download`, params);
 	    atag.click();
@@ -106,6 +109,11 @@ Ext.define("PBS.window.FileBrowser", {
 		case 'f':
 		    canDownload = true;
 		    break;
+		case 'd':
+		    if (data.depth > 1) {
+			canDownload = true;
+		    }
+		    break;
 		default: break;
 	    }
 
-- 
2.20.1





^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module
  2020-10-13  9:50 [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Dominik Csapak
  2020-10-13  9:50 ` [pbs-devel] [PATCH proxmox-backup 2/2] api2/admin/datastore/pxar_file_download: download directory as zip Dominik Csapak
@ 2020-10-13 11:02 ` Wolfgang Bumiller
  2020-10-13 15:04 ` Wolfgang Bumiller
  2 siblings, 0 replies; 4+ messages in thread
From: Wolfgang Bumiller @ 2020-10-13 11:02 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

partial review only for now:

On Tue, Oct 13, 2020 at 11:50:41AM +0200, Dominik Csapak wrote:
> this is a module to to stream a zip file in an api call
> the user has to give the Encoder a stream of files (Path, mtime, mode, Reader)
> the resulting object itself implements stream for easy use in a
> hyper::Body
> 
> for now, this does not implement compression (uses ZIPs STORE mode), and
> does not support empty directories or hardlinks (or any other special
> files)
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
>  src/tools.rs     |   1 +
>  src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 469 insertions(+)
>  create mode 100644 src/tools/zip.rs
> 
> diff --git a/src/tools.rs b/src/tools.rs
> index 5b244c02..28f0338b 100644
> --- a/src/tools.rs
> +++ b/src/tools.rs
> @@ -35,6 +35,7 @@ pub mod nom;
>  pub mod logrotate;
>  pub mod loopdev;
>  pub mod fuse_loop;
> +pub mod zip;
>  
>  mod parallel_handler;
>  pub use parallel_handler::*;
> diff --git a/src/tools/zip.rs b/src/tools/zip.rs
> new file mode 100644
> index 00000000..acb193cb
> --- /dev/null
> +++ b/src/tools/zip.rs
> @@ -0,0 +1,468 @@
> +use std::ffi::OsString;
> +use std::collections::VecDeque;
> +use std::os::unix::ffi::OsStrExt;
> +use std::pin::Pin;
> +use std::path::{Component, Path, PathBuf};
> +
> +use futures::task::{Context, Poll};
> +use tokio::io::AsyncRead;
> +use tokio::stream::Stream;
> +use anyhow::Result;
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::{
> +    byte_buffer::ByteBuffer,
> +    time::gmtime,
> +};
> +use crc32fast::Hasher;
> +
> +const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
> +const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
> +const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
> +const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
> +const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
> +const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
> +
> +const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
> +const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
> +const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
> +
> +fn epoch_to_dos(epoch: i64) -> (u16, u16) {
> +    let gmtime = match gmtime(epoch) {
> +        Ok(gmtime) => gmtime,
> +        Err(_) => return (0,0),
> +    };
> +
> +    let seconds = gmtime.tm_sec/2 & 0b11111;

Instead of stripping spaces, please add parenthesis for grouping (clippy
hint).
Also, for binary literals please use underscores for readability (group
them in nibbles, that way a group always represents a hex digit):

    let seconds = (gmtime.tm_sec / 2) & 0b1_1111;

same for all the ones below

> +    let minutes = gmtime.tm_min & 0xb111111;
> +    let hours = gmtime.tm_hour & 0b11111;
> +    let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16;
> +
> +    let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) {
> +        0
> +    } else {
> +        let day = gmtime.tm_mday & 0b11111;
> +        let month = (gmtime.tm_mon + 1) & 0b1111;
> +        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
> +        ((year<<9)|(month<<5)|(day)) as u16
> +    };
> +
> +    (date, time)
> +}
> +
> +struct Zip64Field {
> +    size: u64,
> +    compressed_size: u64,
> +    offset: u64,
> +}
> +
> +impl Zip64Field {
> +    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
> +        let size :u16 = if include_offset { 24 }  else { 16 };
> +        buf[0..2].copy_from_slice(ZIP64_TYPE);
> +        buf[2..4].copy_from_slice(&size.to_le_bytes());
> +        buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
> +        buf[12..20].copy_from_slice(&self.size.to_le_bytes());
> +        if include_offset {
> +            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
> +        }
> +    }
> +}
> +
> +struct FileEntry {
> +    filename: OsString,
> +    mtime: i64,
> +    mode: u16,
> +    zip64: Zip64Field,
> +    crc32: [u8; 4],
> +}
> +
> +impl FileEntry {
> +    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
> +        let mut relpath = PathBuf::new();
> +
> +        for comp in path.as_ref().components() {
> +            match comp {
> +                Component::Normal(_) => {
> +                    relpath.push(comp);
> +                },
> +                _ => {},
> +            }

clipy hint: `if let` would be shorter here:

          if let Component::Normal(_) = comp {
              relpath.push(comp);
          }


> +        }
> +
> +        Self {
> +            filename: relpath.into(),
> +            crc32: [0,0,0,0],
> +            mtime,
> +            mode,
> +            zip64: Zip64Field {
> +                size: 0,
> +                compressed_size: 0,
> +                offset: 0,
> +            },
> +        }
> +    }
> +
> +    fn local_file_header(&self, buf: &mut [u8]) -> usize {

method name & signature doesn't really make the purpose of this function clear

> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size: usize = 30 + (filename_len as usize) + 20;
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_NEEDED);
> +        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
> +        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
> +        buf[14..26].copy_from_slice(&[
> +            0x00, 0x00, 0x00, 0x00, // crc32
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
> +
> +        buf[28..30].copy_from_slice(&[20, 00]); // extra field len

^ Is there a reason you chose to do it this way than, say, define a

    #[derive(Endian)]
    #[repr(C)]
    struct LocalFileHeader {
        <contents up to the file name>
    }

you could do this for all structs and have a helper like:

    fn write_struct<E, T>(output: &mut T, data: E) -> io::Result<()>
    where
        T: Write + ?Sized,
        E: Endian,
    {
        let data = data.to_le();
        output.write_all(unsafe {
            std::slice::from_raw_parts(
                &data as *const E as *const u8,
                size_of_val(&data),
            )
        })
    }

and use `write_struct(&mut buffer, LocalFileHeader { ... })?`

> +
> +        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
> +
> +        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
> +
> +        size
> +    }
> +
> +    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
> +        let size = 24;
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
> +        buf[4..8].copy_from_slice(&self.crc32);
> +        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
> +        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
> +
> +        size
> +    }
> +
> +    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size = 46 + 28 + (filename_len as usize);
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_MADE_BY);
> +        buf[6..8].copy_from_slice(VERSION_NEEDED);
> +
> +        buf[8..12].copy_from_slice(&[
> +            0x08, 0x00, // general purpose flags
> +            0x00, 0x00, // compression
> +        ]);
> +
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[12..14].copy_from_slice(&time.to_le_bytes());
> +        buf[14..16].copy_from_slice(&date.to_le_bytes());
> +        buf[16..20].copy_from_slice(&self.crc32);
> +
> +        buf[20..28].copy_from_slice(&[
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +
> +        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
> +        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
> +
> +        buf[32..38].copy_from_slice(&[
> +            0x00, 0x00, // comment len
> +            0x00, 0x00, // start disk
> +            0x00, 0x00, // internal flags
> +        ]);
> +
> +        buf[38..40].copy_from_slice(&[
> +            0x00, 0x00, // upper part of external flags
> +        ]);
> +
> +        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
> +        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
> +
> +        buf[46..46+filename_len as usize].copy_from_slice(&filename);
> +        self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
> +
> +        size
> +    }
> +}
> +
> +enum ZipStreamPos {
> +    // index of file and if the header is already finished
> +    FileHeader(usize),
> +    File(usize),
> +    CentralIndex,
> +    End,
> +}
> +
> +/// Shorthand for the necessary metadata for a file inside a ZIP
> +pub type File<R> = (PathBuf, i64, u16, R);
> +
> +/// Represents a ZIP file that can be streamed
> +///
> +/// This will create a ZIP file on the fly by getting File entries from the
> +/// given stream, and finishes it when the stream is done
> +/// Example:
> +/// ```no_run
> +/// use proxmox_backup::tools::zip;
> +///
> +/// #[tokio::async]
> +/// async fn main() {
> +///     let (sender, receiver) = tokio::sync::mpsc::channel();
> +///     let zip = zip::ZipEncoderStream::new(receiver);
> +///
> +///     tokio::spawn(async move {
> +///         sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
> +///     });
> +///
> +///     zip.next().await; // until done
> +///
> +/// }
> +/// ```
> +pub struct ZipEncoderStream<R, S>
> +where
> +    R: AsyncRead + Unpin,
> +    S: Stream<Item = File<R>> + Unpin,
> +{
> +    buf: ByteBuffer,
> +    bytes: usize,
> +    central_dir_offset: usize,
> +    central_dir_size: usize,
> +    cur_reader: Option<R>,
> +    entrycount: usize,
> +    files: VecDeque<FileEntry>,
> +    filestream: Option<S>,
> +    hasher: Option<Hasher>,
> +    pos: ZipStreamPos,
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
> +    pub fn new(stream: S) -> Self {
> +        Self {
> +            buf: ByteBuffer::with_capacity(4*1024*1024),
> +            bytes: 0,
> +            central_dir_offset: 0,
> +            central_dir_size: 0,
> +            cur_reader: None,
> +            entrycount: 0,
> +            files: VecDeque::new(),
> +            filestream: Some(stream),
> +            hasher: None,
> +            pos: ZipStreamPos::FileHeader(0),
> +        }
> +    }
> +
> +    fn eocd(&mut self) -> usize {
> +        let size = if self.central_dir_size > u32::MAX as usize
> +            || self.central_dir_offset > u32::MAX as  usize
> +            || self.entrycount > u16::MAX as usize
> +        {
> +            56+20+22
> +        } else {
> +            22
> +        };
> +
> +
> +        if self.buf.free_size() < size { return 0; }
> +
> +        let eocd_start = size - 22;
> +        let buf = self.buf.get_free_mut_slice();
> +
> +        let mut count = self.entrycount as u16;
> +        let mut dir_size = self.central_dir_size as u32;
> +        let mut offset = self.central_dir_offset as u32;
> +
> +        if size > 22 {
> +            count = 0xFFFF;
> +            dir_size = 0xFFFFFFFF;
> +            offset = 0xFFFFFFFF;
> +
> +            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
> +            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
> +            buf[12..14].copy_from_slice(VERSION_MADE_BY);
> +            buf[14..16].copy_from_slice(VERSION_NEEDED);
> +            buf[16..24].copy_from_slice(&[
> +                0x00, 0x00, 0x00, 0x00, // number of disk
> +                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
> +            ]);
> +            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
> +            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
> +            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
> +            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
> +
> +            let locator_offset = self.central_dir_offset + self.central_dir_size;
> +            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
> +            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
> +            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
> +            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
> +        }
> +
> +        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
> +        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
> +            0x00, 0x00, // disk number
> +            0x00, 0x00, // start disk
> +        ]);
> +
> +        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
> +        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
> +
> +        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // entry count
> +        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // entry count
> +        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
> +
> +        size
> +    }
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
> +    type Item = Result<Vec<u8>, std::io::Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.pos {
> +                ZipStreamPos::FileHeader(idx) => {
> +                    if this.files.is_empty() || idx > this.files.len() - 1 {
> +                        if let Some(mut stream) = this.filestream.take() {
> +                            match Pin::new(&mut stream).poll_next(cx) {
> +                                Poll::Ready(Some((path, mtime, mode, file))) => {
> +                                    this.filestream = Some(stream);
> +                                    let entry = FileEntry::new(path, mtime, mode);
> +                                    this.files.push_back(entry);
> +                                    this.cur_reader = Some(file);
> +                                    continue;
> +                                }
> +                                Poll::Pending => {
> +                                    this.filestream = Some(stream);
> +                                    if this.buf.is_empty() {
> +                                        return Poll::Pending;
> +                                    }
> +                                    break;
> +                                }
> +                                Poll::Ready(None) => {},
> +                            }
> +                        }
> +                        this.pos = ZipStreamPos::CentralIndex;
> +                        this.central_dir_offset = this.bytes;
> +                        this.entrycount = this.files.len();
> +                        continue;
> +                    }
> +
> +                    let mut entry = &mut this.files[idx];
> +                    entry.zip64.offset = this.bytes as u64;
> +                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
> +                    this.bytes += size;
> +                    this.buf.add_size(size as usize);
> +
> +                    if size == 0 {
> +                        break;
> +                    }
> +
> +                    if this.cur_reader.is_some() {
> +                        this.pos = ZipStreamPos::File(idx);
> +                    } else {
> +                        this.pos = ZipStreamPos::FileHeader(idx + 1);
> +                    }
> +
> +                    if this.buf.is_full() {
> +                        break;
> +                    }
> +                },
> +                ZipStreamPos::File(idx) => {
> +                    let mut entry = &mut this.files[idx];
> +                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got not file data"))?;
> +                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
> +                        Poll::Ready(Ok(n)) => {
> +                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
> +                            this.buf.add_size(n);
> +                            if n == 0 {
> +                                entry.crc32 = hasher.finalize().to_le_bytes();
> +                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
> +                                this.buf.add_size(size);
> +                                this.bytes += size;
> +
> +                                if size == 0 {
> +                                    break;
> +                                }
> +
> +                                this.pos = ZipStreamPos::FileHeader(idx + 1);
> +
> +                                if this.buf.is_full() {
> +                                    break;
> +                                }
> +
> +                                continue;
> +                            }
> +
> +                            this.bytes += n;
> +                            entry.zip64.size += n as u64;
> +                            entry.zip64.compressed_size += n as u64;
> +
> +                            hasher.update(&this.buf[this.buf.len() - n..]);
> +                            this.hasher = Some(hasher);
> +                            this.cur_reader = Some(reader);
> +
> +                            if this.buf.is_full() {
> +                                break;
> +                            }
> +                        }
> +                        Poll::Pending => {
> +                            this.cur_reader = Some(reader);
> +                            if this.buf.is_empty() {
> +                                return Poll::Pending;
> +                            }
> +                            break;
> +                        }
> +                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
> +                    }
> +                },
> +                ZipStreamPos::CentralIndex => {
> +                    let mut finished_central_directory = true;
> +                    while this.files.len() > 0 {
> +                        let file = this.files.pop_front().unwrap();
> +                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
> +                        this.buf.add_size(size);
> +                        if size == 0 {
> +                            this.files.push_front(file);
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +
> +                        this.bytes += size;
> +                        this.central_dir_size += size;
> +
> +                        if this.buf.is_full() {
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +                    }
> +
> +                    if !finished_central_directory {
> +                        break;
> +                    }
> +
> +                    let size = this.eocd();
> +                    this.buf.add_size(size);
> +                    if size == 0 {
> +                        break;
> +                    }
> +
> +                    this.pos = ZipStreamPos::End;
> +                    break;
> +                }
> +                ZipStreamPos::End => return Poll::Ready(None)
> +            }
> +        }
> +
> +        return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
> +    }
> +}
> -- 
> 2.20.1




^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module
  2020-10-13  9:50 [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Dominik Csapak
  2020-10-13  9:50 ` [pbs-devel] [PATCH proxmox-backup 2/2] api2/admin/datastore/pxar_file_download: download directory as zip Dominik Csapak
  2020-10-13 11:02 ` [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Wolfgang Bumiller
@ 2020-10-13 15:04 ` Wolfgang Bumiller
  2 siblings, 0 replies; 4+ messages in thread
From: Wolfgang Bumiller @ 2020-10-13 15:04 UTC (permalink / raw)
  To: Dominik Csapak; +Cc: pbs-devel

Okay we really need to consider adding async generator support or
finding some better solution for this.

An idea would be to have a Buffer with 2 accessors: one implementing AsyncWrite
in such a way that when it is full it returns `Pending` to "yield", and
one which allows taking the current data out of it (both may have to be
used with mutable borrows simultaneously, therefore the two accessors).
That might help get rid of the "return size 0 if buffer is too full"
strategy, basically write the whole code as if there was no buffer to
fill, as an `async fn`, with the Stream's `poll_next` just polling that
method and taking any available data out of the buffer, if you get the
idea. I just find that state machine quite hard to follow, likely also
because all of the actual data is written manually with a bunch of magic
numbers...

On Tue, Oct 13, 2020 at 11:50:41AM +0200, Dominik Csapak wrote:
> this is a module to to stream a zip file in an api call
> the user has to give the Encoder a stream of files (Path, mtime, mode, Reader)
> the resulting object itself implements stream for easy use in a
> hyper::Body
> 
> for now, this does not implement compression (uses ZIPs STORE mode), and
> does not support empty directories or hardlinks (or any other special
> files)
> 
> Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
> ---
>  src/tools.rs     |   1 +
>  src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 469 insertions(+)
>  create mode 100644 src/tools/zip.rs
> 
> diff --git a/src/tools.rs b/src/tools.rs
> index 5b244c02..28f0338b 100644
> --- a/src/tools.rs
> +++ b/src/tools.rs
> @@ -35,6 +35,7 @@ pub mod nom;
>  pub mod logrotate;
>  pub mod loopdev;
>  pub mod fuse_loop;
> +pub mod zip;
>  
>  mod parallel_handler;
>  pub use parallel_handler::*;
> diff --git a/src/tools/zip.rs b/src/tools/zip.rs
> new file mode 100644
> index 00000000..acb193cb
> --- /dev/null
> +++ b/src/tools/zip.rs
> @@ -0,0 +1,468 @@
> +use std::ffi::OsString;
> +use std::collections::VecDeque;
> +use std::os::unix::ffi::OsStrExt;
> +use std::pin::Pin;
> +use std::path::{Component, Path, PathBuf};
> +
> +use futures::task::{Context, Poll};
> +use tokio::io::AsyncRead;
> +use tokio::stream::Stream;
> +use anyhow::Result;
> +
> +use proxmox::io_format_err;
> +use proxmox::tools::{
> +    byte_buffer::ByteBuffer,
> +    time::gmtime,
> +};
> +use crc32fast::Hasher;
> +
> +const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
> +const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
> +const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
> +const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
> +const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
> +const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
> +
> +const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
> +const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
> +const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
> +
> +fn epoch_to_dos(epoch: i64) -> (u16, u16) {
> +    let gmtime = match gmtime(epoch) {
> +        Ok(gmtime) => gmtime,
> +        Err(_) => return (0,0),
> +    };
> +
> +    let seconds = gmtime.tm_sec/2 & 0b11111;
> +    let minutes = gmtime.tm_min & 0xb111111;
> +    let hours = gmtime.tm_hour & 0b11111;
> +    let time: u16 = ((hours<<11)|(minutes<<5)|(seconds)) as u16;
> +
> +    let date: u16 = if gmtime.tm_year > (2108-1900) || gmtime.tm_year < (1980-1900) {
> +        0
> +    } else {
> +        let day = gmtime.tm_mday & 0b11111;
> +        let month = (gmtime.tm_mon + 1) & 0b1111;
> +        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
> +        ((year<<9)|(month<<5)|(day)) as u16
> +    };
> +
> +    (date, time)
> +}
> +
> +struct Zip64Field {
> +    size: u64,
> +    compressed_size: u64,
> +    offset: u64,
> +}
> +
> +impl Zip64Field {
> +    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
> +        let size :u16 = if include_offset { 24 }  else { 16 };
> +        buf[0..2].copy_from_slice(ZIP64_TYPE);
> +        buf[2..4].copy_from_slice(&size.to_le_bytes());
> +        buf[4..12].copy_from_slice(&self.compressed_size.to_le_bytes());
> +        buf[12..20].copy_from_slice(&self.size.to_le_bytes());
> +        if include_offset {
> +            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
> +        }
> +    }
> +}
> +
> +struct FileEntry {
> +    filename: OsString,
> +    mtime: i64,
> +    mode: u16,
> +    zip64: Zip64Field,
> +    crc32: [u8; 4],
> +}
> +
> +impl FileEntry {
> +    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
> +        let mut relpath = PathBuf::new();
> +
> +        for comp in path.as_ref().components() {
> +            match comp {
> +                Component::Normal(_) => {
> +                    relpath.push(comp);
> +                },
> +                _ => {},
> +            }
> +        }
> +
> +        Self {
> +            filename: relpath.into(),
> +            crc32: [0,0,0,0],
> +            mtime,
> +            mode,
> +            zip64: Zip64Field {
> +                size: 0,
> +                compressed_size: 0,
> +                offset: 0,
> +            },
> +        }
> +    }
> +
> +    fn local_file_header(&self, buf: &mut [u8]) -> usize {
> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size: usize = 30 + (filename_len as usize) + 20;
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_NEEDED);
> +        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags + compression
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
> +        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
> +        buf[14..26].copy_from_slice(&[
> +            0x00, 0x00, 0x00, 0x00, // crc32
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
> +
> +        buf[28..30].copy_from_slice(&[20, 00]); // extra field len
> +
> +        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
> +
> +        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
> +
> +        size
> +    }
> +
> +    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
> +        let size = 24;
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
> +        buf[4..8].copy_from_slice(&self.crc32);
> +        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
> +        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
> +
> +        size
> +    }
> +
> +    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
> +        let filename = self.filename.as_bytes();
> +        let filename_len: u16 = filename.len() as u16;
> +        let size = 46 + 28 + (filename_len as usize);
> +
> +        if size > buf.len() { return 0; }
> +
> +        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
> +        buf[4..6].copy_from_slice(VERSION_MADE_BY);
> +        buf[6..8].copy_from_slice(VERSION_NEEDED);
> +
> +        buf[8..12].copy_from_slice(&[
> +            0x08, 0x00, // general purpose flags
> +            0x00, 0x00, // compression
> +        ]);
> +
> +        let (date, time) = epoch_to_dos(self.mtime);
> +
> +        buf[12..14].copy_from_slice(&time.to_le_bytes());
> +        buf[14..16].copy_from_slice(&date.to_le_bytes());
> +        buf[16..20].copy_from_slice(&self.crc32);
> +
> +        buf[20..28].copy_from_slice(&[
> +            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
> +            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
> +        ]);
> +
> +        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
> +        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
> +
> +        buf[32..38].copy_from_slice(&[
> +            0x00, 0x00, // comment len
> +            0x00, 0x00, // start disk
> +            0x00, 0x00, // internal flags
> +        ]);
> +
> +        buf[38..40].copy_from_slice(&[
> +            0x00, 0x00, // upper part of external flags
> +        ]);
> +
> +        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
> +        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
> +
> +        buf[46..46+filename_len as usize].copy_from_slice(&filename);
> +        self.zip64.to_bytes(&mut buf[46+filename_len as usize..size], true);
> +
> +        size
> +    }
> +}
> +
> +enum ZipStreamPos {
> +    // index of file and if the header is already finished
> +    FileHeader(usize),
> +    File(usize),
> +    CentralIndex,
> +    End,
> +}
> +
> +/// Shorthand for the necessary metadata for a file inside a ZIP
> +pub type File<R> = (PathBuf, i64, u16, R);

^ Please make this a struct with named members.

> +
> +/// Represents a ZIP file that can be streamed
> +///
> +/// This will create a ZIP file on the fly by getting File entries from the
> +/// given stream, and finishes it when the stream is done
> +/// Example:
> +/// ```no_run
> +/// use proxmox_backup::tools::zip;
> +///
> +/// #[tokio::async]
> +/// async fn main() {
> +///     let (sender, receiver) = tokio::sync::mpsc::channel();
> +///     let zip = zip::ZipEncoderStream::new(receiver);
> +///
> +///     tokio::spawn(async move {
> +///         sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
> +///     });
> +///
> +///     zip.next().await; // until done
> +///
> +/// }
> +/// ```
> +pub struct ZipEncoderStream<R, S>
> +where
> +    R: AsyncRead + Unpin,
> +    S: Stream<Item = File<R>> + Unpin,
> +{
> +    buf: ByteBuffer,
> +    bytes: usize,
> +    central_dir_offset: usize,
> +    central_dir_size: usize,
> +    cur_reader: Option<R>,
> +    entrycount: usize,
> +    files: VecDeque<FileEntry>,
> +    filestream: Option<S>,
> +    hasher: Option<Hasher>,
> +    pos: ZipStreamPos,
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
> +    pub fn new(stream: S) -> Self {
> +        Self {
> +            buf: ByteBuffer::with_capacity(4*1024*1024),

You use this constant below again as well. Does this need to be used
explicitly at the bottom? If so, please make it a named const.

> +            bytes: 0,
> +            central_dir_offset: 0,
> +            central_dir_size: 0,
> +            cur_reader: None,
> +            entrycount: 0,
> +            files: VecDeque::new(),
> +            filestream: Some(stream),
> +            hasher: None,
> +            pos: ZipStreamPos::FileHeader(0),
> +        }
> +    }
> +
> +    fn eocd(&mut self) -> usize {
> +        let size = if self.central_dir_size > u32::MAX as usize
> +            || self.central_dir_offset > u32::MAX as  usize
> +            || self.entrycount > u16::MAX as usize
> +        {
> +            56+20+22

magic numbers?

> +        } else {
> +            22
> +        };
> +
> +
> +        if self.buf.free_size() < size { return 0; }
> +
> +        let eocd_start = size - 22;
> +        let buf = self.buf.get_free_mut_slice();
> +
> +        let mut count = self.entrycount as u16;
> +        let mut dir_size = self.central_dir_size as u32;
> +        let mut offset = self.central_dir_offset as u32;
> +
> +        if size > 22 {

name consts and structs with `size_of` would be more easy to follow...

> +            count = 0xFFFF;
> +            dir_size = 0xFFFFFFFF;
> +            offset = 0xFFFFFFFF;
> +
> +            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
> +            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
> +            buf[12..14].copy_from_slice(VERSION_MADE_BY);
> +            buf[14..16].copy_from_slice(VERSION_NEEDED);
> +            buf[16..24].copy_from_slice(&[
> +                0x00, 0x00, 0x00, 0x00, // number of disk
> +                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
> +            ]);
> +            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
> +            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
> +            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
> +            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
> +
> +            let locator_offset = self.central_dir_offset + self.central_dir_size;
> +            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
> +            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
> +            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
> +            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
> +        }
> +
> +        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
> +        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
> +            0x00, 0x00, // disk number
> +            0x00, 0x00, // start disk
> +        ]);
> +
> +        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
> +        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
> +
> +        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // entry count
> +        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // entry count
> +        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
> +
> +        size
> +    }
> +}
> +
> +impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
> +    type Item = Result<Vec<u8>, std::io::Error>;
> +
> +    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
> +        let this = self.get_mut();
> +
> +        loop {
> +            match this.pos {

Please use separate methods for each case.

> +                ZipStreamPos::FileHeader(idx) => {
> +                    if this.files.is_empty() || idx > this.files.len() - 1 {
> +                        if let Some(mut stream) = this.filestream.take() {

Also for this, with some enum return value, because...

> +                            match Pin::new(&mut stream).poll_next(cx) {
> +                                Poll::Ready(Some((path, mtime, mode, file))) => {
> +                                    this.filestream = Some(stream);
> +                                    let entry = FileEntry::new(path, mtime, mode);
> +                                    this.files.push_back(entry);
> +                                    this.cur_reader = Some(file);
> +                                    continue;
> +                                }
> +                                Poll::Pending => {
> +                                    this.filestream = Some(stream);
> +                                    if this.buf.is_empty() {
> +                                        return Poll::Pending;
> +                                    }
> +                                    break;

...because this is quite the jump.

> +                                }
> +                                Poll::Ready(None) => {},
> +                            }
> +                        }
> +                        this.pos = ZipStreamPos::CentralIndex;
> +                        this.central_dir_offset = this.bytes;
> +                        this.entrycount = this.files.len();
> +                        continue;
> +                    }
> +
> +                    let mut entry = &mut this.files[idx];
> +                    entry.zip64.offset = this.bytes as u64;
> +                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
> +                    this.bytes += size;
> +                    this.buf.add_size(size as usize);
> +
> +                    if size == 0 {

Rather put the separating space before the `let size =` line, do this
before the `add`s.

Consider moving the size-adding into a separate method and have the
writer-methods use it already, and have those only return a boolean?

Because...

> +                        break;
> +                    }
> +
> +                    if this.cur_reader.is_some() {
> +                        this.pos = ZipStreamPos::File(idx);
> +                    } else {
> +                        this.pos = ZipStreamPos::FileHeader(idx + 1);
> +                    }
> +
> +                    if this.buf.is_full() {
> +                        break;
> +                    }
> +                },
> +                ZipStreamPos::File(idx) => {
> +                    let mut entry = &mut this.files[idx];
> +                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got not file data"))?;
> +                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
> +                        Poll::Ready(Ok(n)) => {
> +                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
> +                            this.buf.add_size(n);
> +                            if n == 0 {
> +                                entry.crc32 = hasher.finalize().to_le_bytes();
> +                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
> +                                this.buf.add_size(size);
> +                                this.bytes += size;
> +
> +                                if size == 0 {
> +                                    break;
> +                                }

... same pattern here ...

> +
> +                                this.pos = ZipStreamPos::FileHeader(idx + 1);
> +
> +                                if this.buf.is_full() {
> +                                    break;
> +                                }
> +
> +                                continue;
> +                            }
> +
> +                            this.bytes += n;
> +                            entry.zip64.size += n as u64;
> +                            entry.zip64.compressed_size += n as u64;
> +
> +                            hasher.update(&this.buf[this.buf.len() - n..]);
> +                            this.hasher = Some(hasher);
> +                            this.cur_reader = Some(reader);
> +
> +                            if this.buf.is_full() {
> +                                break;
> +                            }
> +                        }
> +                        Poll::Pending => {
> +                            this.cur_reader = Some(reader);
> +                            if this.buf.is_empty() {
> +                                return Poll::Pending;
> +                            }
> +                            break;
> +                        }
> +                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
> +                    }
> +                },
> +                ZipStreamPos::CentralIndex => {
> +                    let mut finished_central_directory = true;
> +                    while this.files.len() > 0 {
> +                        let file = this.files.pop_front().unwrap();
> +                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
> +                        this.buf.add_size(size);
> +                        if size == 0 {

... and here.

> +                            this.files.push_front(file);
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +
> +                        this.bytes += size;
> +                        this.central_dir_size += size;
> +
> +                        if this.buf.is_full() {
> +                            finished_central_directory = false;
> +                            break;
> +                        }
> +                    }
> +
> +                    if !finished_central_directory {
> +                        break;
> +                    }
> +
> +                    let size = this.eocd();
> +                    this.buf.add_size(size);
> +                    if size == 0 {
> +                        break;
> +                    }
> +
> +                    this.pos = ZipStreamPos::End;
> +                    break;
> +                }
> +                ZipStreamPos::End => return Poll::Ready(None)
> +            }
> +        }
> +
> +        return Poll::Ready(Some(Ok(this.buf.remove_data(4*1024*1024).to_vec())));
> +    }
> +}
> -- 
> 2.20.1




^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2020-10-13 15:05 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-10-13  9:50 [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Dominik Csapak
2020-10-13  9:50 ` [pbs-devel] [PATCH proxmox-backup 2/2] api2/admin/datastore/pxar_file_download: download directory as zip Dominik Csapak
2020-10-13 11:02 ` [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Wolfgang Bumiller
2020-10-13 15:04 ` Wolfgang Bumiller

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal