From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <d.csapak@proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by lists.proxmox.com (Postfix) with ESMTPS id 36A306AEF2
 for <pbs-devel@lists.proxmox.com>; Wed, 17 Feb 2021 14:13:56 +0100 (CET)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
 by firstgate.proxmox.com (Proxmox) with ESMTP id 2BD712249E
 for <pbs-devel@lists.proxmox.com>; Wed, 17 Feb 2021 14:13:26 +0100 (CET)
Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com
 [212.186.127.180])
 (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits)
 key-exchange X25519 server-signature RSA-PSS (2048 bits))
 (No client certificate requested)
 by firstgate.proxmox.com (Proxmox) with ESMTPS id 1873B2247F
 for <pbs-devel@lists.proxmox.com>; Wed, 17 Feb 2021 14:13:24 +0100 (CET)
Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1])
 by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D6B5F461E1
 for <pbs-devel@lists.proxmox.com>; Wed, 17 Feb 2021 14:13:23 +0100 (CET)
From: Dominik Csapak <d.csapak@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 17 Feb 2021 14:13:21 +0100
Message-Id: <20210217131322.9129-3-d.csapak@proxmox.com>
X-Mailer: git-send-email 2.20.1
In-Reply-To: <20210217131322.9129-1-d.csapak@proxmox.com>
References: <20210217131322.9129-1-d.csapak@proxmox.com>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.219 Adjusted score from AWL reputation of From: address
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 RCVD_IN_DNSWL_MED        -2.3 Sender listed at https://www.dnswl.org/,
 medium trust
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
 URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See
 http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more
 information. [mod.rs]
Subject: [pbs-devel] [PATCH proxmox v5 2/2] proxmox: add sparse_copy(_async)
 to tools::io
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
X-List-Received-Date: Wed, 17 Feb 2021 13:13:56 -0000

sparse_copy(_async) works like io::copy, but seeks the target instead of
writing runs of zero bytes, which generates sparse files where supported.

Also add tests for both variants.

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
 proxmox/src/tools/io/mod.rs | 243 ++++++++++++++++++++++++++++++++++++
 1 file changed, 243 insertions(+)

diff --git a/proxmox/src/tools/io/mod.rs b/proxmox/src/tools/io/mod.rs
index 2e92ebb..c23e0f4 100644
--- a/proxmox/src/tools/io/mod.rs
+++ b/proxmox/src/tools/io/mod.rs
@@ -3,8 +3,251 @@
 //! The [`ReadExt`] trait provides additional operations for handling byte buffers for types
 //! implementing [`Read`](std::io::Read).
 
+use std::io::{self, ErrorKind, Read, Seek, SeekFrom, Write};
+
 mod read;
 pub use read::*;
 
 mod write;
 pub use write::*;
