From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox v4 2/2] proxmox: add sparse_copy(_async) to tools::io
Date: Fri, 12 Feb 2021 15:44:32 +0100 [thread overview]
Message-ID: <20210212144433.30668-3-d.csapak@proxmox.com> (raw)
In-Reply-To: <20210212144433.30668-1-d.csapak@proxmox.com>
Add sparse_copy and sparse_copy_async, which seek the target instead of
writing zeroes and thereby generate sparse files where supported.
Also add tests for them.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
proxmox/src/tools/io/mod.rs | 320 ++++++++++++++++++++++++++++++++++++
1 file changed, 320 insertions(+)
diff --git a/proxmox/src/tools/io/mod.rs b/proxmox/src/tools/io/mod.rs
index 2e92ebb..b935921 100644
--- a/proxmox/src/tools/io/mod.rs
+++ b/proxmox/src/tools/io/mod.rs
@@ -3,8 +3,328 @@
//! The [`ReadExt`] trait provides additional operations for handling byte buffers for types
//! implementing [`Read`](std::io::Read).
+use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
+
mod read;
pub use read::*;
mod write;
pub use write::*;
+
/// Returns `true` if every byte of `buf` is zero (trivially `true` for an empty slice).
///
/// The bytes are OR-reduced per 128-byte chunk, and the scan stops at the first
/// chunk that contains a non-zero byte. (The chunking is presumably there so the
/// inner reduction can be vectorized — NOTE(review): confirm intent.)
fn buffer_is_zero(buf: &[u8]) -> bool {
    buf.chunks(128)
        .all(|chunk| chunk.iter().fold(0u8, |acc, &byte| acc | byte) == 0)
}
+
/// Result of a `sparse_copy` / `sparse_copy_async` call.
///
/// Reports how far the target was advanced, whether by writing data or by
/// seeking over zero-filled regions, and whether the copy ended with a seek.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SparseCopyResult {
    /// Total number of bytes the target position was advanced, including the
    /// bytes skipped by seeking over zero-filled regions.
    pub written: u64,
    /// `true` if the final operation on the target was a seek, i.e. the input
    /// ended with zero bytes. A seek alone may not extend the target to its
    /// full length, so callers may need to do that explicitly.
    pub seeked_last: bool,
}
+
+/// copy similar to io::copy, but seeks the target when encountering
+/// zero bytes instead of writing them
+///
+/// Example use:
+/// ```
+/// # use std::io;
+/// # use proxmox::tools::io::sparse_copy;
+/// fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+/// R: io::Read,
+/// W: io::Write + io::Seek,
+/// {
+/// let res = sparse_copy(&mut reader, &mut writer)?;
+///
+/// println!("last part was seeked: {}", res.seeked_last);
+/// println!("written: {}", res.written);
+///
+/// Ok(())
+/// }
+/// ```
+pub fn sparse_copy<R: Read + ?Sized, W: Write + Seek + ?Sized>(
+ reader: &mut R,
+ writer: &mut W,
+) -> Result<SparseCopyResult, io::Error> {
+ let mut buf = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+ let mut written = 0;
+ let mut seek_amount: i64 = 0;
+ let mut seeked_last = false;
+ loop {
+ buf.clear();
+ let len = match buf.read_from(reader) {
+ Ok(len) => len,
+ Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
+ Err(e) => return Err(e),
+ };
+
+ if len > 0 && buffer_is_zero(&buf[..]) {
+ seek_amount += len as i64;
+ continue;
+ }
+
+ if seek_amount > 0 {
+ writer.seek(SeekFrom::Current(seek_amount))?;
+ written += seek_amount as u64;
+ seek_amount = 0;
+ seeked_last = true;
+ }
+
+ if len > 0 {
+ writer.write_all(&buf[..])?;
+ written += len as u64;
+ seeked_last = false;
+ } else {
+ return Ok(SparseCopyResult {
+ written,
+ seeked_last,
+ });
+ }
+ }
+}
+
+#[cfg(feature = "tokio")]
+use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
+
/// Asynchronous counterpart of [`sparse_copy`]: copies all bytes from
/// `reader` to `writer` like `tokio::io::copy`, but seeks forward in `writer`
/// over zero-filled chunks instead of writing them, which generates sparse
/// files where the target supports it.
///
/// Input is read through a 4096-byte buffer. A chunk returned by a single read
/// is skipped only if every byte in it is zero, and consecutive skipped chunks
/// are merged into a single seek. Reads failing with
/// [`ErrorKind::Interrupted`] are retried; any other error is returned
/// immediately.
///
/// Like `tokio::io::copy`, the writer is flushed once the end of the input is
/// reached. Data still buffered in the writer is therefore pushed out, and any
/// error doing so is reported, before this function returns.
///
/// If the input ends with zero bytes, the last operation is a seek. This is
/// reported via [`SparseCopyResult::seeked_last`], because the target may then
/// need to be extended explicitly by the caller.
///
/// Example:
/// ```no_run
/// # use std::io;
/// # use tokio::io::{AsyncRead, AsyncWrite, AsyncSeek};
/// # use proxmox::tools::io::sparse_copy_async;
/// async fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
/// where
///     R: AsyncRead + Unpin,
///     W: AsyncWrite + AsyncSeek + Unpin,
/// {
///     let res = sparse_copy_async(&mut reader, &mut writer).await?;
///
///     println!("last part was seeked: {}", res.seeked_last);
///     println!("written: {}", res.written);
///
///     Ok(())
/// }
/// ```
#[cfg(feature = "tokio")]
pub async fn sparse_copy_async<R, W>(
    reader: &mut R,
    writer: &mut W,
) -> Result<SparseCopyResult, io::Error>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + AsyncSeek + Unpin,
{
    let mut buf = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
    let mut written = 0u64;
    let mut seek_amount: i64 = 0;
    let mut seeked_last = false;
    loop {
        buf.clear();
        let len = match buf.read_from_async(reader).await {
            Ok(len) => len,
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };

        // Defer all-zero chunks; they are coalesced into a single seek below.
        if len > 0 && buffer_is_zero(&buf[..]) {
            seek_amount += len as i64;
            continue;
        }

        if seek_amount > 0 {
            writer.seek(SeekFrom::Current(seek_amount)).await?;
            written += seek_amount as u64;
            seek_amount = 0;
            seeked_last = true;
        }

        if len > 0 {
            writer.write_all(&buf[..]).await?;
            written += len as u64;
            seeked_last = false;
        } else {
            // End of input: flush like tokio::io::copy does, so pending writes
            // complete and their errors reach the caller instead of being lost.
            writer.flush().await?;
            return Ok(SparseCopyResult {
                written,
                seeked_last,
            });
        }
    }
}
+
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use crate::tools::io::sparse_copy;

    const LEN: usize = 10000;

    /// Test input whose first half consists of `1` bytes and whose second half is zeroes.
    fn half_sparse_data() -> Vec<u8> {
        let mut data = vec![1u8; LEN / 2];
        data.resize(LEN, 0);
        data
    }

    #[test]
    fn test_sparse_copy() {
        // test sparse
        let expected = half_sparse_data();
        let mut test_data = Cursor::new(expected.clone());
        let mut result_data = Cursor::new(vec![0; LEN]);

        let result =
            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
        assert_eq!(result.written, LEN as u64);
        assert!(result.seeked_last);
        assert_eq!(result_data.position(), LEN as u64);
        assert_eq!(result_data.get_ref(), &expected);

        // test non sparse
        let mut test_data = Cursor::new(vec![1; LEN]);
        let mut result_data = Cursor::new(vec![0; LEN]);

        let result =
            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
        assert_eq!(result.written, LEN as u64);
        assert!(!result.seeked_last);
        assert_eq!(result_data.position(), LEN as u64);
        assert_eq!(result_data.get_ref(), &vec![1u8; LEN]);
    }

    #[test]
    fn test_sparse_copy_trailing_hole() {
        // With an empty target the trailing zeroes are only seeked over, so the
        // target is not extended to LEN; `seeked_last` signals this to callers.
        let mut test_data = Cursor::new(half_sparse_data());
        let mut result_data = Cursor::new(Vec::new());

        let result =
            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
        assert_eq!(result.written, LEN as u64);
        assert!(result.seeked_last);
        assert_eq!(result_data.position(), LEN as u64);
        assert_eq!(result_data.get_ref(), &vec![1u8; LEN / 2]);
    }

    /// Async tests need tokio and `sparse_copy_async`, which only exist with the
    /// `tokio` feature enabled.
    #[cfg(feature = "tokio")]
    mod async_test {
        use std::io::Cursor;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};

        use super::{half_sparse_data, LEN};
        use crate::tools::io::sparse_copy_async;

        /// Exposes a blocking `Read` as `AsyncRead`; every poll completes immediately.
        struct DummyAsyncReader<R> {
            inner: R,
        }

        /// Exposes a blocking `Write + Seek` as `AsyncWrite + AsyncSeek`;
        /// every poll completes immediately.
        struct DummyAsyncWriter<W> {
            inner: W,
            seek_pos: u64,
        }

        impl<R: std::io::Read + Unpin> AsyncRead for DummyAsyncReader<R> {
            fn poll_read(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                buf: &mut ReadBuf<'_>,
            ) -> Poll<std::io::Result<()>> {
                let this = Pin::get_mut(self);
                let result = this.inner.read(buf.initialize_unfilled());
                Poll::Ready(result.map(|len| buf.advance(len)))
            }
        }

        impl<W: std::io::Write + Unpin> AsyncWrite for DummyAsyncWriter<W> {
            fn poll_write(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
                buf: &[u8],
            ) -> Poll<std::io::Result<usize>> {
                Poll::Ready(Pin::get_mut(self).inner.write(buf))
            }

            fn poll_flush(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Pin::get_mut(self).inner.flush())
            }

            fn poll_shutdown(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<std::io::Result<()>> {
                Poll::Ready(Ok(()))
            }
        }

        impl<W: std::io::Seek + Unpin> AsyncSeek for DummyAsyncWriter<W> {
            fn start_seek(
                self: Pin<&mut Self>,
                position: std::io::SeekFrom,
            ) -> std::io::Result<()> {
                let this = Pin::get_mut(self);
                this.seek_pos = this.inner.seek(position)?;
                Ok(())
            }

            fn poll_complete(
                self: Pin<&mut Self>,
                _cx: &mut Context<'_>,
            ) -> Poll<std::io::Result<u64>> {
                Poll::Ready(Ok(Pin::get_mut(self).seek_pos))
            }
        }

        #[test]
        fn test_sparse_copy_async() {
            let fut = async {
                // test sparse
                let expected = half_sparse_data();
                let mut test_data = DummyAsyncReader {
                    inner: Cursor::new(expected.clone()),
                };
                let mut result_data = DummyAsyncWriter {
                    inner: Cursor::new(vec![0; LEN]),
                    seek_pos: 0,
                };

                let result = sparse_copy_async(&mut test_data, &mut result_data)
                    .await
                    .expect("error during sparse copy");

                assert_eq!(result.written, LEN as u64);
                assert!(result.seeked_last);
                assert_eq!(result_data.inner.position(), LEN as u64);
                assert_eq!(result_data.inner.get_ref(), &expected);

                // test non sparse
                let mut test_data = DummyAsyncReader {
                    inner: Cursor::new(vec![1; LEN]),
                };
                let mut result_data = DummyAsyncWriter {
                    inner: Cursor::new(vec![0; LEN]),
                    seek_pos: 0,
                };

                let result = sparse_copy_async(&mut test_data, &mut result_data)
                    .await
                    .expect("error during sparse copy");

                assert_eq!(result.written, LEN as u64);
                assert!(!result.seeked_last);
                assert_eq!(result_data.inner.get_ref(), &vec![1u8; LEN]);
                Ok(())
            };

            crate::tools::poll_once::poll_result_once(fut).expect("ok")
        }
    }
}
--
2.20.1
next prev parent reply other threads:[~2021-02-12 14:44 UTC|newest]
Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-02-12 14:44 [pbs-devel] [PATCH proxmox v4 0/2] restore files from pxar sparsely Dominik Csapak
2021-02-12 14:44 ` [pbs-devel] [PATCH proxmox v4 1/2] proxmox/tools: add poll_once module for testing Dominik Csapak
2021-02-12 14:44 ` Dominik Csapak [this message]
2021-02-12 14:44 ` [pbs-devel] [PATCH proxmox-backup v4 1/1] pxar/extract: if possible create files sparesly Dominik Csapak
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210212144433.30668-3-d.csapak@proxmox.com \
--to=d.csapak@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal