public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox v4 0/2] restore files from pxar sparsely
@ 2021-02-12 14:44 Dominik Csapak
From: Dominik Csapak @ 2021-02-12 14:44 UTC
  To: pbs-devel

to be able to restore containers with big sparse files

ideally we would store hole information directly in the pxar archive
instead of using zero chunks at all; that would make pxar archives
smaller and let us restore sparse files exactly as they were

for now, restore all files sparsely

changes from v3:
* collect consecutive zero blocks, so that we seek only once per hole
* add tests for sparse_copy(_async)
* also return whether the last operation was a seek, and only truncate in that case
* create the buffer with an explicit capacity of 4096 bytes
* use rustfmt
* add a patch adding poll_once to the proxmox crate (for async testing)
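The "collect consecutive zero blocks" change can be illustrated with a small, std-only sketch. `Op` and `plan_ops` are hypothetical names, not part of the series: the function splits a buffer into fixed-size blocks and merges every run of all-zero blocks into a single seek, which is the coalescing that sparse_copy performs while streaming.

```rust
/// One output operation (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Op {
    /// write this many bytes of data
    Write(usize),
    /// skip this many zero bytes with a single seek
    Seek(usize),
}

/// Splits `data` into `block`-sized pieces (`block` must be > 0) and merges
/// each run of all-zero pieces into one `Op::Seek`, so every hole costs one seek.
fn plan_ops(data: &[u8], block: usize) -> Vec<Op> {
    let mut ops = Vec::new();
    let mut pending_seek = 0;
    for chunk in data.chunks(block) {
        if chunk.iter().all(|&b| b == 0) {
            // defer the seek until the hole ends
            pending_seek += chunk.len();
            continue;
        }
        if pending_seek > 0 {
            ops.push(Op::Seek(pending_seek));
            pending_seek = 0;
        }
        ops.push(Op::Write(chunk.len()));
    }
    if pending_seek > 0 {
        // trailing hole: the file size must be fixed up afterwards
        ops.push(Op::Seek(pending_seek));
    }
    ops
}

fn main() {
    // block size 4: one data block, three zero blocks, one data block, two zero blocks
    let mut data = vec![1u8; 4];
    data.extend(vec![0u8; 12]);
    data.extend(vec![2u8; 4]);
    data.extend(vec![0u8; 8]);
    // prints [Write(4), Seek(12), Write(4), Seek(8)]
    println!("{:?}", plan_ops(&data, 4));
}
```

The three zero blocks in the middle become one `Seek(12)` rather than three separate seeks, and the trailing `Seek` is exactly the case where the caller has to set the file size afterwards.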

changes from v2:
* always sparse copy and truncate after

changes from RFC:
* drop the zero module from proxmox; rust generates fast code for the zero check by itself
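As a rough illustration of relying on the compiler instead of a dedicated zero module, a plain iterator-based check in the spirit of `buffer_is_zero` from patch 2/2 might look like this. That the optimizer vectorizes the OR-fold is an assumption about typical release builds, not something this series measures.

```rust
/// Returns true if every byte of `buf` is zero.
///
/// OR-folding fixed-size chunks is a simple loop that the optimizer can
/// usually vectorize on its own; no hand-written SIMD is used here.
fn buffer_is_zero(buf: &[u8]) -> bool {
    buf.chunks(128)
        .all(|chunk| chunk.iter().fold(0u8, |acc, &b| acc | b) == 0)
}

fn main() {
    let mut block = [0u8; 4096];
    assert!(buffer_is_zero(&block));
    block[4095] = 1; // a single non-zero byte at the very end
    assert!(!buffer_is_zero(&block));
    println!("ok");
}
```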

proxmox:

Dominik Csapak (2):
  proxmox/tools: add poll_once module for testing
  proxmox: add sparse_copy(_async) to tools::io

 proxmox/src/tools/io/mod.rs | 320 ++++++++++++++++++++++++++++++++++++
 proxmox/src/tools/mod.rs    |  36 ++++
 2 files changed, 356 insertions(+)

proxmox-backup:

Dominik Csapak (1):
  pxar/extract: if possible create files sparsely

 src/pxar/extract.rs | 43 ++++++++++++++++++++++++++++++++++++-------
 1 file changed, 36 insertions(+), 7 deletions(-)

-- 
2.20.1


* [pbs-devel] [PATCH proxmox v4 1/2] proxmox/tools: add poll_once module for testing
From: Dominik Csapak @ 2021-02-12 14:44 UTC
  To: pbs-devel

copied from the pxar crate, intended for polling a future once;
this is helpful for testing async code
(the future must resolve on the first poll, otherwise an error
is returned)
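A self-contained sketch of the same idea, assuming a Rust toolchain of 1.68 or newer (it uses `std::pin::pin!`). Unlike `poll_result_once` in the patch, this hypothetical `poll_once` is generic over any output and returns `None` instead of an error when the future is still pending, and its waker does nothing instead of panicking.

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Waker functions that ignore the (null) data pointer entirely.
unsafe fn noop_clone(_: *const ()) -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}
unsafe fn noop(_: *const ()) {}

// clone, wake, wake_by_ref, drop
const NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

/// Polls `fut` exactly once; returns `None` if it is not ready yet.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    // SAFETY: every vtable function ignores the data pointer, so null is fine.
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    let fut = pin!(fut); // pinned on the stack, never moved afterwards
    match fut.poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    // An async block that awaits nothing completes on the first poll.
    assert_eq!(poll_once(async { 40 + 2 }), Some(42));
    // A future that never completes shows the limitation noted above.
    assert_eq!(poll_once(std::future::pending::<u32>()), None);
    println!("ok");
}
```

This is why the test readers and writers in patch 2/2 always return `Poll::Ready`: with a single poll, any `Poll::Pending` would end the test.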

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/mod.rs | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/proxmox/src/tools/mod.rs b/proxmox/src/tools/mod.rs
index 45f46f9..e938d90 100644
--- a/proxmox/src/tools/mod.rs
+++ b/proxmox/src/tools/mod.rs
@@ -227,3 +227,39 @@ pub fn nodename() -> &'static str {
 
     &NODENAME
 }
+
+#[cfg(test)]
+pub mod poll_once {
+    use std::future::Future;
+    use std::pin::Pin;
+    use std::task::{Context, Poll};
+
+    pub fn poll_result_once<T, R>(mut fut: T) -> std::io::Result<R>
+    where
+        T: Future<Output = std::io::Result<R>>,
+    {
+        let waker = std::task::RawWaker::new(std::ptr::null(), &WAKER_VTABLE);
+        let waker = unsafe { std::task::Waker::from_raw(waker) };
+        let mut cx = Context::from_waker(&waker);
+        unsafe {
+            match Pin::new_unchecked(&mut fut).poll(&mut cx) {
+                Poll::Pending => Err(crate::sys::error::io_err_other(
+                    "got Poll::Pending in synchronous context",
+                )),
+                Poll::Ready(r) => r,
+            }
+        }
+    }
+
+    const WAKER_VTABLE: std::task::RawWakerVTable =
+        std::task::RawWakerVTable::new(forbid_clone, forbid_wake, forbid_wake, ignore_drop);
+
+    unsafe fn forbid_clone(_: *const ()) -> std::task::RawWaker {
+        panic!("tried to clone waker for synchronous task");
+    }
+
+    unsafe fn forbid_wake(_: *const ()) {
+        panic!("tried to wake synchronous task");
+    }
+    unsafe fn ignore_drop(_: *const ()) {}
+}
-- 
2.20.1


* [pbs-devel] [PATCH proxmox v4 2/2] proxmox: add sparse_copy(_async) to tools::io
From: Dominik Csapak @ 2021-02-12 14:44 UTC
  To: pbs-devel

this seeks the target over blocks of zeroes instead of writing them,
which creates sparse files on filesystems that support them

also add tests for it
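To show the algorithm without the proxmox `ByteBuffer` type, here is a std-only sketch. `sparse_copy_std` is a hypothetical name; it mirrors the logic of `sparse_copy` (zero reads are accumulated and skipped with one seek, and the function reports whether it ended on a seek), but it uses a plain 4096-byte array and returns a tuple instead of `SparseCopyResult`.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom, Write};

/// Hypothetical std-only sparse copy: returns (bytes covered, ended on a seek).
fn sparse_copy_std<R: Read, W: Write + Seek>(
    reader: &mut R,
    writer: &mut W,
) -> io::Result<(u64, bool)> {
    let mut buf = [0u8; 4096];
    let mut total = 0u64;
    let mut pending_seek = 0i64; // zero bytes read but not yet skipped
    loop {
        let len = match reader.read(&mut buf) {
            Ok(0) => break, // EOF
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        let data = &buf[..len];
        if data.iter().all(|&b| b == 0) {
            pending_seek += len as i64; // merge consecutive zero reads
            continue;
        }
        if pending_seek > 0 {
            writer.seek(SeekFrom::Current(pending_seek))?;
            total += pending_seek as u64;
            pending_seek = 0;
        }
        writer.write_all(data)?;
        total += len as u64;
    }
    let seeked_last = pending_seek > 0;
    if seeked_last {
        // trailing hole: this seek does NOT extend the target
        writer.seek(SeekFrom::Current(pending_seek))?;
        total += pending_seek as u64;
    }
    Ok((total, seeked_last))
}

fn main() -> io::Result<()> {
    // 5000 bytes of data followed by 5000 zero bytes
    let mut input = vec![1u8; 5000];
    input.extend(vec![0u8; 5000]);
    let mut out = Cursor::new(Vec::new());
    let (total, seeked_last) = sparse_copy_std(&mut Cursor::new(input), &mut out)?;
    // prints "total=10000 seeked_last=true stored=8192"
    println!("total={} seeked_last={} stored={}", total, seeked_last, out.get_ref().len());
    Ok(())
}
```

With an in-memory `Cursor<Vec<u8>>` as target, the trailing seek moves the position to 10000 but the vector keeps only the 8192 bytes actually written; that gap is the reason the extract code has to set the file size afterwards.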

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/io/mod.rs | 320 ++++++++++++++++++++++++++++++++++++
 1 file changed, 320 insertions(+)

diff --git a/proxmox/src/tools/io/mod.rs b/proxmox/src/tools/io/mod.rs
index 2e92ebb..b935921 100644
--- a/proxmox/src/tools/io/mod.rs
+++ b/proxmox/src/tools/io/mod.rs
@@ -3,8 +3,328 @@
 //! The [`ReadExt`] trait provides additional operations for handling byte buffers for types
 //! implementing [`Read`](std::io::Read).
 
+use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
+
 mod read;
 pub use read::*;
 
 mod write;
 pub use write::*;
+
+fn buffer_is_zero(buf: &[u8]) -> bool {
+    !buf.chunks(128)
+        .map(|aa| aa.iter().fold(0, |a, b| a | b) != 0)
+        .any(|a| a)
+}
+
+/// Result of a sparse copy call.
+/// Contains the number of bytes written or skipped via seek,
+/// and whether the last operation was a seek.
+pub struct SparseCopyResult {
+    pub written: u64,
+    pub seeked_last: bool,
+}
+
+/// copy similar to io::copy, but seeks the target when encountering
+/// zero bytes instead of writing them
+///
+/// Example use:
+/// ```
+/// # use std::io;
+/// # use proxmox::tools::io::sparse_copy;
+/// fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+///     R: io::Read,
+///     W: io::Write + io::Seek,
+/// {
+///     let res = sparse_copy(&mut reader, &mut writer)?;
+///
+///     println!("last part was seeked: {}", res.seeked_last);
+///     println!("written: {}", res.written);
+///
+///     Ok(())
+/// }
+/// ```
+pub fn sparse_copy<R: Read + ?Sized, W: Write + Seek + ?Sized>(
+    reader: &mut R,
+    writer: &mut W,
+) -> Result<SparseCopyResult, io::Error> {
+    let mut buf = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+    let mut written = 0;
+    let mut seek_amount: i64 = 0;
+    let mut seeked_last = false;
+    loop {
+        buf.clear();
+        let len = match buf.read_from(reader) {
+            Ok(len) => len,
+            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        };
+
+        if len > 0 && buffer_is_zero(&buf[..]) {
+            seek_amount += len as i64;
+            continue;
+        }
+
+        if seek_amount > 0 {
+            writer.seek(SeekFrom::Current(seek_amount))?;
+            written += seek_amount as u64;
+            seek_amount = 0;
+            seeked_last = true;
+        }
+
+        if len > 0 {
+            writer.write_all(&buf[..])?;
+            written += len as u64;
+            seeked_last = false;
+        } else {
+            return Ok(SparseCopyResult {
+                written,
+                seeked_last,
+            });
+        }
+    }
+}
+
+#[cfg(feature = "tokio")]
+use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
+
+#[cfg(feature = "tokio")]
+/// copy similar to tokio::io::copy, but seeks the target when encountering
+/// zero bytes instead of writing them
+///
+/// Example:
+/// ```no_run
+/// # use std::io;
+/// # use tokio::io::{AsyncRead, AsyncWrite, AsyncSeek};
+/// # use proxmox::tools::io::sparse_copy_async;
+/// async fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+///     R: AsyncRead + Unpin,
+///     W: AsyncWrite + AsyncSeek + Unpin,
+/// {
+///     let res = sparse_copy_async(&mut reader, &mut writer).await?;
+///
+///     println!("last part was seeked: {}", res.seeked_last);
+///     println!("written: {}", res.written);
+///
+///     Ok(())
+/// }
+/// ```
+pub async fn sparse_copy_async<R, W>(
+    reader: &mut R,
+    writer: &mut W,
+) -> Result<SparseCopyResult, io::Error>
+where
+    R: AsyncRead + Unpin,
+    W: AsyncWrite + AsyncSeek + Unpin,
+{
+    let mut buf = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+    let mut written = 0;
+    let mut seek_amount: i64 = 0;
+    let mut seeked_last = false;
+    loop {
+        buf.clear();
+        let len = match buf.read_from_async(reader).await {
+            Ok(len) => len,
+            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        };
+
+        if len > 0 && buffer_is_zero(&buf[..]) {
+            seek_amount += len as i64;
+            continue;
+        }
+
+        if seek_amount > 0 {
+            writer.seek(SeekFrom::Current(seek_amount)).await?;
+            written += seek_amount as u64;
+            seek_amount = 0;
+            seeked_last = true;
+        }
+
+        if len > 0 {
+            writer.write_all(&buf[..]).await?;
+            written += len as u64;
+            seeked_last = false;
+        } else {
+            return Ok(SparseCopyResult {
+                written,
+                seeked_last,
+            });
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::io::Cursor;
+    use std::pin::Pin;
+    use std::task::{Context, Poll};
+
+    use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
+
+    use crate::tools::io::{sparse_copy, sparse_copy_async};
+
+    const LEN: usize = 10000;
+
+    #[test]
+    fn test_sparse_copy() {
+        // test sparse
+        let mut test_data = Vec::new();
+        for _ in 0..LEN / 2 {
+            test_data.push(1u8);
+        }
+        for _ in 0..LEN / 2 {
+            test_data.push(0u8);
+        }
+        let mut test_data = Cursor::new(test_data);
+        let mut result_data = Cursor::new(vec![0; LEN]);
+
+        let result =
+            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
+        assert_eq!(result.written, LEN as u64);
+        assert_eq!(result.seeked_last, true);
+        for i in 0..LEN {
+            if i < LEN / 2 {
+                assert_eq!(result_data.get_ref()[i], 1);
+            } else {
+                assert_eq!(result_data.get_ref()[i], 0);
+            }
+        }
+
+        // test non sparse
+        let mut test_data = Cursor::new(vec![1; LEN]);
+        let mut result_data = Cursor::new(vec![0; LEN]);
+
+        let result =
+            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
+        assert_eq!(result.written, LEN as u64);
+        assert_eq!(result.seeked_last, false);
+        for i in 0..LEN {
+            assert_eq!(result_data.get_ref()[i], 1);
+        }
+    }
+
+    struct DummyAsyncReader<R> {
+        inner: R,
+    }
+
+    struct DummyAsyncWriter<W> {
+        inner: W,
+        seek_pos: u64,
+    }
+
+    impl<R: std::io::Read + Unpin> AsyncRead for DummyAsyncReader<R> {
+        fn poll_read(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+            buf: &mut ReadBuf<'_>,
+        ) -> Poll<std::io::Result<()>> {
+            let this = Pin::get_mut(self);
+            let mut read_buf = buf.initialize_unfilled();
+            match this.inner.read(&mut read_buf) {
+                Ok(len) => {
+                    buf.advance(len);
+                    Poll::Ready(Ok(()))
+                }
+                Err(err) => Poll::Ready(Err(err)),
+            }
+        }
+    }
+
+    impl<R: std::io::Write + Unpin> AsyncWrite for DummyAsyncWriter<R> {
+        fn poll_write(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+            buf: &[u8],
+        ) -> Poll<std::io::Result<usize>> {
+            let this = Pin::get_mut(self);
+            match this.inner.write(buf) {
+                Ok(len) => Poll::Ready(Ok(len)),
+                Err(err) => Poll::Ready(Err(err)),
+            }
+        }
+
+        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+            Poll::Ready(Ok(()))
+        }
+
+        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+            Poll::Ready(Ok(()))
+        }
+    }
+
+    impl<R: std::io::Seek + Unpin> AsyncSeek for DummyAsyncWriter<R> {
+        fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
+            let this = Pin::get_mut(self);
+            this.seek_pos = this.inner.seek(position)?;
+            Ok(())
+        }
+
+        fn poll_complete(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+        ) -> Poll<std::io::Result<u64>> {
+            let this = Pin::get_mut(self);
+            Poll::Ready(Ok(this.seek_pos))
+        }
+    }
+
+    #[test]
+    fn test_sparse_copy_async() {
+        let fut = async {
+            // test sparse
+            let mut test_data = Vec::new();
+            for _ in 0..LEN / 2 {
+                test_data.push(1u8);
+            }
+            for _ in 0..LEN / 2 {
+                test_data.push(0u8);
+            }
+            let mut test_data = DummyAsyncReader {
+                inner: Cursor::new(test_data),
+            };
+            let mut result_data = DummyAsyncWriter {
+                inner: Cursor::new(vec![0; LEN]),
+                seek_pos: 0,
+            };
+
+            let result = sparse_copy_async(&mut test_data, &mut result_data)
+                .await
+                .expect("error during sparse copy");
+
+            assert_eq!(result.written, LEN as u64);
+            assert_eq!(result.seeked_last, true);
+            for i in 0..LEN {
+                if i < LEN / 2 {
+                    assert_eq!(result_data.inner.get_ref()[i], 1);
+                } else {
+                    assert_eq!(result_data.inner.get_ref()[i], 0);
+                }
+            }
+
+            // test non sparse
+            let mut test_data = DummyAsyncReader {
+                inner: Cursor::new(vec![1; LEN]),
+            };
+            let mut result_data = DummyAsyncWriter {
+                inner: Cursor::new(vec![0; LEN]),
+                seek_pos: 0,
+            };
+
+            let result = sparse_copy_async(&mut test_data, &mut result_data)
+                .await
+                .expect("error during sparse copy");
+
+            assert_eq!(result.written, LEN as u64);
+            assert_eq!(result.seeked_last, false);
+            for i in 0..LEN {
+                assert_eq!(result_data.inner.get_ref()[i], 1);
+            }
+            Ok(())
+        };
+
+        crate::tools::poll_once::poll_result_once(fut).expect("ok")
+    }
+}
-- 
2.20.1


* [pbs-devel] [PATCH proxmox-backup v4 1/1] pxar/extract: if possible create files sparsely
From: Dominik Csapak @ 2021-02-12 14:44 UTC
  To: pbs-devel

instead of filling them with zeroes

this fixes an issue where we could not restore a container with large
sparse files in the backup (e.g. a 10GiB sparse file in a container
with an 8GiB disk)

if the last operation of the copy was a seek, we need to truncate
the file to the correct size (seeking beyond the end of a file does
not change its size)
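The truncation requirement can be checked with std alone. This sketch uses `File::set_len`, which on Unix is std's wrapper around ftruncate(2), in place of the `nix::unistd::ftruncate` call used in the patch; the function name and the scratch file name are made up for the demonstration.

```rust
use std::fs::OpenOptions;
use std::io::{self, Seek, SeekFrom, Write};
use std::path::Path;

const HOLE: u64 = 1 << 20; // 1 MiB trailing hole

/// Writes 4 bytes, seeks over a trailing hole, and returns the file size
/// before and after fixing it up with `set_len` (hypothetical helper).
fn trailing_hole_sizes(path: &Path) -> io::Result<(u64, u64)> {
    let mut file = OpenOptions::new().create(true).write(true).truncate(true).open(path)?;
    file.write_all(b"data")?;
    file.seek(SeekFrom::Current(HOLE as i64))?;
    // Seeking past EOF alone does not change the file size.
    let before = file.metadata()?.len();
    // Extending via set_len makes the new range read back as zeros; on
    // filesystems with sparse file support it is typically stored as a hole.
    file.set_len(4 + HOLE)?;
    let after = file.metadata()?.len();
    Ok((before, after))
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("sparse_truncate_demo.bin");
    let (before, after) = trailing_hole_sizes(&path)?;
    println!("before: {}, after: {}", before, after); // before: 4, after: 1048580
    std::fs::remove_file(&path)?;
    Ok(())
}
```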

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 src/pxar/extract.rs | 43 ++++++++++++++++++++++++++++++++++++-------
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/src/pxar/extract.rs b/src/pxar/extract.rs
index 0a61c885..80095e3f 100644
--- a/src/pxar/extract.rs
+++ b/src/pxar/extract.rs
@@ -18,7 +18,10 @@ use pxar::format::Device;
 use pxar::Metadata;
 
 use proxmox::c_result;
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox::tools::{
+    fs::{create_path, CreateOptions},
+    io::{sparse_copy, sparse_copy_async},
+};
 
 use crate::pxar::dir_stack::PxarDirStack;
 use crate::pxar::metadata;
@@ -406,10 +409,23 @@ impl Extractor {
         )
         .map_err(|err| format_err!("failed to apply initial flags: {}", err))?;
 
-        let extracted = io::copy(&mut *contents, &mut file)
+        let result = sparse_copy(&mut *contents, &mut file)
             .map_err(|err| format_err!("failed to copy file contents: {}", err))?;
-        if size != extracted {
-            bail!("extracted {} bytes of a file of {} bytes", extracted, size);
+
+        if size != result.written {
+            bail!(
+                "extracted {} bytes of a file of {} bytes",
+                result.written,
+                size
+            );
+        }
+
+        if result.seeked_last {
+            while match nix::unistd::ftruncate(file.as_raw_fd(), size as i64) {
+                Ok(_) => false,
+                Err(nix::Error::Sys(errno)) if errno == nix::errno::Errno::EINTR => true,
+                Err(err) => bail!("error setting file size: {}", err),
+            } {}
         }
 
         metadata::apply(
@@ -449,11 +465,24 @@ impl Extractor {
         )
         .map_err(|err| format_err!("failed to apply initial flags: {}", err))?;
 
-        let extracted = tokio::io::copy(&mut *contents, &mut file)
+        let result = sparse_copy_async(&mut *contents, &mut file)
             .await
             .map_err(|err| format_err!("failed to copy file contents: {}", err))?;
-        if size != extracted {
-            bail!("extracted {} bytes of a file of {} bytes", extracted, size);
+
+        if size != result.written {
+            bail!(
+                "extracted {} bytes of a file of {} bytes",
+                result.written,
+                size
+            );
+        }
+
+        if result.seeked_last {
+            while match nix::unistd::ftruncate(file.as_raw_fd(), size as i64) {
+                Ok(_) => false,
+                Err(nix::Error::Sys(errno)) if errno == nix::errno::Errno::EINTR => true,
+                Err(err) => bail!("error setting file size: {}", err),
+            } {}
         }
 
         metadata::apply(
-- 
2.20.1


Service provided by Proxmox Server Solutions GmbH