From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module
Date: Tue, 13 Oct 2020 11:50:41 +0200
Message-ID: <20201013095042.31337-1-d.csapak@proxmox.com>

this adds a module to stream a zip file in an api call.
the user has to give the Encoder a stream of files (Path, mtime, mode, Reader);
the resulting object itself implements Stream, so it can easily be used in a
hyper::Body.

for now, this does not implement compression (it uses ZIP's STORE mode), and
does not support empty directories, hardlinks, or any other special
files.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/tools.rs     |   1 +
 src/tools/zip.rs | 468 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 469 insertions(+)
 create mode 100644 src/tools/zip.rs

diff --git a/src/tools.rs b/src/tools.rs
index 5b244c02..28f0338b 100644
--- a/src/tools.rs
+++ b/src/tools.rs
@@ -35,6 +35,7 @@ pub mod nom;
 pub mod logrotate;
 pub mod loopdev;
 pub mod fuse_loop;
+pub mod zip;
 
 mod parallel_handler;
 pub use parallel_handler::*;
diff --git a/src/tools/zip.rs b/src/tools/zip.rs
new file mode 100644
index 00000000..acb193cb
--- /dev/null
+++ b/src/tools/zip.rs
@@ -0,0 +1,468 @@
+use std::ffi::OsString;
+use std::collections::VecDeque;
+use std::os::unix::ffi::OsStrExt;
+use std::pin::Pin;
+use std::path::{Component, Path, PathBuf};
+
+use futures::task::{Context, Poll};
+use tokio::io::AsyncRead;
+use tokio::stream::Stream;
+use anyhow::Result;
+
+use proxmox::io_format_err;
+use proxmox::tools::{
+    byte_buffer::ByteBuffer,
+    time::gmtime,
+};
+use crc32fast::Hasher;
+
+const LOCAL_FH_SIG: &[u8] = &[0x50, 0x4b, 0x03, 0x04];
+const LOCAL_FF_SIG: &[u8] = &[0x50, 0x4b, 0x07, 0x08];
+const CENTRAL_DIRECTORY_FH_SIG: &[u8] = &[0x50, 0x4b, 0x01, 0x02];
+const END_OF_CENTRAL_DIR: &[u8] = &[0x50, 0x4b, 0x05, 0x06];
+const VERSION_NEEDED: &[u8] = &[0x2d, 0x00]; // version 4.5 (0x2D)
+const VERSION_MADE_BY: &[u8] = &[0x2d, 0x03]; // version 4.5 (0x2d), UNIX (0x03)
+
+const ZIP64_TYPE: &[u8] = &[0x01, 0x00];
+const ZIP64_EOCD_RECORD: &[u8] = &[0x50, 0x4b, 0x06, 0x06];
+const ZIP64_EOCD_LOCATOR: &[u8] = &[0x50, 0x4b, 0x06, 0x07];
+
+fn epoch_to_dos(epoch: i64) -> (u16, u16) {
+    let gmtime = match gmtime(epoch) {
+        Ok(gmtime) => gmtime,
+        Err(_) => return (0,0),
+    };
+
+    let seconds = gmtime.tm_sec / 2 & 0b11111;
+    let minutes = gmtime.tm_min & 0b111111;
+    let hours = gmtime.tm_hour & 0b11111;
+    let time: u16 = ((hours << 11) | (minutes << 5) | seconds) as u16;
+
+    let date: u16 = if gmtime.tm_year > (2108 - 1900) || gmtime.tm_year < (1980 - 1900) {
+        0
+    } else {
+        let day = gmtime.tm_mday & 0b11111;
+        let month = (gmtime.tm_mon + 1) & 0b1111;
+        let year = (gmtime.tm_year + 1900 - 1980) & 0b1111111;
+        ((year << 9) | (month << 5) | day) as u16
+    };
+    };
+
+    (date, time)
+}
+
+struct Zip64Field {
+    size: u64,
+    compressed_size: u64,
+    offset: u64,
+}
+
+impl Zip64Field {
+    fn to_bytes(&self, buf: &mut [u8], include_offset: bool) {
+        let size: u16 = if include_offset { 24 } else { 16 };
+        buf[0..2].copy_from_slice(ZIP64_TYPE);
+        buf[2..4].copy_from_slice(&size.to_le_bytes());
+        // the zip64 extra field stores the original (uncompressed) size first,
+        // then the compressed size (identical here, since we only use STORE)
+        buf[4..12].copy_from_slice(&self.size.to_le_bytes());
+        buf[12..20].copy_from_slice(&self.compressed_size.to_le_bytes());
+        if include_offset {
+            buf[20..28].copy_from_slice(&self.offset.to_le_bytes());
+        }
+    }
+}
+
+struct FileEntry {
+    filename: OsString,
+    mtime: i64,
+    mode: u16,
+    zip64: Zip64Field,
+    crc32: [u8; 4],
+}
+
+impl FileEntry {
+    fn new<P: AsRef<Path>>(path: P, mtime: i64, mode: u16) -> Self {
+        let mut relpath = PathBuf::new();
+
+        for comp in path.as_ref().components() {
+            match comp {
+                Component::Normal(_) => {
+                    relpath.push(comp);
+                },
+                _ => {},
+            }
+        }
+
+        Self {
+            filename: relpath.into(),
+            crc32: [0,0,0,0],
+            mtime,
+            mode,
+            zip64: Zip64Field {
+                size: 0,
+                compressed_size: 0,
+                offset: 0,
+            },
+        }
+    }
+
+    fn local_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size: usize = 30 + (filename_len as usize) + 20;
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_NEEDED);
+        buf[6..10].copy_from_slice(&[0x08, 0x00, 0x00, 0x00]); // flags (bit 3: sizes in data descriptor) + compression (store)
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[10..12].copy_from_slice(&time.to_le_bytes()); // time
+        buf[12..14].copy_from_slice(&date.to_le_bytes()); // date
+        buf[14..26].copy_from_slice(&[
+            0x00, 0x00, 0x00, 0x00, // crc32
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+        buf[26..28].copy_from_slice(&filename_len.to_le_bytes()); // filename len
+
+        buf[28..30].copy_from_slice(&20u16.to_le_bytes()); // extra field len (zip64)
+
+        buf[30..(30 + filename_len) as usize].copy_from_slice(filename);
+
+        self.zip64.to_bytes(&mut buf[(30 + filename_len) as usize..size], false);
+
+        size
+    }
+
+    fn local_file_footer(&self, buf: &mut [u8]) -> usize {
+        let size = 24;
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(LOCAL_FF_SIG);
+        buf[4..8].copy_from_slice(&self.crc32);
+        buf[8..16].copy_from_slice(&self.zip64.compressed_size.to_le_bytes());
+        buf[16..24].copy_from_slice(&self.zip64.size.to_le_bytes());
+
+        size
+    }
+
+    fn central_directory_file_header(&self, buf: &mut [u8]) -> usize {
+        let filename = self.filename.as_bytes();
+        let filename_len: u16 = filename.len() as u16;
+        let size = 46 + 28 + (filename_len as usize);
+
+        if size > buf.len() { return 0; }
+
+        buf[0..4].copy_from_slice(CENTRAL_DIRECTORY_FH_SIG);
+        buf[4..6].copy_from_slice(VERSION_MADE_BY);
+        buf[6..8].copy_from_slice(VERSION_NEEDED);
+
+        buf[8..12].copy_from_slice(&[
+            0x08, 0x00, // general purpose flags
+            0x00, 0x00, // compression
+        ]);
+
+        let (date, time) = epoch_to_dos(self.mtime);
+
+        buf[12..14].copy_from_slice(&time.to_le_bytes());
+        buf[14..16].copy_from_slice(&date.to_le_bytes());
+        buf[16..20].copy_from_slice(&self.crc32);
+
+        buf[20..28].copy_from_slice(&[
+            0xFF, 0xFF, 0xFF, 0xFF, // compressed size
+            0xFF, 0xFF, 0xFF, 0xFF, // uncompressed size
+        ]);
+
+        buf[28..30].copy_from_slice(&filename_len.to_le_bytes());
+        buf[30..32].copy_from_slice(&28u16.to_le_bytes()); // extra field len
+
+        buf[32..38].copy_from_slice(&[
+            0x00, 0x00, // comment len
+            0x00, 0x00, // start disk
+            0x00, 0x00, // internal flags
+        ]);
+
+        buf[38..40].copy_from_slice(&[
+            0x00, 0x00, // upper part of external flags
+        ]);
+
+        buf[40..42].copy_from_slice(&self.mode.to_le_bytes());
+        buf[42..46].copy_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]); // offset
+
+        buf[46..46 + filename_len as usize].copy_from_slice(filename);
+        self.zip64.to_bytes(&mut buf[46 + filename_len as usize..size], true);
+
+        size
+    }
+}
+
+enum ZipStreamPos {
+    // index of the file whose local header / data is currently being written
+    FileHeader(usize),
+    File(usize),
+    CentralIndex,
+    End,
+}
+
+/// Shorthand for the necessary metadata for a file inside a ZIP
+pub type File<R> = (PathBuf, i64, u16, R);
+
+/// Represents a ZIP file that can be streamed
+///
+/// This will create a ZIP file on the fly by reading File entries from the
+/// given stream, and finishes the archive once the stream is done.
+/// Example:
+/// ```no_run
+/// use std::path::PathBuf;
+/// use tokio::stream::StreamExt;
+///
+/// use proxmox_backup::tools::zip;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let (mut sender, receiver) = tokio::sync::mpsc::channel(64);
+///     let mut zip = zip::ZipEncoderStream::new(receiver);
+///
+///     tokio::spawn(async move {
+///         let _ = sender.send((PathBuf::from("/etc/test"), 0, 0o100644, tokio::io::empty())).await;
+///     });
+///
+///     while let Some(_chunk) = zip.next().await {
+///         // handle the chunk, e.g. push it into a hyper::Body
+///     }
+/// }
+/// ```
+pub struct ZipEncoderStream<R, S>
+where
+    R: AsyncRead + Unpin,
+    S: Stream<Item = File<R>> + Unpin,
+{
+    buf: ByteBuffer,
+    bytes: usize,
+    central_dir_offset: usize,
+    central_dir_size: usize,
+    cur_reader: Option<R>,
+    entrycount: usize,
+    files: VecDeque<FileEntry>,
+    filestream: Option<S>,
+    hasher: Option<Hasher>,
+    pos: ZipStreamPos,
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> ZipEncoderStream<R, S> {
+    pub fn new(stream: S) -> Self {
+        Self {
+            buf: ByteBuffer::with_capacity(4*1024*1024),
+            bytes: 0,
+            central_dir_offset: 0,
+            central_dir_size: 0,
+            cur_reader: None,
+            entrycount: 0,
+            files: VecDeque::new(),
+            filestream: Some(stream),
+            hasher: None,
+            pos: ZipStreamPos::FileHeader(0),
+        }
+    }
+
+    fn eocd(&mut self) -> usize {
+        let size = if self.central_dir_size > u32::MAX as usize
+            || self.central_dir_offset > u32::MAX as usize
+            || self.entrycount > u16::MAX as usize
+        {
+            56 + 20 + 22 // zip64 EOCD record + zip64 EOCD locator + EOCD
+        } else {
+            22
+        };
+
+        if self.buf.free_size() < size { return 0; }
+
+        let eocd_start = size - 22;
+        let buf = self.buf.get_free_mut_slice();
+
+        let mut count = self.entrycount as u16;
+        let mut dir_size = self.central_dir_size as u32;
+        let mut offset = self.central_dir_offset as u32;
+
+        if size > 22 {
+            count = 0xFFFF;
+            dir_size = 0xFFFFFFFF;
+            offset = 0xFFFFFFFF;
+
+            buf[0..4].copy_from_slice(ZIP64_EOCD_RECORD);
+            buf[4..12].copy_from_slice(&44u64.to_le_bytes()); // size without type+size
+            buf[12..14].copy_from_slice(VERSION_MADE_BY);
+            buf[14..16].copy_from_slice(VERSION_NEEDED);
+            buf[16..24].copy_from_slice(&[
+                0x00, 0x00, 0x00, 0x00, // number of disk
+                0x00, 0x00, 0x00, 0x00, // number of disk of central directory
+            ]);
+            buf[24..32].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries on disk
+            buf[32..40].copy_from_slice(&(self.entrycount as u64).to_le_bytes()); // num entries total
+            buf[40..48].copy_from_slice(&(self.central_dir_size as u64).to_le_bytes());
+            buf[48..56].copy_from_slice(&(self.central_dir_offset as u64).to_le_bytes());
+
+            let locator_offset = self.central_dir_offset + self.central_dir_size;
+            buf[56..60].copy_from_slice(ZIP64_EOCD_LOCATOR);
+            buf[60..64].copy_from_slice(&[0x00, 0x00, 0x00, 0x00]); // disk number
+            buf[64..72].copy_from_slice(&(locator_offset as u64).to_le_bytes());
+            buf[72..76].copy_from_slice(&[0x01, 0x00, 0x00, 0x00]); // total number of disks (1)
+        }
+
+        buf[eocd_start..eocd_start+4].copy_from_slice(END_OF_CENTRAL_DIR);
+        buf[eocd_start+4..eocd_start+8].copy_from_slice(&[
+            0x00, 0x00, // disk number
+            0x00, 0x00, // start disk
+        ]);
+
+        buf[eocd_start+8..eocd_start+10].copy_from_slice(&count.to_le_bytes());
+        buf[eocd_start+10..eocd_start+12].copy_from_slice(&count.to_le_bytes());
+
+        buf[eocd_start+12..eocd_start+16].copy_from_slice(&dir_size.to_le_bytes()); // central directory size
+        buf[eocd_start+16..eocd_start+20].copy_from_slice(&offset.to_le_bytes()); // central directory offset
+        buf[eocd_start+20..eocd_start+22].copy_from_slice(&[0x00, 0x00]); // comment len
+
+        size
+    }
+}
+
+impl<R: AsyncRead + Unpin, S: Stream<Item = File<R>> + Unpin> Stream for ZipEncoderStream<R, S> {
+    type Item = Result<Vec<u8>, std::io::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+
+        loop {
+            match this.pos {
+                ZipStreamPos::FileHeader(idx) => {
+                    if this.files.is_empty() || idx > this.files.len() - 1 {
+                        if let Some(mut stream) = this.filestream.take() {
+                            match Pin::new(&mut stream).poll_next(cx) {
+                                Poll::Ready(Some((path, mtime, mode, file))) => {
+                                    this.filestream = Some(stream);
+                                    let entry = FileEntry::new(path, mtime, mode);
+                                    this.files.push_back(entry);
+                                    this.cur_reader = Some(file);
+                                    continue;
+                                }
+                                Poll::Pending => {
+                                    this.filestream = Some(stream);
+                                    if this.buf.is_empty() {
+                                        return Poll::Pending;
+                                    }
+                                    break;
+                                }
+                                Poll::Ready(None) => {},
+                            }
+                        }
+                        this.pos = ZipStreamPos::CentralIndex;
+                        this.central_dir_offset = this.bytes;
+                        this.entrycount = this.files.len();
+                        continue;
+                    }
+
+                    let entry = &mut this.files[idx];
+                    entry.zip64.offset = this.bytes as u64;
+                    let size = entry.local_file_header(this.buf.get_free_mut_slice());
+                    this.bytes += size;
+                    this.buf.add_size(size);
+
+                    if size == 0 {
+                        break;
+                    }
+
+                    if this.cur_reader.is_some() {
+                        this.pos = ZipStreamPos::File(idx);
+                    } else {
+                        this.pos = ZipStreamPos::FileHeader(idx + 1);
+                    }
+
+                    if this.buf.is_full() {
+                        break;
+                    }
+                },
+                ZipStreamPos::File(idx) => {
+                    let entry = &mut this.files[idx];
+                    let mut reader = this.cur_reader.take().ok_or_else(|| io_format_err!("got no file data"))?;
+                    match Pin::new(&mut reader).poll_read(cx, this.buf.get_free_mut_slice()) {
+                        Poll::Ready(Ok(n)) => {
+                            let mut hasher = this.hasher.take().unwrap_or_else(Hasher::new);
+                            this.buf.add_size(n);
+                            if n == 0 {
+                                entry.crc32 = hasher.finalize().to_le_bytes();
+                                let size = entry.local_file_footer(this.buf.get_free_mut_slice());
+                                this.buf.add_size(size);
+                                this.bytes += size;
+
+                                if size == 0 {
+                                    break;
+                                }
+
+                                this.pos = ZipStreamPos::FileHeader(idx + 1);
+
+                                if this.buf.is_full() {
+                                    break;
+                                }
+
+                                continue;
+                            }
+
+                            this.bytes += n;
+                            entry.zip64.size += n as u64;
+                            entry.zip64.compressed_size += n as u64;
+
+                            hasher.update(&this.buf[this.buf.len() - n..]);
+                            this.hasher = Some(hasher);
+                            this.cur_reader = Some(reader);
+
+                            if this.buf.is_full() {
+                                break;
+                            }
+                        }
+                        Poll::Pending => {
+                            this.cur_reader = Some(reader);
+                            if this.buf.is_empty() {
+                                return Poll::Pending;
+                            }
+                            break;
+                        }
+                        Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
+                    }
+                },
+                ZipStreamPos::CentralIndex => {
+                    let mut finished_central_directory = true;
+                    while let Some(file) = this.files.pop_front() {
+                        let size = file.central_directory_file_header(this.buf.get_free_mut_slice());
+                        this.buf.add_size(size);
+                        if size == 0 {
+                            this.files.push_front(file);
+                            finished_central_directory = false;
+                            break;
+                        }
+
+                        this.bytes += size;
+                        this.central_dir_size += size;
+
+                        if this.buf.is_full() {
+                            finished_central_directory = false;
+                            break;
+                        }
+                    }
+
+                    if !finished_central_directory {
+                        break;
+                    }
+
+                    let size = this.eocd();
+                    this.buf.add_size(size);
+                    if size == 0 {
+                        break;
+                    }
+
+                    this.pos = ZipStreamPos::End;
+                    break;
+                }
+                ZipStreamPos::End => return Poll::Ready(None)
+            }
+        }
+
+        Poll::Ready(Some(Ok(this.buf.remove_data(4 * 1024 * 1024).to_vec())))
+    }
+}
-- 
2.20.1

Thread overview: 4+ messages
2020-10-13  9:50 Dominik Csapak [this message]
2020-10-13  9:50 ` [pbs-devel] [PATCH proxmox-backup 2/2] api2/admin/datastore/pxar_file_download: download directory as zip Dominik Csapak
2020-10-13 11:02 ` [pbs-devel] [PATCH proxmox-backup 1/2] tools: add zip module Wolfgang Bumiller
2020-10-13 15:04 ` Wolfgang Bumiller
