From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 4A4881FF13A for ; Wed, 01 Apr 2026 09:56:15 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 51945115A8; Wed, 1 Apr 2026 09:56:43 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup 16/20] sync: pull: introduce and use decrypt index writer if crypt config Date: Wed, 1 Apr 2026 09:55:17 +0200 Message-ID: <20260401075521.176354-17-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260401075521.176354-1-c.ebner@proxmox.com> References: <20260401075521.176354-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1775030090743 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.064 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: RDQXK3M4YVKY5RDIE3CUYL2PF56SWZNC X-Message-ID-Hash: RDQXK3M4YVKY5RDIE3CUYL2PF56SWZNC X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: In order to decrypt an encrypted index file during a pull sync job when a matching decryption key is configured, the index 
has to be rewritten as the chunks have to be decrypted and the new digests calculated based on the decrypted chunk. The newly written index file needs to finally replace the original one, achieved by replacing the original tempfile after pulling the chunks. In order to be able to do so, provide a DecryptedIndexWriter instance to the chunk pulling logic. The DecryptedIndexWriter provides variants for fixed and dynamic index writers, or none if no rewriting should happen. This remains disarmed for the time being by never passing the crypt config until the logic to decrypt the chunk and re-calculate the digests is in place, done in subsequent code changes. Signed-off-by: Christian Ebner --- src/server/pull.rs | 135 ++++++++++++++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 46 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index a5d1b3079..8002bbf87 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -21,8 +21,8 @@ use pbs_api_types::{ use pbs_client::BackupRepository; use pbs_config::CachedUserInfo; use pbs_datastore::data_blob::DataBlob; -use pbs_datastore::dynamic_index::DynamicIndexReader; -use pbs_datastore::fixed_index::FixedIndexReader; +use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; +use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter}; use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{BackupManifest, FileInfo}; use pbs_datastore::read_chunk::AsyncReadChunk; @@ -155,6 +155,7 @@ async fn pull_index_chunks( index: I, encountered_chunks: Arc>, backend: &DatastoreBackend, + decrypted_index_writer: DecryptedIndexWriter, ) -> Result { use futures::stream::{self, StreamExt, TryStreamExt}; @@ -190,55 +191,61 @@ async fn pull_index_chunks( let bytes = Arc::new(AtomicUsize::new(0)); let chunk_count = Arc::new(AtomicUsize::new(0)); - stream - .map(|info| { - let target = Arc::clone(&target); - let chunk_reader = chunk_reader.clone(); - let bytes = Arc::clone(&bytes); - 
let chunk_count = Arc::clone(&chunk_count); - let verify_and_write_channel = verify_and_write_channel.clone(); - let encountered_chunks = Arc::clone(&encountered_chunks); - - Ok::<_, Error>(async move { - { - // limit guard scope - let mut guard = encountered_chunks.lock().unwrap(); - if let Some(touched) = guard.check_reusable(&info.digest) { - if touched { - return Ok::<_, Error>(()); - } - let chunk_exists = proxmox_async::runtime::block_in_place(|| { - target.cond_touch_chunk(&info.digest, false) - })?; - if chunk_exists { - guard.mark_touched(&info.digest); - //info!("chunk {} exists {}", pos, hex::encode(digest)); - return Ok::<_, Error>(()); - } + let stream = stream.map(|info| { + let target = Arc::clone(&target); + let chunk_reader = chunk_reader.clone(); + let bytes = Arc::clone(&bytes); + let chunk_count = Arc::clone(&chunk_count); + let verify_and_write_channel = verify_and_write_channel.clone(); + let encountered_chunks = Arc::clone(&encountered_chunks); + + Ok::<_, Error>(async move { + { + // limit guard scope + let mut guard = encountered_chunks.lock().unwrap(); + if let Some(touched) = guard.check_reusable(&info.digest) { + if touched { + return Ok::<_, Error>(()); + } + let chunk_exists = proxmox_async::runtime::block_in_place(|| { + target.cond_touch_chunk(&info.digest, false) + })?; + if chunk_exists { + guard.mark_touched(&info.digest); + //info!("chunk {} exists {}", pos, hex::encode(digest)); + return Ok::<_, Error>(()); } - // mark before actually downloading the chunk, so this happens only once - guard.mark_reusable(&info.digest); - guard.mark_touched(&info.digest); } + // mark before actually downloading the chunk, so this happens only once + guard.mark_reusable(&info.digest); + guard.mark_touched(&info.digest); + } - //info!("sync {} chunk {}", pos, hex::encode(digest)); - let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; - let raw_size = chunk.raw_size() as usize; + //info!("sync {} chunk {}", pos, hex::encode(digest)); + let 
chunk = chunk_reader.read_raw_chunk(&info.digest).await?; + let raw_size = chunk.raw_size() as usize; - // decode, verify and write in a separate threads to maximize throughput - proxmox_async::runtime::block_in_place(|| { - verify_and_write_channel.send((chunk, info.digest, info.size())) - })?; + // decode, verify and write in a separate threads to maximize throughput + proxmox_async::runtime::block_in_place(|| { + verify_and_write_channel.send((chunk, info.digest, info.size())) + })?; - bytes.fetch_add(raw_size, Ordering::SeqCst); - chunk_count.fetch_add(1, Ordering::SeqCst); + bytes.fetch_add(raw_size, Ordering::SeqCst); + chunk_count.fetch_add(1, Ordering::SeqCst); - Ok(()) - }) + Ok(()) }) - .try_buffer_unordered(20) - .try_for_each(|_res| futures::future::ok(())) - .await?; + }); + + if let DecryptedIndexWriter::None = decrypted_index_writer { + stream + .try_buffer_unordered(20) + .try_for_each(|_res| futures::future::ok(())) + .await?; + } else { + // must keep chunk order to correctly rewrite index file + stream.try_for_each(|item| item).await?; + } drop(verify_and_write_channel); @@ -319,9 +326,15 @@ async fn pull_single_archive<'a>( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - if reader.skip_chunk_sync(snapshot.datastore().name()) { + if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) { info!("skipping chunk sync for same datastore"); } else { + let new_index_writer = if crypt_config.is_some() { + let writer = DynamicIndexWriter::create(&path)?; + DecryptedIndexWriter::Dynamic(Arc::new(Mutex::new(writer))) + } else { + DecryptedIndexWriter::None + }; let stats = pull_index_chunks( reader .chunk_reader(crypt_config.clone(), archive_info.crypt_mode) @@ -330,8 +343,16 @@ async fn pull_single_archive<'a>( index, encountered_chunks, backend, + new_index_writer.clone(), ) .await?; + if let DecryptedIndexWriter::Dynamic(index) = &new_index_writer { + let csum = 
index.lock().unwrap().close()?; + + // Overwrite current tmp file so it will be persisted instead + std::fs::rename(&path, &tmp_path)?; + } + sync_stats.add(stats); } } @@ -342,9 +363,16 @@ async fn pull_single_archive<'a>( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - if reader.skip_chunk_sync(snapshot.datastore().name()) { + if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) { info!("skipping chunk sync for same datastore"); } else { + let new_index_writer = if crypt_config.is_some() { + let writer = + FixedIndexWriter::create(&path, Some(size), index.chunk_size as u32)?; + DecryptedIndexWriter::Fixed(Arc::new(Mutex::new(writer))) + } else { + DecryptedIndexWriter::None + }; let stats = pull_index_chunks( reader .chunk_reader(crypt_config.clone(), archive_info.crypt_mode) @@ -353,8 +381,16 @@ async fn pull_single_archive<'a>( index, encountered_chunks, backend, + new_index_writer.clone(), ) .await?; + if let DecryptedIndexWriter::Fixed(index) = &new_index_writer { + let csum = index.lock().unwrap().close()?; + + // Overwrite current tmp file so it will be persisted instead + std::fs::rename(&path, &tmp_path)?; + } + sync_stats.add(stats); } } @@ -1269,3 +1305,10 @@ impl EncounteredChunks { self.chunk_set.clear(); } } + +#[derive(Clone)] +enum DecryptedIndexWriter { + Fixed(Arc>), + Dynamic(Arc>), + None, +} -- 2.47.3