+
+/// Returns `true` if `buf` contains only zero bytes (also for an empty slice).
+fn buffer_is_zero(buf: &[u8]) -> bool {
+    // OR-fold each 128-byte chunk (branch-free inner loop); stop at the first non-zero chunk
+    buf.chunks(128).all(|chunk| chunk.iter().fold(0u8, |acc, &b| acc | b) == 0)
+}
+
+/// Result of a sparse copy: `written` counts bytes written or seeked over; `seeked_last`
+/// is set when the copy ended by seeking over zeroes (the target may still need extending).
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct SparseCopyResult {
+    pub written: u64,
+    pub seeked_last: bool,
+}
+
+/// Like [`std::io::copy`], but instead of writing runs of zero bytes this seeks
+/// `writer` forward, producing a sparse file where the target supports it.
+///
+/// # Examples
+/// ```
+/// # use std::io;
+/// # use proxmox::tools::io::sparse_copy;
+/// fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+///     R: io::Read,
+///     W: io::Write + io::Seek,
+/// {
+///     let res = sparse_copy(&mut reader, &mut writer)?;
+///
+///     println!("last part was seeked: {}", res.seeked_last);
+///     println!("written: {}", res.written);
+///
+///     Ok(())
+/// }
+/// ```
+pub fn sparse_copy<R: Read + ?Sized, W: Write + Seek + ?Sized>(
+    reader: &mut R,
+    writer: &mut W,
+) -> Result<SparseCopyResult, io::Error> {
+    let mut buffer = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+    let mut total: u64 = 0;
+    let mut pending_seek: i64 = 0; // zero bytes read but not yet skipped in `writer`
+    let mut last_was_seek = false;
+    loop {
+        buffer.clear();
+        let count = match buffer.read_from(reader) {
+            Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
+            Err(err) => return Err(err),
+            Ok(count) => count,
+        };
+        let at_eof = count == 0;
+        if !at_eof && buffer_is_zero(&buffer[..]) {
+            pending_seek += count as i64;
+            continue;
+        }
+
+        // data (or EOF) ends a run of zeroes: skip over the whole run with one seek
+        if pending_seek > 0 {
+            writer.seek(SeekFrom::Current(pending_seek))?;
+            total += pending_seek as u64;
+            pending_seek = 0;
+            last_was_seek = true;
+        }
+
+        if at_eof {
+            return Ok(SparseCopyResult {
+                written: total,
+                seeked_last: last_was_seek,
+            });
+        }
+        writer.write_all(&buffer[..])?;
+        total += count as u64;
+        last_was_seek = false;
+    }
+}
+
+#[cfg(feature = "tokio")]
+use tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt};
+
+#[cfg(feature = "tokio")]
+/// Async counterpart of [`sparse_copy`]: like `tokio::io::copy`, but runs of zero
+/// bytes are skipped by seeking `writer` forward instead of being written.
+///
+/// # Examples
+/// ```no_run
+/// # use std::io;
+/// # use tokio::io::{AsyncRead, AsyncWrite, AsyncSeek};
+/// # use proxmox::tools::io::sparse_copy_async;
+/// async fn code<R, W>(mut reader: R, mut writer: W) -> io::Result<()>
+/// where
+///     R: AsyncRead + Unpin,
+///     W: AsyncWrite + AsyncSeek + Unpin,
+/// {
+///     let res = sparse_copy_async(&mut reader, &mut writer).await?;
+///
+///     println!("last part was seeked: {}", res.seeked_last);
+///     println!("written: {}", res.written);
+///
+///     Ok(())
+/// }
+/// ```
+pub async fn sparse_copy_async<R, W>(
+    reader: &mut R,
+    writer: &mut W,
+) -> Result<SparseCopyResult, io::Error>
+where
+    R: AsyncRead + Unpin,
+    W: AsyncWrite + AsyncSeek + Unpin,
+{
+    let mut buffer = crate::tools::byte_buffer::ByteBuffer::with_capacity(4096);
+    let mut total: u64 = 0;
+    let mut pending_seek: i64 = 0; // zero bytes read but not yet skipped in `writer`
+    let mut last_was_seek = false;
+    loop {
+        buffer.clear();
+        let count = match buffer.read_from_async(reader).await {
+            Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
+            Err(err) => return Err(err),
+            Ok(count) => count,
+        };
+        let at_eof = count == 0;
+        if !at_eof && buffer_is_zero(&buffer[..]) {
+            pending_seek += count as i64;
+            continue;
+        }
+
+        // data (or EOF) ends a run of zeroes: skip over the whole run with one seek
+        if pending_seek > 0 {
+            writer.seek(SeekFrom::Current(pending_seek)).await?;
+            total += pending_seek as u64;
+            pending_seek = 0;
+            last_was_seek = true;
+        }
+
+        if at_eof {
+            return Ok(SparseCopyResult {
+                written: total,
+                seeked_last: last_was_seek,
+            });
+        }
+        writer.write_all(&buffer[..]).await?;
+        total += count as u64;
+        last_was_seek = false;
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::io::Cursor;
+
+    use crate::tools::io::sparse_copy;
+
+    // larger than the 4096-byte buffers used by sparse_copy(_async), so multiple reads happen
+    const LEN: usize = 10000;
+
+    #[test]
+    fn test_sparse_copy() {
+        // test sparse: the zero half must be seeked over instead of written
+        let mut test_data = Vec::new();
+        for _ in 0..LEN / 2 {
+            test_data.push(1u8);
+        }
+        for _ in 0..LEN / 2 {
+            test_data.push(0u8);
+        }
+        let mut test_data = Cursor::new(test_data);
+        let mut result_data = Cursor::new(vec![0; LEN]);
+
+        let result =
+            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
+        assert_eq!(result.written, LEN as u64);
+        assert!(result.seeked_last);
+        for i in 0..LEN {
+            if i < LEN / 2 {
+                assert_eq!(result_data.get_ref()[i], 1);
+            } else {
+                assert_eq!(result_data.get_ref()[i], 0);
+            }
+        }
+
+        // test non sparse
+        let mut test_data = Cursor::new(vec![1; LEN]);
+        let mut result_data = Cursor::new(vec![0; LEN]);
+
+        let result =
+            sparse_copy(&mut test_data, &mut result_data).expect("error during sparse copy");
+        assert_eq!(result.written, LEN as u64);
+        assert!(!result.seeked_last);
+        assert_eq!(result_data.get_ref(), &vec![1u8; LEN]);
+    }
+
+    // sparse_copy_async and the async test helpers need the "tokio" feature
+    #[cfg(feature = "tokio")]
+    #[test]
+    fn test_sparse_copy_async() {
+        use crate::test::io::{AsyncBlockingReader, AsyncBlockingWriter};
+        use crate::tools::io::sparse_copy_async;
+
+        let fut = async {
+            // test sparse: the zero half must be seeked over instead of written
+            let mut test_data = Vec::new();
+            for _ in 0..LEN / 2 {
+                test_data.push(1u8);
+            }
+            for _ in 0..LEN / 2 {
+                test_data.push(0u8);
+            }
+            let mut test_data = AsyncBlockingReader::new(Cursor::new(test_data));
+            let mut result_data = AsyncBlockingWriter::new(Cursor::new(vec![0; LEN]));
+
+            let result = sparse_copy_async(&mut test_data, &mut result_data)
+                .await
+                .expect("error during sparse copy");
+
+            assert_eq!(result.written, LEN as u64);
+            assert!(result.seeked_last);
+            for i in 0..LEN {
+                if i < LEN / 2 {
+                    assert_eq!(result_data.inner().get_ref()[i], 1);
+                } else {
+                    assert_eq!(result_data.inner().get_ref()[i], 0);
+                }
+            }
+
+            // test non sparse
+            let mut test_data = AsyncBlockingReader::new(Cursor::new(vec![1; LEN]));
+            let mut result_data = AsyncBlockingWriter::new(Cursor::new(vec![0; LEN]));
+
+            let result = sparse_copy_async(&mut test_data, &mut result_data)
+                .await
+                .expect("error during sparse copy");
+
+            assert_eq!(result.written, LEN as u64);
+            assert!(!result.seeked_last);
+            assert_eq!(result_data.inner().get_ref(), &vec![1u8; LEN]);
+            Ok(())
+        };
+
+        crate::test::task::poll_result_once(fut).expect("ok")
+    }
+}
-- 
2.20.1