From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 280401FF140 for ; Fri, 10 Apr 2026 19:03:47 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 7DA9224BC3; Fri, 10 Apr 2026 19:04:31 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v2 23/27] sync: pull: introduce and use decrypt index writer if crypt config Date: Fri, 10 Apr 2026 18:54:50 +0200 Message-ID: <20260410165454.1578501-24-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260410165454.1578501-1-c.ebner@proxmox.com> References: <20260410165454.1578501-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1775840041028 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.068 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. 
SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: GS5QDFPSK7IX3OJYX5OW6NN7A2RPMJTA X-Message-ID-Hash: GS5QDFPSK7IX3OJYX5OW6NN7A2RPMJTA X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: In order to decrypt an encrypted index file during a pull sync job when a matching decryption key is configured, the index has to be rewritten, as the chunks have to be decrypted and the new digests calculated based on the decrypted chunks. The newly written index file needs to finally replace the original one, achieved by replacing the original tempfile after pulling the chunks. In order to be able to do so, provide a DecryptedIndexWriter instance to the chunk pulling logic. The DecryptedIndexWriter provides variants for fixed and dynamic index writers, or none if no rewriting should happen. This remains disarmed for the time being by never passing the crypt config until the logic to decrypt the chunk and re-calculate the digests is in place, done in subsequent code changes. 
Signed-off-by: Christian Ebner --- src/server/pull.rs | 133 ++++++++++++++++++++++++++++++--------------- 1 file changed, 88 insertions(+), 45 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 39f4b2d75..7f5c00ddb 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -21,8 +21,8 @@ use pbs_api_types::{ use pbs_client::BackupRepository; use pbs_config::CachedUserInfo; use pbs_datastore::data_blob::DataBlob; -use pbs_datastore::dynamic_index::DynamicIndexReader; -use pbs_datastore::fixed_index::FixedIndexReader; +use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter}; +use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter}; use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{BackupManifest, FileInfo}; use pbs_datastore::read_chunk::AsyncReadChunk; @@ -166,6 +166,7 @@ async fn pull_index_chunks( index: I, encountered_chunks: Arc>, backend: &DatastoreBackend, + decrypted_index_writer: DecryptedIndexWriter, ) -> Result { use futures::stream::{self, StreamExt, TryStreamExt}; @@ -201,55 +202,61 @@ async fn pull_index_chunks( let bytes = Arc::new(AtomicUsize::new(0)); let chunk_count = Arc::new(AtomicUsize::new(0)); - stream - .map(|info| { - let target = Arc::clone(&target); - let chunk_reader = chunk_reader.clone(); - let bytes = Arc::clone(&bytes); - let chunk_count = Arc::clone(&chunk_count); - let verify_and_write_channel = verify_and_write_channel.clone(); - let encountered_chunks = Arc::clone(&encountered_chunks); + let stream = stream.map(|info| { + let target = Arc::clone(&target); + let chunk_reader = chunk_reader.clone(); + let bytes = Arc::clone(&bytes); + let chunk_count = Arc::clone(&chunk_count); + let verify_and_write_channel = verify_and_write_channel.clone(); + let encountered_chunks = Arc::clone(&encountered_chunks); - Ok::<_, Error>(async move { - { - // limit guard scope - let mut guard = encountered_chunks.lock().unwrap(); - if let Some(touched) = 
guard.check_reusable(&info.digest) { - if touched { - return Ok::<_, Error>(()); - } - let chunk_exists = proxmox_async::runtime::block_in_place(|| { - target.cond_touch_chunk(&info.digest, false) - })?; - if chunk_exists { - guard.mark_touched(&info.digest); - //info!("chunk {} exists {}", pos, hex::encode(digest)); - return Ok::<_, Error>(()); - } + Ok::<_, Error>(async move { + { + // limit guard scope + let mut guard = encountered_chunks.lock().unwrap(); + if let Some(touched) = guard.check_reusable(&info.digest) { + if touched { + return Ok::<_, Error>(()); + } + let chunk_exists = proxmox_async::runtime::block_in_place(|| { + target.cond_touch_chunk(&info.digest, false) + })?; + if chunk_exists { + guard.mark_touched(&info.digest); + //info!("chunk {} exists {}", pos, hex::encode(digest)); + return Ok::<_, Error>(()); } - // mark before actually downloading the chunk, so this happens only once - guard.mark_reusable(&info.digest); - guard.mark_touched(&info.digest); } + // mark before actually downloading the chunk, so this happens only once + guard.mark_reusable(&info.digest); + guard.mark_touched(&info.digest); + } - //info!("sync {} chunk {}", pos, hex::encode(digest)); - let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; - let raw_size = chunk.raw_size() as usize; + //info!("sync {} chunk {}", pos, hex::encode(digest)); + let chunk = chunk_reader.read_raw_chunk(&info.digest).await?; + let raw_size = chunk.raw_size() as usize; - // decode, verify and write in a separate threads to maximize throughput - proxmox_async::runtime::block_in_place(|| { - verify_and_write_channel.send((chunk, info.digest, info.size())) - })?; + // decode, verify and write in a separate threads to maximize throughput + proxmox_async::runtime::block_in_place(|| { + verify_and_write_channel.send((chunk, info.digest, info.size())) + })?; - bytes.fetch_add(raw_size, Ordering::SeqCst); - chunk_count.fetch_add(1, Ordering::SeqCst); + bytes.fetch_add(raw_size, Ordering::SeqCst); 
+ chunk_count.fetch_add(1, Ordering::SeqCst); - Ok(()) - }) + Ok(()) }) - .try_buffer_unordered(20) - .try_for_each(|_res| futures::future::ok(())) - .await?; + }); + + if let DecryptedIndexWriter::None = decrypted_index_writer { + stream + .try_buffer_unordered(20) + .try_for_each(|_res| futures::future::ok(())) + .await?; + } else { + // must keep chunk order to correctly rewrite index file + stream.try_for_each(|item| item).await?; + } drop(verify_and_write_channel); @@ -330,9 +337,15 @@ async fn pull_single_archive<'a>( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - if reader.skip_chunk_sync(snapshot.datastore().name()) { + if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) { info!("skipping chunk sync for same datastore"); } else { + let new_index_writer = if crypt_config.is_some() { + let writer = DynamicIndexWriter::create(&path)?; + DecryptedIndexWriter::Dynamic(Arc::new(Mutex::new(writer))) + } else { + DecryptedIndexWriter::None + }; let stats = pull_index_chunks( reader .chunk_reader(crypt_config.clone(), archive_info.crypt_mode) @@ -341,8 +354,16 @@ async fn pull_single_archive<'a>( index, encountered_chunks, backend, + new_index_writer.clone(), ) .await?; + if let DecryptedIndexWriter::Dynamic(index) = &new_index_writer { + let csum = index.lock().unwrap().close()?; + + // Overwrite current tmp file so it will be persisted instead + std::fs::rename(&path, &tmp_path)?; + } + sync_stats.add(stats); } } @@ -353,9 +374,16 @@ async fn pull_single_archive<'a>( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - if reader.skip_chunk_sync(snapshot.datastore().name()) { + if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) { info!("skipping chunk sync for same datastore"); } else { + let new_index_writer = if crypt_config.is_some() { + let writer = + FixedIndexWriter::create(&path, Some(size), index.chunk_size as u32)?; + 
DecryptedIndexWriter::Fixed(Arc::new(Mutex::new(writer))) + } else { + DecryptedIndexWriter::None + }; let stats = pull_index_chunks( reader .chunk_reader(crypt_config.clone(), archive_info.crypt_mode) @@ -364,8 +392,16 @@ async fn pull_single_archive<'a>( index, encountered_chunks, backend, + new_index_writer.clone(), ) .await?; + if let DecryptedIndexWriter::Fixed(index) = &new_index_writer { + let csum = index.lock().unwrap().close()?; + + // Overwrite current tmp file so it will be persisted instead + std::fs::rename(&path, &tmp_path)?; + } + sync_stats.add(stats); } } @@ -1280,3 +1316,10 @@ impl EncounteredChunks { self.chunk_set.clear(); } } + +#[derive(Clone)] +enum DecryptedIndexWriter { + Fixed(Arc>), + Dynamic(Arc>), + None, +} -- 2.47.3