From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup 16/20] sync: pull: introduce and use decrypt index writer if crypt config
Date: Wed,  1 Apr 2026 09:55:17 +0200	[thread overview]
Message-ID: <20260401075521.176354-17-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260401075521.176354-1-c.ebner@proxmox.com>

In order to decrypt an encrypted index file during a pull sync job
when a matching decryption key is configured, the index has to be
rewritten: the chunks have to be decrypted and the new digests
calculated based on the decrypted chunk data. The newly written index
file must then replace the original one, achieved by replacing the
original tempfile after pulling the chunks.

To be able to do so, provide a DecryptedIndexWriter instance to the
chunk pulling logic. The DecryptedIndexWriter provides variants for
fixed and dynamic index writers, or none if no rewriting should
happen.

This remains disarmed for the time being: the crypt config is never
passed until the logic to decrypt the chunks and re-calculate the
digests is in place, which is done in subsequent code changes.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 135 ++++++++++++++++++++++++++++++---------------
 1 file changed, 89 insertions(+), 46 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index a5d1b3079..8002bbf87 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -21,8 +21,8 @@ use pbs_api_types::{
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
-use pbs_datastore::dynamic_index::DynamicIndexReader;
-use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
+use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::read_chunk::AsyncReadChunk;
@@ -155,6 +155,7 @@ async fn pull_index_chunks<I: IndexFile>(
     index: I,
     encountered_chunks: Arc<Mutex<EncounteredChunks>>,
     backend: &DatastoreBackend,
+    decrypted_index_writer: DecryptedIndexWriter,
 ) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -190,55 +191,61 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = Arc::new(AtomicUsize::new(0));
     let chunk_count = Arc::new(AtomicUsize::new(0));
 
-    stream
-        .map(|info| {
-            let target = Arc::clone(&target);
-            let chunk_reader = chunk_reader.clone();
-            let bytes = Arc::clone(&bytes);
-            let chunk_count = Arc::clone(&chunk_count);
-            let verify_and_write_channel = verify_and_write_channel.clone();
-            let encountered_chunks = Arc::clone(&encountered_chunks);
-
-            Ok::<_, Error>(async move {
-                {
-                    // limit guard scope
-                    let mut guard = encountered_chunks.lock().unwrap();
-                    if let Some(touched) = guard.check_reusable(&info.digest) {
-                        if touched {
-                            return Ok::<_, Error>(());
-                        }
-                        let chunk_exists = proxmox_async::runtime::block_in_place(|| {
-                            target.cond_touch_chunk(&info.digest, false)
-                        })?;
-                        if chunk_exists {
-                            guard.mark_touched(&info.digest);
-                            //info!("chunk {} exists {}", pos, hex::encode(digest));
-                            return Ok::<_, Error>(());
-                        }
+    let stream = stream.map(|info| {
+        let target = Arc::clone(&target);
+        let chunk_reader = chunk_reader.clone();
+        let bytes = Arc::clone(&bytes);
+        let chunk_count = Arc::clone(&chunk_count);
+        let verify_and_write_channel = verify_and_write_channel.clone();
+        let encountered_chunks = Arc::clone(&encountered_chunks);
+
+        Ok::<_, Error>(async move {
+            {
+                // limit guard scope
+                let mut guard = encountered_chunks.lock().unwrap();
+                if let Some(touched) = guard.check_reusable(&info.digest) {
+                    if touched {
+                        return Ok::<_, Error>(());
+                    }
+                    let chunk_exists = proxmox_async::runtime::block_in_place(|| {
+                        target.cond_touch_chunk(&info.digest, false)
+                    })?;
+                    if chunk_exists {
+                        guard.mark_touched(&info.digest);
+                        //info!("chunk {} exists {}", pos, hex::encode(digest));
+                        return Ok::<_, Error>(());
                     }
-                    // mark before actually downloading the chunk, so this happens only once
-                    guard.mark_reusable(&info.digest);
-                    guard.mark_touched(&info.digest);
                 }
+                // mark before actually downloading the chunk, so this happens only once
+                guard.mark_reusable(&info.digest);
+                guard.mark_touched(&info.digest);
+            }
 
-                //info!("sync {} chunk {}", pos, hex::encode(digest));
-                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
-                let raw_size = chunk.raw_size() as usize;
+            //info!("sync {} chunk {}", pos, hex::encode(digest));
+            let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+            let raw_size = chunk.raw_size() as usize;
 
-                // decode, verify and write in a separate threads to maximize throughput
-                proxmox_async::runtime::block_in_place(|| {
-                    verify_and_write_channel.send((chunk, info.digest, info.size()))
-                })?;
+            // decode, verify and write in a separate threads to maximize throughput
+            proxmox_async::runtime::block_in_place(|| {
+                verify_and_write_channel.send((chunk, info.digest, info.size()))
+            })?;
 
-                bytes.fetch_add(raw_size, Ordering::SeqCst);
-                chunk_count.fetch_add(1, Ordering::SeqCst);
+            bytes.fetch_add(raw_size, Ordering::SeqCst);
+            chunk_count.fetch_add(1, Ordering::SeqCst);
 
-                Ok(())
-            })
+            Ok(())
         })
-        .try_buffer_unordered(20)
-        .try_for_each(|_res| futures::future::ok(()))
-        .await?;
+    });
+
+    if let DecryptedIndexWriter::None = decrypted_index_writer {
+        stream
+            .try_buffer_unordered(20)
+            .try_for_each(|_res| futures::future::ok(()))
+            .await?;
+    } else {
+        // must keep chunk order to correctly rewrite index file
+        stream.try_for_each(|item| item).await?;
+    }
 
     drop(verify_and_write_channel);
 
@@ -319,9 +326,15 @@ async fn pull_single_archive<'a>(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+            if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) {
                 info!("skipping chunk sync for same datastore");
             } else {
+                let new_index_writer = if crypt_config.is_some() {
+                    let writer = DynamicIndexWriter::create(&path)?;
+                    DecryptedIndexWriter::Dynamic(Arc::new(Mutex::new(writer)))
+                } else {
+                    DecryptedIndexWriter::None
+                };
                 let stats = pull_index_chunks(
                     reader
                         .chunk_reader(crypt_config.clone(), archive_info.crypt_mode)
@@ -330,8 +343,16 @@ async fn pull_single_archive<'a>(
                     index,
                     encountered_chunks,
                     backend,
+                    new_index_writer.clone(),
                 )
                 .await?;
+                if let DecryptedIndexWriter::Dynamic(index) = &new_index_writer {
+                    let csum = index.lock().unwrap().close()?;
+
+                    // Overwrite current tmp file so it will be persisted instead
+                    std::fs::rename(&path, &tmp_path)?;
+                }
+
                 sync_stats.add(stats);
             }
         }
@@ -342,9 +363,16 @@ async fn pull_single_archive<'a>(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+            if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) {
                 info!("skipping chunk sync for same datastore");
             } else {
+                let new_index_writer = if crypt_config.is_some() {
+                    let writer =
+                        FixedIndexWriter::create(&path, Some(size), index.chunk_size as u32)?;
+                    DecryptedIndexWriter::Fixed(Arc::new(Mutex::new(writer)))
+                } else {
+                    DecryptedIndexWriter::None
+                };
                 let stats = pull_index_chunks(
                     reader
                         .chunk_reader(crypt_config.clone(), archive_info.crypt_mode)
@@ -353,8 +381,16 @@ async fn pull_single_archive<'a>(
                     index,
                     encountered_chunks,
                     backend,
+                    new_index_writer.clone(),
                 )
                 .await?;
+                if let DecryptedIndexWriter::Fixed(index) = &new_index_writer {
+                    let csum = index.lock().unwrap().close()?;
+
+                    // Overwrite current tmp file so it will be persisted instead
+                    std::fs::rename(&path, &tmp_path)?;
+                }
+
                 sync_stats.add(stats);
             }
         }
@@ -1269,3 +1305,10 @@ impl EncounteredChunks {
         self.chunk_set.clear();
     }
 }
+
+#[derive(Clone)]
+enum DecryptedIndexWriter {
+    Fixed(Arc<Mutex<FixedIndexWriter>>),
+    Dynamic(Arc<Mutex<DynamicIndexWriter>>),
+    None,
+}
-- 
2.47.3
