* [pbs-devel] [PATCH proxmox-backup 1/2] datastore: data blob: increase compression throughput
@ 2024-07-23 10:10 Dominik Csapak
2024-07-23 10:10 ` [pbs-devel] [PATCH proxmox-backup 2/2] remove data blob writer Dominik Csapak
0 siblings, 1 reply; 2+ messages in thread
From: Dominik Csapak @ 2024-07-23 10:10 UTC (permalink / raw)
To: pbs-devel
by not using `zstd::stream::copy_encode`, because that has an allocation
pattern that reduces throughput if the target/source storage and the
network are faster than the chunk creation.
instead use `zstd::bulk::compress_to_buffer`, which shouldn't do any big
allocations, since we provide the target buffer.
To handle the case that the target buffer is too small, we now ignore
all zstd errors and continue with the uncompressed data, logging the
error unless it is the target buffer being too small.
For now, we have to parse the error string for that, as `zstd` maps all
errors to `io::ErrorKind::Other`. Until that gets changed, there is no
other way to differentiate between the different kinds of errors.
In my local benchmarks from tmpfs to tmpfs on localhost, where I
previously maxed out at ~450MiB/s, I now get ~625MiB/s throughput.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
Note: if we want a different behavior for the errors, that's also OK
with me, but zstd errors should be rare, I guess (except the target
buffer one), and in that case I find it better to continue with
uncompressed data. In case it was a transient error,
the next upload of the chunk will replace the uncompressed one
if it's smaller anyway.
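The fallback logic described above — try bulk compression into a pre-sized buffer, keep the result only if it actually fits, and otherwise fall back to uncompressed data while suppressing only the expected buffer-too-small error — can be sketched roughly as below. The `compress_to_buffer` function here is a stub standing in for the real `zstd::bulk::compress_to_buffer` call (same shape of signature: returns the compressed size or an `io::Error` with `ErrorKind::Other`), so the sketch stays self-contained:

```rust
use std::io;

// Stub standing in for `zstd::bulk::compress_to_buffer`: the real call
// compresses `data` into `dst` and returns the compressed size, or an
// io::Error (always ErrorKind::Other), e.g. "Destination buffer is too
// small". This stub just pretends everything compresses to half its size.
fn compress_to_buffer(data: &[u8], dst: &mut [u8]) -> io::Result<usize> {
    let size = data.len() / 2;
    if dst.len() < size {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "Destination buffer is too small",
        ));
    }
    dst[..size].copy_from_slice(&data[..size]);
    Ok(size)
}

/// Try to compress `data` into a buffer no larger than the input.
/// Returns Some(compressed) only when compression succeeded and fit;
/// otherwise None, and the caller stores the data uncompressed. Errors
/// are logged, except the expected "buffer too small" case, which only
/// means compression was not worthwhile for this chunk.
fn try_compress(data: &[u8]) -> Option<Vec<u8>> {
    let mut buf = vec![0u8; data.len()];
    match compress_to_buffer(data, &mut buf) {
        Ok(size) if size <= data.len() => {
            buf.truncate(size);
            Some(buf)
        }
        Ok(_) => None,
        Err(err) => {
            // zstd maps all errors to ErrorKind::Other, so for now the
            // error string is the only way to tell this case apart.
            if !err.to_string().contains("Destination buffer is too small") {
                eprintln!("zstd compression error: {err}");
            }
            None
        }
    }
}

fn main() {
    let data = vec![42u8; 1024];
    match try_compress(&data) {
        Some(compressed) => println!("compressed to {} bytes", compressed.len()),
        None => println!("storing uncompressed"),
    }
}
```

In the actual patch the buffer additionally has the blob header prepended, so the slice passed to zstd starts at the header length; the sketch leaves that out for brevity.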
pbs-datastore/src/data_blob.rs | 31 +++++++++++++++++++++----------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git a/pbs-datastore/src/data_blob.rs b/pbs-datastore/src/data_blob.rs
index a7a55fb7..92242076 100644
--- a/pbs-datastore/src/data_blob.rs
+++ b/pbs-datastore/src/data_blob.rs
@@ -136,7 +136,8 @@ impl DataBlob {
DataBlob { raw_data }
} else {
- let max_data_len = data.len() + std::mem::size_of::<DataBlobHeader>();
+ let header_len = std::mem::size_of::<DataBlobHeader>();
+ let max_data_len = data.len() + header_len;
if compress {
let mut comp_data = Vec::with_capacity(max_data_len);
@@ -147,15 +148,25 @@ impl DataBlob {
unsafe {
comp_data.write_le_value(head)?;
}
-
- zstd::stream::copy_encode(data, &mut comp_data, 1)?;
-
- if comp_data.len() < max_data_len {
- let mut blob = DataBlob {
- raw_data: comp_data,
- };
- blob.set_crc(blob.compute_crc());
- return Ok(blob);
+ comp_data.resize(max_data_len, 0u8);
+
+ match zstd::bulk::compress_to_buffer(data, &mut comp_data[header_len..], 1) {
+ Ok(size) if size <= data.len() => {
+ comp_data.resize(header_len + size, 0u8);
+ let mut blob = DataBlob {
+ raw_data: comp_data,
+ };
+ blob.set_crc(blob.compute_crc());
+ return Ok(blob);
+ }
+ // if size is bigger than the data, or any error is returned, continue with non
+ // compressed archive but log all errors beside buffer too small
+ Ok(_) => {}
+ Err(err) => {
+ if !err.to_string().contains("Destination buffer is too small") {
+ log::error!("zstd compression error: {err}");
+ }
+ }
}
}
--
2.39.2
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
* [pbs-devel] [PATCH proxmox-backup 2/2] remove data blob writer
2024-07-23 10:10 [pbs-devel] [PATCH proxmox-backup 1/2] datastore: data blob: increase compression throughput Dominik Csapak
@ 2024-07-23 10:10 ` Dominik Csapak
0 siblings, 0 replies; 2+ messages in thread
From: Dominik Csapak @ 2024-07-23 10:10 UTC (permalink / raw)
To: pbs-devel
this is leftover code that is not currently used outside of tests.
Should we need it again, we can just revert this commit.
Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
---
pbs-datastore/src/data_blob_writer.rs | 212 --------------------------
pbs-datastore/src/lib.rs | 2 -
tests/blob_writer.rs | 105 -------------
3 files changed, 319 deletions(-)
delete mode 100644 pbs-datastore/src/data_blob_writer.rs
delete mode 100644 tests/blob_writer.rs
diff --git a/pbs-datastore/src/data_blob_writer.rs b/pbs-datastore/src/data_blob_writer.rs
deleted file mode 100644
index 30d9645f..00000000
--- a/pbs-datastore/src/data_blob_writer.rs
+++ /dev/null
@@ -1,212 +0,0 @@
-use std::io::{Seek, SeekFrom, Write};
-use std::sync::Arc;
-
-use anyhow::Error;
-
-use proxmox_io::WriteExt;
-
-use pbs_tools::crypt_config::CryptConfig;
-
-use crate::checksum_writer::ChecksumWriter;
-use crate::crypt_writer::CryptWriter;
-use crate::file_formats::{self, DataBlobHeader, EncryptedDataBlobHeader};
-
-enum BlobWriterState<'writer, W: Write> {
- Uncompressed {
- csum_writer: ChecksumWriter<W>,
- },
- Compressed {
- compr: zstd::stream::write::Encoder<'writer, ChecksumWriter<W>>,
- },
- Encrypted {
- crypt_writer: CryptWriter<ChecksumWriter<W>>,
- },
- EncryptedCompressed {
- compr: zstd::stream::write::Encoder<'writer, CryptWriter<ChecksumWriter<W>>>,
- },
-}
-
-/// Data blob writer
-pub struct DataBlobWriter<'writer, W: Write> {
- state: BlobWriterState<'writer, W>,
-}
-
-impl<W: Write + Seek> DataBlobWriter<'_, W> {
- pub fn new_uncompressed(mut writer: W) -> Result<Self, Error> {
- writer.seek(SeekFrom::Start(0))?;
- let head = DataBlobHeader {
- magic: file_formats::UNCOMPRESSED_BLOB_MAGIC_1_0,
- crc: [0; 4],
- };
- unsafe {
- writer.write_le_value(head)?;
- }
- let csum_writer = ChecksumWriter::new(writer, None);
- Ok(Self {
- state: BlobWriterState::Uncompressed { csum_writer },
- })
- }
-
- pub fn new_compressed(mut writer: W) -> Result<Self, Error> {
- writer.seek(SeekFrom::Start(0))?;
- let head = DataBlobHeader {
- magic: file_formats::COMPRESSED_BLOB_MAGIC_1_0,
- crc: [0; 4],
- };
- unsafe {
- writer.write_le_value(head)?;
- }
- let csum_writer = ChecksumWriter::new(writer, None);
- let compr = zstd::stream::write::Encoder::new(csum_writer, 1)?;
- Ok(Self {
- state: BlobWriterState::Compressed { compr },
- })
- }
-
- pub fn new_encrypted(mut writer: W, config: Arc<CryptConfig>) -> Result<Self, Error> {
- writer.seek(SeekFrom::Start(0))?;
- let head = EncryptedDataBlobHeader {
- head: DataBlobHeader {
- magic: file_formats::ENCRYPTED_BLOB_MAGIC_1_0,
- crc: [0; 4],
- },
- iv: [0u8; 16],
- tag: [0u8; 16],
- };
- unsafe {
- writer.write_le_value(head)?;
- }
-
- let csum_writer = ChecksumWriter::new(writer, None);
- let crypt_writer = CryptWriter::new(csum_writer, config)?;
- Ok(Self {
- state: BlobWriterState::Encrypted { crypt_writer },
- })
- }
-
- pub fn new_encrypted_compressed(
- mut writer: W,
- config: Arc<CryptConfig>,
- ) -> Result<Self, Error> {
- writer.seek(SeekFrom::Start(0))?;
- let head = EncryptedDataBlobHeader {
- head: DataBlobHeader {
- magic: file_formats::ENCR_COMPR_BLOB_MAGIC_1_0,
- crc: [0; 4],
- },
- iv: [0u8; 16],
- tag: [0u8; 16],
- };
- unsafe {
- writer.write_le_value(head)?;
- }
-
- let csum_writer = ChecksumWriter::new(writer, None);
- let crypt_writer = CryptWriter::new(csum_writer, config)?;
- let compr = zstd::stream::write::Encoder::new(crypt_writer, 1)?;
- Ok(Self {
- state: BlobWriterState::EncryptedCompressed { compr },
- })
- }
-
- pub fn finish(self) -> Result<W, Error> {
- match self.state {
- BlobWriterState::Uncompressed { csum_writer } => {
- // write CRC
- let (mut writer, crc, _) = csum_writer.finish()?;
- let head = DataBlobHeader {
- magic: file_formats::UNCOMPRESSED_BLOB_MAGIC_1_0,
- crc: crc.to_le_bytes(),
- };
-
- writer.seek(SeekFrom::Start(0))?;
- unsafe {
- writer.write_le_value(head)?;
- }
-
- Ok(writer)
- }
- BlobWriterState::Compressed { compr } => {
- let csum_writer = compr.finish()?;
- let (mut writer, crc, _) = csum_writer.finish()?;
-
- let head = DataBlobHeader {
- magic: file_formats::COMPRESSED_BLOB_MAGIC_1_0,
- crc: crc.to_le_bytes(),
- };
-
- writer.seek(SeekFrom::Start(0))?;
- unsafe {
- writer.write_le_value(head)?;
- }
-
- Ok(writer)
- }
- BlobWriterState::Encrypted { crypt_writer } => {
- let (csum_writer, iv, tag) = crypt_writer.finish()?;
- let (mut writer, crc, _) = csum_writer.finish()?;
-
- let head = EncryptedDataBlobHeader {
- head: DataBlobHeader {
- magic: file_formats::ENCRYPTED_BLOB_MAGIC_1_0,
- crc: crc.to_le_bytes(),
- },
- iv,
- tag,
- };
- writer.seek(SeekFrom::Start(0))?;
- unsafe {
- writer.write_le_value(head)?;
- }
- Ok(writer)
- }
- BlobWriterState::EncryptedCompressed { compr } => {
- let crypt_writer = compr.finish()?;
- let (csum_writer, iv, tag) = crypt_writer.finish()?;
- let (mut writer, crc, _) = csum_writer.finish()?;
-
- let head = EncryptedDataBlobHeader {
- head: DataBlobHeader {
- magic: file_formats::ENCR_COMPR_BLOB_MAGIC_1_0,
- crc: crc.to_le_bytes(),
- },
- iv,
- tag,
- };
- writer.seek(SeekFrom::Start(0))?;
- unsafe {
- writer.write_le_value(head)?;
- }
- Ok(writer)
- }
- }
- }
-}
-
-impl<W: Write + Seek> Write for DataBlobWriter<'_, W> {
- fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
- match self.state {
- BlobWriterState::Uncompressed {
- ref mut csum_writer,
- } => csum_writer.write(buf),
- BlobWriterState::Compressed { ref mut compr } => compr.write(buf),
- BlobWriterState::Encrypted {
- ref mut crypt_writer,
- } => crypt_writer.write(buf),
- BlobWriterState::EncryptedCompressed { ref mut compr } => compr.write(buf),
- }
- }
-
- fn flush(&mut self) -> Result<(), std::io::Error> {
- match self.state {
- BlobWriterState::Uncompressed {
- ref mut csum_writer,
- } => csum_writer.flush(),
- BlobWriterState::Compressed { ref mut compr } => compr.flush(),
- BlobWriterState::Encrypted {
- ref mut crypt_writer,
- } => crypt_writer.flush(),
- BlobWriterState::EncryptedCompressed { ref mut compr } => compr.flush(),
- }
- }
-}
diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
index 3e4aa34c..202b0955 100644
--- a/pbs-datastore/src/lib.rs
+++ b/pbs-datastore/src/lib.rs
@@ -179,7 +179,6 @@ pub mod crypt_reader;
pub mod crypt_writer;
pub mod data_blob;
pub mod data_blob_reader;
-pub mod data_blob_writer;
pub mod file_formats;
pub mod index;
pub mod manifest;
@@ -201,7 +200,6 @@ pub use crypt_reader::CryptReader;
pub use crypt_writer::CryptWriter;
pub use data_blob::DataBlob;
pub use data_blob_reader::DataBlobReader;
-pub use data_blob_writer::DataBlobWriter;
pub use manifest::BackupManifest;
pub use store_progress::StoreProgress;
diff --git a/tests/blob_writer.rs b/tests/blob_writer.rs
deleted file mode 100644
index 23a3283d..00000000
--- a/tests/blob_writer.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-use std::io::Cursor;
-use std::io::{Read, Seek, SeekFrom, Write};
-use std::sync::Arc;
-
-use anyhow::{bail, Error};
-use lazy_static::lazy_static;
-
-use pbs_datastore::{DataBlob, DataBlobReader, DataBlobWriter};
-use pbs_tools::crypt_config::CryptConfig;
-
-lazy_static! {
- static ref TEST_DATA: Vec<u8> = {
- let mut data = Vec::new();
-
- for i in 0..100_000 {
- data.push((i % 255) as u8);
- }
-
- data
- };
- static ref CRYPT_CONFIG: Arc<CryptConfig> = {
- let key = [1u8; 32];
- Arc::new(CryptConfig::new(key).unwrap())
- };
- static ref TEST_DIGEST_PLAIN: [u8; 32] = [
- 83, 154, 96, 195, 167, 204, 38, 142, 204, 224, 130, 201, 24, 71, 2, 188, 130, 155, 177, 6,
- 162, 100, 61, 238, 38, 219, 63, 240, 191, 132, 87, 238
- ];
- static ref TEST_DIGEST_ENC: [u8; 32] = [
- 50, 162, 191, 93, 255, 132, 9, 14, 127, 23, 92, 39, 246, 102, 245, 204, 130, 104, 4, 106,
- 182, 239, 218, 14, 80, 17, 150, 188, 239, 253, 198, 117
- ];
-}
-
-fn verify_test_blob(mut cursor: Cursor<Vec<u8>>, digest: &[u8; 32]) -> Result<(), Error> {
- // run read tests with different buffer sizes
- for size in [1, 3, 64 * 1024].iter() {
- println!("Starting DataBlobReader test (size = {})", size);
-
- cursor.seek(SeekFrom::Start(0))?;
- let mut reader = DataBlobReader::new(&mut cursor, Some(CRYPT_CONFIG.clone()))?;
- let mut buffer = Vec::<u8>::new();
- // read the whole file
- //reader.read_to_end(&mut buffer)?;
- let mut buf = vec![0u8; *size];
- loop {
- let count = reader.read(&mut buf)?;
- if count == 0 {
- break;
- }
- buffer.extend(&buf[..count]);
- }
-
- reader.finish()?;
- if buffer != *TEST_DATA {
- bail!("blob data is wrong (read buffer size {})", size);
- }
- }
-
- let raw_data = cursor.into_inner();
-
- let blob = DataBlob::load_from_reader(&mut &raw_data[..])?;
-
- let data = blob.decode(Some(&CRYPT_CONFIG), Some(digest))?;
- if data != *TEST_DATA {
- bail!("blob data is wrong (decode)");
- }
- Ok(())
-}
-
-#[test]
-fn test_uncompressed_blob_writer() -> Result<(), Error> {
- let tmp = Cursor::new(Vec::<u8>::new());
- let mut blob_writer = DataBlobWriter::new_uncompressed(tmp)?;
- blob_writer.write_all(&TEST_DATA)?;
-
- verify_test_blob(blob_writer.finish()?, &TEST_DIGEST_PLAIN)
-}
-
-#[test]
-fn test_compressed_blob_writer() -> Result<(), Error> {
- let tmp = Cursor::new(Vec::<u8>::new());
- let mut blob_writer = DataBlobWriter::new_compressed(tmp)?;
- blob_writer.write_all(&TEST_DATA)?;
-
- verify_test_blob(blob_writer.finish()?, &TEST_DIGEST_PLAIN)
-}
-
-#[test]
-fn test_encrypted_blob_writer() -> Result<(), Error> {
- let tmp = Cursor::new(Vec::<u8>::new());
- let mut blob_writer = DataBlobWriter::new_encrypted(tmp, CRYPT_CONFIG.clone())?;
- blob_writer.write_all(&TEST_DATA)?;
-
- verify_test_blob(blob_writer.finish()?, &TEST_DIGEST_ENC)
-}
-
-#[test]
-fn test_encrypted_compressed_blob_writer() -> Result<(), Error> {
- let tmp = Cursor::new(Vec::<u8>::new());
- let mut blob_writer = DataBlobWriter::new_encrypted_compressed(tmp, CRYPT_CONFIG.clone())?;
- blob_writer.write_all(&TEST_DATA)?;
-
- verify_test_blob(blob_writer.finish()?, &TEST_DIGEST_ENC)
-}
--
2.39.2