* [pbs-devel] [PATCH proxmox v5 1/2] proxmox: add test/{io, task} modules
2021-02-17 13:13 [pbs-devel] [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely Dominik Csapak
@ 2021-02-17 13:13 ` Dominik Csapak
2021-02-17 13:13 ` [pbs-devel] [PATCH proxmox v5 2/2] proxmox: add sparse_copy(_async) to tools::io Dominik Csapak
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2021-02-17 13:13 UTC
To: pbs-devel
contains:
* AsyncBlocking{Reader,Writer}: dummy async wrappers around a
standard (blocking) reader/writer, for use in tests
* poll_result_once for polling a future once (copied from pxar)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/lib.rs | 3 ++
proxmox/src/test/io.rs | 94 ++++++++++++++++++++++++++++++++++++++++
proxmox/src/test/mod.rs | 2 +
proxmox/src/test/task.rs | 32 ++++++++++++++
4 files changed, 131 insertions(+)
create mode 100644 proxmox/src/test/io.rs
create mode 100644 proxmox/src/test/mod.rs
create mode 100644 proxmox/src/test/task.rs
diff --git a/proxmox/src/lib.rs b/proxmox/src/lib.rs
index b74b399..6e95906 100644
--- a/proxmox/src/lib.rs
+++ b/proxmox/src/lib.rs
@@ -8,6 +8,9 @@ pub mod api;
pub mod sys;
pub mod tools;
+#[cfg(test)]
+pub mod test;
+
/// An identity (nop) macro. Used by the `#[sortable]` proc macro.
#[cfg(feature = "sortable-macro")]
#[macro_export]
diff --git a/proxmox/src/test/io.rs b/proxmox/src/test/io.rs
new file mode 100644
index 0000000..919aac1
--- /dev/null
+++ b/proxmox/src/test/io.rs
@@ -0,0 +1,94 @@
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
+
+pub struct AsyncBlockingReader<R> {
+ inner: R,
+}
+
+impl<W> AsyncBlockingReader<W> {
+ pub fn new(inner: W) -> Self {
+ Self {
+ inner
+ }
+ }
+
+ pub fn inner(&self) -> &W {
+ &self.inner
+ }
+}
+
+pub struct AsyncBlockingWriter<W> {
+ inner: W,
+ seek_pos: u64,
+}
+
+impl<W> AsyncBlockingWriter<W> {
+ pub fn new(inner: W) -> Self {
+ Self {
+ inner,
+ seek_pos: 0,
+ }
+ }
+
+ pub fn inner(&self) -> &W {
+ &self.inner
+ }
+}
+
+impl<R: std::io::Read + Unpin> AsyncRead for AsyncBlockingReader<R> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<std::io::Result<()>> {
+ let this = Pin::get_mut(self);
+ let mut read_buf = buf.initialize_unfilled();
+ match this.inner.read(&mut read_buf) {
+ Ok(len) => {
+ buf.advance(len);
+ Poll::Ready(Ok(()))
+ }
+ Err(err) => Poll::Ready(Err(err)),
+ }
+ }
+}
+
+impl<R: std::io::Write + Unpin> AsyncWrite for AsyncBlockingWriter<R> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::io::Result<usize>> {
+ let this = Pin::get_mut(self);
+ match this.inner.write(buf) {
+ Ok(len) => Poll::Ready(Ok(len)),
+ Err(err) => Poll::Ready(Err(err)),
+ }
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<R: std::io::Seek + Unpin> AsyncSeek for AsyncBlockingWriter<R> {
+ fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
+ let this = Pin::get_mut(self);
+ this.seek_pos = this.inner.seek(position)?;
+ Ok(())
+ }
+
+ fn poll_complete(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<std::io::Result<u64>> {
+ let this = Pin::get_mut(self);
+ Poll::Ready(Ok(this.seek_pos))
+ }
+}
diff --git a/proxmox/src/test/mod.rs b/proxmox/src/test/mod.rs
new file mode 100644
index 0000000..82ac3e2
--- /dev/null
+++ b/proxmox/src/test/mod.rs
@@ -0,0 +1,2 @@
+pub mod io;
+pub mod task;
diff --git a/proxmox/src/test/task.rs b/proxmox/src/test/task.rs
new file mode 100644
index 0000000..4f5eca6
--- /dev/null
+++ b/proxmox/src/test/task.rs
@@ -0,0 +1,32 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pub fn poll_result_once<T, R>(mut fut: T) -> std::io::Result<R>
+where
+ T: Future<Output = std::io::Result<R>>,
+{
+ let waker = std::task::RawWaker::new(std::ptr::null(), &WAKER_VTABLE);
+ let waker = unsafe { std::task::Waker::from_raw(waker) };
+ let mut cx = Context::from_waker(&waker);
+ unsafe {
+ match Pin::new_unchecked(&mut fut).poll(&mut cx) {
+ Poll::Pending => Err(crate::sys::error::io_err_other(
+ "got Poll::Pending in synchronous context",
+ )),
+ Poll::Ready(r) => r,
+ }
+ }
+}
+
+const WAKER_VTABLE: std::task::RawWakerVTable =
+std::task::RawWakerVTable::new(forbid_clone, forbid_wake, forbid_wake, ignore_drop);
+
+unsafe fn forbid_clone(_: *const ()) -> std::task::RawWaker {
+ panic!("tried to clone waker for synchronous task");
+}
+
+unsafe fn forbid_wake(_: *const ()) {
+ panic!("tried to wake synchronous task");
+}
+unsafe fn ignore_drop(_: *const ()) {}
--
2.20.1
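The poll-once idea behind poll_result_once can be tried outside the proxmox crate with the standard library alone. The following is a minimal sketch, not the code from the patch: `NoopWake` and `poll_once` are hypothetical names, and the waker here silently ignores wake-ups, whereas the patch's raw waker panics when it is cloned or woken.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing when woken.
/// (The patch instead builds a raw waker that panics on clone/wake.)
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll `fut` exactly once; return `None` if it is not ready yet.
pub fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // Boxing pins the future on the heap, so no `unsafe` is needed.
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}
```

This only works for futures that finish on their first poll, which is the case for async code built purely on wrappers like AsyncBlocking{Reader,Writer} that always return Poll::Ready.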
* [pbs-devel] [PATCH proxmox v5 2/2] proxmox: add sparse_copy(_async) to tools::io
2021-02-17 13:13 [pbs-devel] [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely Dominik Csapak
2021-02-17 13:13 ` [pbs-devel] [PATCH proxmox v5 1/2] proxmox: add test/{io, task} modules Dominik Csapak
@ 2021-02-17 13:13 ` Dominik Csapak
2021-02-17 13:13 ` [pbs-devel] [PATCH proxmox-backup v5 1/1] pxar/extract: if possible create files sparsely Dominik Csapak
2021-02-23 14:08 ` [pbs-devel] applied: [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely Wolfgang Bumiller
3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2021-02-17 13:13 UTC
To: pbs-devel
sparse_copy(_async) seeks in the target instead of writing zeroes,
which creates sparse files where the filesystem supports them
also add tests for it
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/io/mod.rs | 243 ++++++++++++++++++++++++++++++++++++
1 file changed, 243 insertions(+)
diff --git a/proxmox/src/tools/io/mod.rs b/proxmox/src/tools/io/mod.rs
index 2e92ebb..c23e0f4 100644
--- a/proxmox/src/tools/io/mod.rs
+++ b/proxmox/src/tools/io/mod.rs
@@ -3,8 +3,251 @@
//! The [`ReadExt`] trait provides additional operations for handling byte buffers for types
//! implementing [`Read`](std::io::Read).
+use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
+
mod read;
pub use read::*;
mod write;
pub use write::*;
+
+fn buffer_is_zero(buf: &[u8]) -> bool {
+ !buf.chunks(128)
+ .map(|aa| aa.iter().fold(0, |a, b| a | b) != 0)
+ .any(|a| a)
+}
+
+/// Result of a sparse copy call
+/// contains the amount of written/seeked bytes
+/// and if the last operation was a seek
+pub struct SparseCopyResult {
+ pub written: u64,
+ pub seeked_last: bool,
+}
+
+/// copy similar to io::copy, but seeks the target when encountering
+/// zero bytes instead of writing them
+///
+/// Example use:
+/// ```
+/// # use std::io;
+/// # use proxmox::tools::io::sparse_copy;
+/// fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+/// R: io::Read,
+/// W: io::Write + io::Seek,
+/// {
+/// let res = sparse_copy(&mut reader, &mut writer)?;
+///
+/// println!("last part was seeked: {}", res.seeked_last);
+/// println!("written: {}", res.written);
+///
+/// Ok(())
+/// }
+/// ```
+pub fn sparse_copy<R: Read + ?Sized, W: Write + Seek + ?Sized>(
+ reader: &mut R,
+ writer: &mut W,
+) -> Result<SparseCopyResult, io::Error> {
+ let mut buf = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+ let mut written = 0;
+ let mut seek_amount: i64 = 0;
+ let mut seeked_last = false;
+ loop {
+ buf.clear();
+ let len = match buf.read_from(reader) {
+ Ok(len) => len,
+ Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
+ Err(e) => return Err(e),
+ };
+
+ if len > 0 && buffer_is_zero(&buf[..]) {
+ seek_amount += len as i64;
+ continue;
+ }
+
+ if seek_amount > 0 {
+ writer.seek(SeekFrom::Current(seek_amount))?;
+ written += seek_amount as u64;
+ seek_amount = 0;
+ seeked_last = true;
+ }
+
+ if len > 0 {
+ writer.write_all(&buf[..])?;
+ written += len as u64;
+ seeked_last = false;
+ } else {
+ return Ok(SparseCopyResult {
+ written,
+ seeked_last,
+ });
+ }
+ }
+}
+
+#[cfg(feature = "tokio")]
+use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
+
+#[cfg(feature = "tokio")]
+/// copy similar to tokio::io::copy, but seeks the target when encountering
+/// zero bytes instead of writing them
+///
+/// Example:
+/// ```no_run
+/// # use std::io;
+/// # use tokio::io::{AsyncRead, AsyncWrite, AsyncSeek};
+/// # use proxmox::tools::io::sparse_copy_async;
+/// async fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+/// R: AsyncRead + Unpin,
+/// W: AsyncWrite + AsyncSeek + Unpin,
+/// {
+/// let res = sparse_copy_async(&mut reader, &mut writer).await?;
+///
+/// println!("last part was seeked: {}", res.seeked_last);
+/// println!("written: {}", res.written);
+///
+/// Ok(())
+/// }
+/// ```
+pub async fn sparse_copy_async<R, W>(
+ reader: &mut R,
+ writer: &mut W,
+) -> Result<SparseCopyResult, io::Error>
+where
+ R: AsyncRead + Unpin,
+ W: AsyncWrite + AsyncSeek + Unpin,
+{
+ let mut buf = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+ let mut written = 0;
+ let mut seek_amount: i64 = 0;
+ let mut seeked_last = false;
+ loop {
+ buf.clear();
+ let len = match buf.read_from_async(reader).await {
+ Ok(len) => len,
+ Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
+ Err(e) => return Err(e),
+ };
+
+ if len > 0 && buffer_is_zero(&buf[..]) {
+ seek_amount += len as i64;
+ continue;
+ }
+
+ if seek_amount > 0 {
+ writer.seek(SeekFrom::Current(seek_amount)).await?;
+ written += seek_amount as u64;
+ seek_amount = 0;
+ seeked_last = true;
+ }
+
+ if len > 0 {
+ writer.write_all(&buf[..]).await?;
+ written += len as u64;
+ seeked_last = false;
+ } else {
+ return Ok(SparseCopyResult {
+ written,
+ seeked_last,
+ });
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::io::Cursor;
+
+ use crate::tools::io::{sparse_copy, sparse_copy_async};
+ use crate::test::io::{AsyncBlockingReader, AsyncBlockingWriter};
+
+ const LEN: usize = 10000;
+
+ #[test]
+ fn test_sparse_copy() {
+ // test sparse
+ let mut test_data = Vec::new();
+ for _ in 0..LEN / 2 {
+ test_data.push(1u8);
+ }
+ for _ in 0..LEN / 2 {
+ test_data.push(0u8);
+ }
+ let mut test_data = Cursor::new(test_data);
+ let mut result_data = Cursor::new(vec![0; LEN]);
+
+ let result =
+ sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
+ assert_eq!(result.written, LEN as u64);
+ assert_eq!(result.seeked_last, true);
+ for i in 0..LEN {
+ if i < LEN / 2 {
+ assert_eq!(result_data.get_ref()[i], 1);
+ } else {
+ assert_eq!(result_data.get_ref()[i], 0);
+ }
+ }
+
+ // test non sparse
+ let mut test_data = Cursor::new(vec![1; LEN]);
+ let mut result_data = Cursor::new(vec![0; LEN]);
+
+ let result =
+ sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
+ assert_eq!(result.written, LEN as u64);
+ assert_eq!(result.seeked_last, false);
+ for i in 0..LEN {
+ assert_eq!(result_data.get_ref()[i], 1);
+ }
+ }
+
+
+ #[test]
+ fn test_sparse_copy_async() {
+ let fut = async {
+ // test sparse
+ let mut test_data = Vec::new();
+ for _ in 0..LEN / 2 {
+ test_data.push(1u8);
+ }
+ for _ in 0..LEN / 2 {
+ test_data.push(0u8);
+ }
+ let mut test_data = AsyncBlockingReader::new(Cursor::new(test_data));
+ let mut result_data = AsyncBlockingWriter::new(Cursor::new(vec![0; LEN]));
+
+ let result = sparse_copy_async(&mut test_data, &mut result_data)
+ .await
+ .expect("error during sparse copy");
+
+ assert_eq!(result.written, LEN as u64);
+ assert_eq!(result.seeked_last, true);
+ for i in 0..LEN {
+ if i < LEN / 2 {
+ assert_eq!(result_data.inner().get_ref()[i], 1);
+ } else {
+ assert_eq!(result_data.inner().get_ref()[i], 0);
+ }
+ }
+
+ // test non sparse
+ let mut test_data = AsyncBlockingReader::new(Cursor::new(vec![1; LEN]));
+ let mut result_data = AsyncBlockingWriter::new(Cursor::new(vec![0; LEN]));
+
+ let result = sparse_copy_async(&mut test_data, &mut result_data)
+ .await
+ .expect("error during sparse copy");
+
+ assert_eq!(result.written, LEN as u64);
+ assert_eq!(result.seeked_last, false);
+ for i in 0..LEN {
+ assert_eq!(result_data.inner().get_ref()[i], 1);
+ }
+ Ok(())
+ };
+
+ crate::test::task::poll_result_once(fut).expect("ok")
+ }
+}
--
2.20.1
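The hole-collecting loop from this patch can be approximated with std only, which makes it easy to experiment with. This is a sketch under assumptions, not the applied code: `CopyStats` and `sparse_copy_sketch` are hypothetical names, a plain stack array replaces the crate's ByteBuffer, and zero detection uses a simple `all()` check instead of buffer_is_zero.

```rust
use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};

/// Hypothetical stand-in for the patch's `SparseCopyResult`.
#[derive(Debug, PartialEq)]
pub struct CopyStats {
    pub written: u64,      // bytes written plus bytes skipped by seeking
    pub seeked_last: bool, // true if the final operation was a seek
}

pub fn sparse_copy_sketch<R: Read, W: Write + Seek>(
    reader: &mut R,
    writer: &mut W,
) -> io::Result<CopyStats> {
    let mut buf = [0u8; 4096];
    let mut written = 0u64;
    let mut pending_hole = 0i64; // consecutive zero bytes not yet skipped
    let mut seeked_last = false;
    loop {
        let len = match reader.read(&mut buf) {
            Ok(len) => len,
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        let chunk = &buf[..len];
        // All-zero chunk: only remember its size, so that adjacent
        // zero chunks collapse into a single seek.
        if len > 0 && chunk.iter().all(|&b| b == 0) {
            pending_hole += len as i64;
            continue;
        }
        // Data (or EOF) follows a hole: skip the hole with one seek.
        if pending_hole > 0 {
            writer.seek(SeekFrom::Current(pending_hole))?;
            written += pending_hole as u64;
            pending_hole = 0;
            seeked_last = true;
        }
        if len == 0 {
            return Ok(CopyStats { written, seeked_last });
        }
        writer.write_all(chunk)?;
        written += len as u64;
        seeked_last = false;
    }
}
```

Copying 5000 one-bytes followed by 5000 zero-bytes into an empty `Cursor<Vec<u8>>` reports 10000 bytes as written with `seeked_last` set, yet the destination vector only grows to 8192 bytes, because the trailing seek does not extend it. That gap is why the caller needs the `seeked_last` flag.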
* [pbs-devel] [PATCH proxmox-backup v5 1/1] pxar/extract: if possible create files sparsely
2021-02-17 13:13 [pbs-devel] [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely Dominik Csapak
2021-02-17 13:13 ` [pbs-devel] [PATCH proxmox v5 1/2] proxmox: add test/{io, task} modules Dominik Csapak
2021-02-17 13:13 ` [pbs-devel] [PATCH proxmox v5 2/2] proxmox: add sparse_copy(_async) to tools::io Dominik Csapak
@ 2021-02-17 13:13 ` Dominik Csapak
2021-02-23 14:08 ` [pbs-devel] applied: [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely Wolfgang Bumiller
3 siblings, 0 replies; 5+ messages in thread
From: Dominik Csapak @ 2021-02-17 13:13 UTC
To: pbs-devel
instead of filling them with zeroes
this fixes an issue where we could not restore a container with large
sparse files in the backup (e.g. a 10GiB sparse file in a container
with an 8GiB disk)
if the last operation of the copy was a seek, we need to truncate
the file to the correct size (seeking beyond the end of the file does
not change its size)
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
src/pxar/extract.rs | 43 ++++++++++++++++++++++++++++++++++++-------
1 file changed, 36 insertions(+), 7 deletions(-)
diff --git a/src/pxar/extract.rs b/src/pxar/extract.rs
index b673b4b8..952e2d20 100644
--- a/src/pxar/extract.rs
+++ b/src/pxar/extract.rs
@@ -21,7 +21,10 @@ use pxar::Metadata;
use pxar::accessor::aio::{Accessor, FileContents, FileEntry};
use proxmox::c_result;
-use proxmox::tools::fs::{create_path, CreateOptions};
+use proxmox::tools::{
+ fs::{create_path, CreateOptions},
+ io::{sparse_copy, sparse_copy_async},
+};
use crate::pxar::dir_stack::PxarDirStack;
use crate::pxar::metadata;
@@ -411,10 +414,23 @@ impl Extractor {
)
.map_err(|err| format_err!("failed to apply initial flags: {}", err))?;
- let extracted = io::copy(&mut *contents, &mut file)
+ let result = sparse_copy(&mut *contents, &mut file)
.map_err(|err| format_err!("failed to copy file contents: {}", err))?;
- if size != extracted {
- bail!("extracted {} bytes of a file of {} bytes", extracted, size);
+
+ if size != result.written {
+ bail!(
+ "extracted {} bytes of a file of {} bytes",
+ result.written,
+ size
+ );
+ }
+
+ if result.seeked_last {
+ while match nix::unistd::ftruncate(file.as_raw_fd(), size as i64) {
+ Ok(_) => false,
+ Err(nix::Error::Sys(errno)) if errno == nix::errno::Errno::EINTR => true,
+ Err(err) => bail!("error setting file size: {}", err),
+ } {}
}
metadata::apply(
@@ -454,11 +470,24 @@ impl Extractor {
)
.map_err(|err| format_err!("failed to apply initial flags: {}", err))?;
- let extracted = tokio::io::copy(&mut *contents, &mut file)
+ let result = sparse_copy_async(&mut *contents, &mut file)
.await
.map_err(|err| format_err!("failed to copy file contents: {}", err))?;
- if size != extracted {
- bail!("extracted {} bytes of a file of {} bytes", extracted, size);
+
+ if size != result.written {
+ bail!(
+ "extracted {} bytes of a file of {} bytes",
+ result.written,
+ size
+ );
+ }
+
+ if result.seeked_last {
+ while match nix::unistd::ftruncate(file.as_raw_fd(), size as i64) {
+ Ok(_) => false,
+ Err(nix::Error::Sys(errno)) if errno == nix::errno::Errno::EINTR => true,
+ Err(err) => bail!("error setting file size: {}", err),
+ } {}
}
metadata::apply(
--
2.20.1
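The reason for the truncate step in this patch (a seek past the end does not change the file size) can be observed with a small std-only experiment. This is a sketch under assumptions: `len_before_and_after_truncate` is a hypothetical helper, and it uses `File::set_len` from the standard library in place of the `nix::unistd::ftruncate` call in the patch.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Seek, SeekFrom, Write};
use std::path::Path;

/// Write `data`, seek `hole` bytes past it, and return the file length
/// before and after explicitly setting the size to the seek position.
/// The file at `path` is created and removed again by this function.
pub fn len_before_and_after_truncate(
    path: &Path,
    data: &[u8],
    hole: u64,
) -> io::Result<(u64, u64)> {
    let mut file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(path)?;
    file.write_all(data)?;
    // Moving the cursor past the end does not grow the file by itself.
    let end = file.seek(SeekFrom::Current(hole as i64))?;
    let before = file.metadata()?.len();
    // Only an explicit resize turns the skipped range into a trailing hole.
    file.set_len(end)?;
    let after = file.metadata()?.len();
    drop(file);
    fs::remove_file(path)?;
    Ok((before, after))
}
```

With 3 bytes of data and a 1 MiB hole, the length before the resize is still 3 and only afterwards covers the hole, which matches the `seeked_last` check in the extractor.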
* [pbs-devel] applied: [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely
2021-02-17 13:13 [pbs-devel] [PATCH proxmox/proxmox-backup v5] restore files from pxar sparsely Dominik Csapak
` (2 preceding siblings ...)
2021-02-17 13:13 ` [pbs-devel] [PATCH proxmox-backup v5 1/1] pxar/extract: if possible create files sparsely Dominik Csapak
@ 2021-02-23 14:08 ` Wolfgang Bumiller
3 siblings, 0 replies; 5+ messages in thread
From: Wolfgang Bumiller @ 2021-02-23 14:08 UTC
To: Dominik Csapak; +Cc: pbs-devel
applied all patches, bumped proxmox crate
On Wed, Feb 17, 2021 at 02:13:19PM +0100, Dominik Csapak wrote:
> to be able to restore containers with big sparse files
>
> ideally we would save hole information directly in the pxar archive
> and not even use zero chunks, so that we can have smaller
> pxar archives, and accurately restore sparse files like they were before
>
> for now, restore all files sparsely
>
> changes from v4:
> * moved proxmox test code into a toplevel testmodule that is behind
> a #[cfg(test)] and adapted the tests
>
> changes from v3:
> * collect subsequent holes, so that we only seek one time per hole
> * add tests for sparse_copy(_async)
> * also return if the last operation was a seek and only truncate then
> * create buffer with explicit 4096 bytes
> * use rustfmt
> * adds a patch to add poll_once to proxmox module (for async testing)
>
> changes from v2:
> * always sparse copy and truncate after
>
> changes from RFC:
> * drop the zero module of proxmox, rust can generate fast code by itself
>
> proxmox:
>
> Dominik Csapak (2):
> proxmox: add test/{io,task} modules
> proxmox: add sparse_copy(_async) to tools::io
>
> proxmox/src/lib.rs | 3 +
> proxmox/src/test/io.rs | 94 ++++++++++++++
> proxmox/src/test/mod.rs | 2 +
> proxmox/src/test/task.rs | 32 +++++
> proxmox/src/tools/io/mod.rs | 243 ++++++++++++++++++++++++++++++++++++
> 5 files changed, 374 insertions(+)
> create mode 100644 proxmox/src/test/io.rs
> create mode 100644 proxmox/src/test/mod.rs
> create mode 100644 proxmox/src/test/task.rs
>
> proxmox-backup:
>
> Dominik Csapak (1):
> pxar/extract: if possible create files sparesly
>
> src/pxar/extract.rs | 43 ++++++++++++++++++++++++++++++++++++-------
> 1 file changed, 36 insertions(+), 7 deletions(-)
>
> --
> 2.20.1