From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 8E1E990C9A for ; Thu, 25 Jan 2024 14:27:01 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 59C7D19B26 for ; Thu, 25 Jan 2024 14:26:31 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 25 Jan 2024 14:26:29 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 3C4FF492A6 for ; Thu, 25 Jan 2024 14:26:29 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 25 Jan 2024 14:25:58 +0100 Message-Id: <20240125132608.1172472-20-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240125132608.1172472-1-c.ebner@proxmox.com> References: <20240125132608.1172472-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.052 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record T_SCC_BODY_TEXT_LINE -0.01 - Subject: [pbs-devel] [PATCH v6 proxmox-backup 19/29] fix #3174: backup writer: inject queued chunk in upload steam X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 25 Jan 2024 13:27:01 -0000 Inject the chunk in the backup writers upload stream, including them thereby in the index file. Signed-off-by: Christian Ebner --- Changes since v5: - no changes pbs-client/src/backup_writer.rs | 85 +++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index cc6dd49a..e8c2ec11 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -23,7 +23,7 @@ use pbs_tools::crypt_config::CryptConfig; use proxmox_human_byte::HumanByte; -use super::inject_reused_chunks::InjectChunks; +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; use super::{H2Client, HttpClient}; @@ -667,48 +667,63 @@ impl BackupWriter { let index_csum_2 = index_csum.clone(); stream - .and_then(move |data| { - let chunk_len = data.len(); + .inject_reused_chunks( + injection_queue, + stream_len, + reused_len.clone(), + index_csum.clone(), + ) + .and_then(move |chunk_info| match chunk_info { + InjectedChunksInfo::Known(chunks) => { + total_chunks.fetch_add(chunks.len(), Ordering::SeqCst); + future::ok(MergedChunkInfo::Known(chunks)) + } + InjectedChunksInfo::Raw((offset, data)) => { + let chunk_len = data.len(); - total_chunks.fetch_add(1, Ordering::SeqCst); - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64; + total_chunks.fetch_add(1, Ordering::SeqCst); - let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress); + let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress); - if let Some(ref crypt_config) = crypt_config { - chunk_builder = chunk_builder.crypt_config(crypt_config); - } + if let Some(ref crypt_config) = crypt_config { + chunk_builder = chunk_builder.crypt_config(crypt_config); + } - let mut known_chunks = known_chunks.lock().unwrap(); - let digest = chunk_builder.digest(); + let mut known_chunks = known_chunks.lock().unwrap(); - let mut guard = index_csum.lock().unwrap(); - let csum = guard.as_mut().unwrap(); + let digest = chunk_builder.digest(); - let chunk_end = offset + chunk_len as u64; + let mut guard = index_csum.lock().unwrap(); + let csum = guard.as_mut().unwrap(); - if !is_fixed_chunk_size { - csum.update(&chunk_end.to_le_bytes()); - } - csum.update(digest); + let chunk_end = offset + chunk_len as u64; - let chunk_is_known = known_chunks.contains(digest); - if chunk_is_known { - known_chunk_count.fetch_add(1, Ordering::SeqCst); - reused_len.fetch_add(chunk_len, Ordering::SeqCst); - future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) - } else { - let compressed_stream_len2 = compressed_stream_len.clone(); - known_chunks.insert(*digest); - future::ready(chunk_builder.build().map(move |(chunk, digest)| { - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); - MergedChunkInfo::New(ChunkInfo { - chunk, - digest, - chunk_len: chunk_len as u64, - offset, - }) - })) + if !is_fixed_chunk_size { + csum.update(&chunk_end.to_le_bytes()); + } + csum.update(digest); + + let chunk_is_known = known_chunks.contains(digest); + if chunk_is_known { + known_chunk_count.fetch_add(1, Ordering::SeqCst); + reused_len.fetch_add(chunk_len, Ordering::SeqCst); + + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) + } else { + let compressed_stream_len2 = compressed_stream_len.clone(); + known_chunks.insert(*digest); + + future::ready(chunk_builder.build().map(move |(chunk, digest)| { + compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); + + MergedChunkInfo::New(ChunkInfo { + chunk, + digest, + chunk_len: chunk_len as u64, + offset, + }) + })) + } } }) .merge_known_chunks() -- 2.39.2