From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 649C81FF13A for ; Wed, 01 Apr 2026 09:55:48 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B0A7910E83; Wed, 1 Apr 2026 09:56:12 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup 19/20] sync: pull: decrypt chunks and rewrite index file for matching key Date: Wed, 1 Apr 2026 09:55:20 +0200 Message-ID: <20260401075521.176354-20-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260401075521.176354-1-c.ebner@proxmox.com> References: <20260401075521.176354-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1775030091459 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.064 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: ABQS72K4DTANKMICP3BVEGD444HGWUYV X-Message-ID-Hash: ABQS72K4DTANKMICP3BVEGD444HGWUYV X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Once the matching decryption key is provided, use it to decrypt the chunks on pull and rewrite the index file based on the 
decrypted chunk digests and offsets. Signed-off-by: Christian Ebner --- src/server/pull.rs | 135 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 114 insertions(+), 21 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index ccf349c92..05152d0dd 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -3,7 +3,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::io::{BufReader, Read, Seek, Write}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; @@ -20,7 +20,7 @@ use pbs_api_types::{ }; use pbs_client::BackupRepository; use pbs_config::CachedUserInfo; -use pbs_datastore::data_blob::DataBlob; +use pbs_datastore::data_blob::{DataBlob, DataChunkBuilder}; use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter}; use pbs_datastore::index::IndexFile; @@ -169,7 +169,16 @@ async fn pull_index_chunks( .filter(|info| { let guard = encountered_chunks.lock().unwrap(); match guard.check_reusable(&info.digest) { - Some((touched, _decrypted_chunk)) => !touched, // reusable and already touched, can always skip + Some((touched, mapped_digest)) => { + if mapped_digest.is_some() { + // if there is a mapping, then the chunk digest must be rewritten to + // the index, cannot skip here but optimized when processing the stream + true + } else { + // reusable and already touched, can always skip + !touched + } + } None => true, } }), @@ -191,6 +200,7 @@ async fn pull_index_chunks( let verify_and_write_channel = verify_pool.channel(); let bytes = Arc::new(AtomicUsize::new(0)); + let offset = Arc::new(AtomicU64::new(0)); let chunk_count = Arc::new(AtomicUsize::new(0)); let stream = stream.map(|info| { @@ -200,36 +210,119 @@ async fn pull_index_chunks( let chunk_count = Arc::clone(&chunk_count); let 
verify_and_write_channel = verify_and_write_channel.clone(); let encountered_chunks = Arc::clone(&encountered_chunks); + let offset = Arc::clone(&offset); + let decrypted_index_writer = decrypted_index_writer.clone(); Ok::<_, Error>(async move { - { - // limit guard scope - let mut guard = encountered_chunks.lock().unwrap(); - if let Some((touched, _decrypted_digest)) = guard.check_reusable(&info.digest) { - if touched { + //info!("sync {} chunk {}", pos, hex::encode(digest)); + let (chunk, digest, size) = match decrypted_index_writer { + DecryptedIndexWriter::Fixed(index) => { + if let Some((_touched, Some(decrypted_digest))) = encountered_chunks + .lock() + .unwrap() + .check_reusable(&info.digest) + { + // already got the decrypted digest and chunk has been written, + // no need to process again + let size = info.size(); + let start_offset = offset.fetch_add(size, Ordering::SeqCst); + + index.lock().unwrap().add_chunk( + start_offset, + size as u32, + decrypted_digest, + )?; + return Ok::<_, Error>(()); } - let chunk_exists = proxmox_async::runtime::block_in_place(|| { - target.cond_touch_chunk(&info.digest, false) - })?; - if chunk_exists { - guard.mark_touched(&info.digest, None); - //info!("chunk {} exists {}", pos, hex::encode(digest)); + + let chunk_data = chunk_reader.read_chunk(&info.digest).await?; + let (chunk, digest) = + DataChunkBuilder::new(&chunk_data).compress(true).build()?; + + let size = chunk_data.len() as u64; + let start_offset = offset.fetch_add(size, Ordering::SeqCst); + + index + .lock() + .unwrap() + .add_chunk(start_offset, size as u32, &digest)?; + + encountered_chunks + .lock() + .unwrap() + .mark_reusable(&info.digest, Some(digest)); + + (chunk, digest, size) + } + DecryptedIndexWriter::Dynamic(index) => { + if let Some((_touched, Some(decrypted_digest))) = encountered_chunks + .lock() + .unwrap() + .check_reusable(&info.digest) + { + // already got the decrypted digest and chunk has been written, + // no need to process again + let 
size = info.size(); + let start_offset = offset.fetch_add(size, Ordering::SeqCst); + let end_offset = start_offset + size; + + index + .lock() + .unwrap() + .add_chunk(end_offset, decrypted_digest)?; + return Ok::<_, Error>(()); } + + let chunk_data = chunk_reader.read_chunk(&info.digest).await?; + let (chunk, digest) = + DataChunkBuilder::new(&chunk_data).compress(true).build()?; + + let size = chunk_data.len() as u64; + let start_offset = offset.fetch_add(size, Ordering::SeqCst); + let end_offset = start_offset + size; + + index.lock().unwrap().add_chunk(end_offset, &digest)?; + + encountered_chunks + .lock() + .unwrap() + .mark_reusable(&info.digest, Some(digest)); + + (chunk, digest, size) } - // mark before actually downloading the chunk, so this happens only once - guard.mark_reusable(&info.digest, None); - guard.mark_touched(&info.digest, None); - } + DecryptedIndexWriter::None => { + { + // limit guard scope + let mut guard = encountered_chunks.lock().unwrap(); + if let Some((touched, _mapped)) = guard.check_reusable(&info.digest) { + if touched { + return Ok::<_, Error>(()); + } + let chunk_exists = proxmox_async::runtime::block_in_place(|| { + target.cond_touch_chunk(&info.digest, false) + })?; + if chunk_exists { + guard.mark_touched(&info.digest, None); + //info!("chunk {} exists {}", pos, hex::encode(digest)); + return Ok::<_, Error>(()); + } + } + // mark before actually downloading the chunk, so this happens only once + guard.mark_reusable(&info.digest, None); + guard.mark_touched(&info.digest, None); + } - //info!("sync {} chunk {}", pos, hex::encode(digest)); - let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; + let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; + (chunk, info.digest, info.size()) + } + }; let raw_size = chunk.raw_size() as usize; // decode, verify and write in a separate threads to maximize throughput proxmox_async::runtime::block_in_place(|| { - verify_and_write_channel.send((chunk, info.digest, 
info.size())) + verify_and_write_channel.send((chunk, digest, size)) })?; bytes.fetch_add(raw_size, Ordering::SeqCst); -- 2.47.3