From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 1B76DDB18 for ; Fri, 22 Sep 2023 09:16:57 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 0F2756C83 for ; Fri, 22 Sep 2023 09:16:55 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Fri, 22 Sep 2023 09:16:53 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id C8A294877C for ; Fri, 22 Sep 2023 09:16:52 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Fri, 22 Sep 2023 09:16:19 +0200 Message-Id: <20230922071621.12670-19-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20230922071621.12670-1-c.ebner@proxmox.com> References: <20230922071621.12670-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.111 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 18/20] fix #3174: backup writer: inject queued chunk in upload stream X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: 
Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Fri, 22 Sep 2023 07:16:57 -0000 Inject the queued chunks into the backup writer's upload stream, thereby including them in the index file. Signed-off-by: Christian Ebner --- pbs-client/src/backup_writer.rs | 84 +++++++++++++++++++-------------- 1 file changed, 49 insertions(+), 35 deletions(-) diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index cc6dd49a..0f18b1ff 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -23,7 +23,7 @@ use pbs_tools::crypt_config::CryptConfig; use proxmox_human_byte::HumanByte; -use super::inject_reused_chunks::InjectChunks; +use super::inject_reused_chunks::{InjectChunks, InjectReusedChunks, InjectedChunksInfo}; use super::merge_known_chunks::{MergeKnownChunks, MergedChunkInfo}; use super::{H2Client, HttpClient}; @@ -667,48 +667,62 @@ impl BackupWriter { let index_csum_2 = index_csum.clone(); stream - .and_then(move |data| { - let chunk_len = data.len(); + .inject_reused_chunks(injection_queue, stream_len, index_csum.clone()) + .and_then(move |chunk_info| { + match chunk_info { + InjectedChunksInfo::Known(chunks) => { + total_chunks.fetch_add(chunks.len(), Ordering::SeqCst); + future::ok(MergedChunkInfo::Known(chunks)) + } + InjectedChunksInfo::Raw((offset, data)) => { + let chunk_len = data.len(); - total_chunks.fetch_add(1, Ordering::SeqCst); - let offset = stream_len.fetch_add(chunk_len, Ordering::SeqCst) as u64; + total_chunks.fetch_add(1, Ordering::SeqCst); - let mut chunk_builder = DataChunkBuilder::new(data.as_ref()).compress(compress); + let mut chunk_builder = + DataChunkBuilder::new(data.as_ref()).compress(compress); - if let Some(ref crypt_config) = crypt_config { - chunk_builder = chunk_builder.crypt_config(crypt_config); - } + if let Some(ref crypt_config) = crypt_config { + chunk_builder = chunk_builder.crypt_config(crypt_config); 
+ } - let mut known_chunks = known_chunks.lock().unwrap(); - let digest = chunk_builder.digest(); + let mut known_chunks = known_chunks.lock().unwrap(); - let mut guard = index_csum.lock().unwrap(); - let csum = guard.as_mut().unwrap(); + let digest = chunk_builder.digest(); - let chunk_end = offset + chunk_len as u64; + let mut guard = index_csum.lock().unwrap(); + let csum = guard.as_mut().unwrap(); - if !is_fixed_chunk_size { - csum.update(&chunk_end.to_le_bytes()); - } - csum.update(digest); + let chunk_end = offset + chunk_len as u64; - let chunk_is_known = known_chunks.contains(digest); - if chunk_is_known { - known_chunk_count.fetch_add(1, Ordering::SeqCst); - reused_len.fetch_add(chunk_len, Ordering::SeqCst); - future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) - } else { - let compressed_stream_len2 = compressed_stream_len.clone(); - known_chunks.insert(*digest); - future::ready(chunk_builder.build().map(move |(chunk, digest)| { - compressed_stream_len2.fetch_add(chunk.raw_size(), Ordering::SeqCst); - MergedChunkInfo::New(ChunkInfo { - chunk, - digest, - chunk_len: chunk_len as u64, - offset, - }) - })) + if !is_fixed_chunk_size { + csum.update(&chunk_end.to_le_bytes()); + } + csum.update(digest); + + let chunk_is_known = known_chunks.contains(digest); + if chunk_is_known { + known_chunk_count.fetch_add(1, Ordering::SeqCst); + reused_len.fetch_add(chunk_len, Ordering::SeqCst); + + future::ok(MergedChunkInfo::Known(vec![(offset, *digest)])) + } else { + let compressed_stream_len2 = compressed_stream_len.clone(); + known_chunks.insert(*digest); + + future::ready(chunk_builder.build().map(move |(chunk, digest)| { + compressed_stream_len2 + .fetch_add(chunk.raw_size(), Ordering::SeqCst); + + MergedChunkInfo::New(ChunkInfo { + chunk, + digest, + chunk_len: chunk_len as u64, + offset, + }) + })) + } + } } }) .merge_known_chunks() -- 2.39.2