From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v2 23/27] sync: pull: introduce and use decrypt index writer if crypt config
Date: Fri, 10 Apr 2026 18:54:50 +0200
Message-ID: <20260410165454.1578501-24-c.ebner@proxmox.com>
In-Reply-To: <20260410165454.1578501-1-c.ebner@proxmox.com>

In order to decrypt an encrypted index file during a pull sync job
when a matching decryption key is configured, the index has to be
rewritten, as the chunks have to be decrypted and the new digests
calculated based on the decrypted chunks. The newly written index file
finally needs to replace the original one, which is achieved by
replacing the original tempfile after pulling the chunks.

In order to be able to do so, provide a DecryptedIndexWriter instance
to the chunk pulling logic. The DecryptedIndexWriter provides variants
for fixed and dynamic index writers, or none if no rewriting should
happen.

This remains disarmed for the time being by never passing the crypt
config until the logic to decrypt the chunks and re-calculate the
digests is in place, which is done in subsequent patches.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
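Illustration only, not part of the patch: a minimal, std-only sketch of
the flow described in the commit message, namely handing a shared writer
handle to the chunk pulling logic, closing the writer afterwards and
renaming the new file over the tempfile. StubIndexWriter, IndexRewriter,
rewrite_index and the ".rewrite" file name are hypothetical stand-ins
and do not reflect the pbs_datastore writer APIs.

```rust
use std::fs;
use std::io::{Error, ErrorKind, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an index writer (not the pbs_datastore type).
struct StubIndexWriter {
    file: Option<fs::File>,
    entries: usize,
}

impl StubIndexWriter {
    fn create(path: &Path) -> std::io::Result<Self> {
        Ok(Self { file: Some(fs::File::create(path)?), entries: 0 })
    }

    /// Append one digest; fails if the writer was already closed.
    fn add_digest(&mut self, digest: &[u8; 32]) -> std::io::Result<()> {
        let file = self
            .file
            .as_mut()
            .ok_or_else(|| Error::new(ErrorKind::Other, "writer already closed"))?;
        file.write_all(digest)?;
        self.entries += 1;
        Ok(())
    }

    /// Flush and release the file handle, returning the entry count.
    fn close(&mut self) -> std::io::Result<usize> {
        if let Some(file) = self.file.take() {
            file.sync_all()?;
        }
        Ok(self.entries)
    }
}

/// Either an active, shared writer or no rewriting at all.
#[derive(Clone)]
enum IndexRewriter {
    Active(Arc<Mutex<StubIndexWriter>>),
    None,
}

/// Write a new index next to `tmp_path` and rename it over `tmp_path`.
fn rewrite_index(tmp_path: &Path, digests: &[[u8; 32]], decrypt: bool) -> std::io::Result<usize> {
    let new_path = tmp_path.with_extension("rewrite");
    let rewriter = if decrypt {
        IndexRewriter::Active(Arc::new(Mutex::new(StubIndexWriter::create(&new_path)?)))
    } else {
        IndexRewriter::None
    };

    // The pulling logic gets a clone; both handles share the same writer.
    let handle = rewriter.clone();
    if let IndexRewriter::Active(writer) = &handle {
        let mut writer = writer.lock().unwrap();
        for digest in digests {
            writer.add_digest(digest)?;
        }
    }

    if let IndexRewriter::Active(writer) = &rewriter {
        let entries = writer.lock().unwrap().close()?;
        // Replace the tempfile so the rewritten index gets persisted instead.
        fs::rename(&new_path, tmp_path)?;
        return Ok(entries);
    }
    Ok(0)
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join(format!("index-rewrite-sketch-{}", std::process::id()));
    fs::create_dir_all(&dir)?;
    let tmp_path = dir.join("archive.didx.tmp");
    fs::write(&tmp_path, b"original encrypted index")?;

    let entries = rewrite_index(&tmp_path, &[[1u8; 32], [2u8; 32]], true)?;
    println!("rewrote {entries} entries, {} bytes", fs::metadata(&tmp_path)?.len());
    fs::remove_dir_all(&dir)
}
```

With the None variant nothing is created or renamed, which corresponds
to the disarmed state described above.
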
 src/server/pull.rs | 133 ++++++++++++++++++++++++++++++---------------
 1 file changed, 88 insertions(+), 45 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index 39f4b2d75..7f5c00ddb 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -21,8 +21,8 @@ use pbs_api_types::{
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
-use pbs_datastore::dynamic_index::DynamicIndexReader;
-use pbs_datastore::fixed_index::FixedIndexReader;
+use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
+use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter};
 use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{BackupManifest, FileInfo};
 use pbs_datastore::read_chunk::AsyncReadChunk;
@@ -166,6 +166,7 @@ async fn pull_index_chunks<I: IndexFile>(
     index: I,
     encountered_chunks: Arc<Mutex<EncounteredChunks>>,
     backend: &DatastoreBackend,
+    decrypted_index_writer: DecryptedIndexWriter,
 ) -> Result<SyncStats, Error> {
     use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -201,55 +202,61 @@ async fn pull_index_chunks<I: IndexFile>(
     let bytes = Arc::new(AtomicUsize::new(0));
     let chunk_count = Arc::new(AtomicUsize::new(0));
 
-    stream
-        .map(|info| {
-            let target = Arc::clone(&target);
-            let chunk_reader = chunk_reader.clone();
-            let bytes = Arc::clone(&bytes);
-            let chunk_count = Arc::clone(&chunk_count);
-            let verify_and_write_channel = verify_and_write_channel.clone();
-            let encountered_chunks = Arc::clone(&encountered_chunks);
+    let stream = stream.map(|info| {
+        let target = Arc::clone(&target);
+        let chunk_reader = chunk_reader.clone();
+        let bytes = Arc::clone(&bytes);
+        let chunk_count = Arc::clone(&chunk_count);
+        let verify_and_write_channel = verify_and_write_channel.clone();
+        let encountered_chunks = Arc::clone(&encountered_chunks);
 
-            Ok::<_, Error>(async move {
-                {
-                    // limit guard scope
-                    let mut guard = encountered_chunks.lock().unwrap();
-                    if let Some(touched) = guard.check_reusable(&info.digest) {
-                        if touched {
-                            return Ok::<_, Error>(());
-                        }
-                        let chunk_exists = proxmox_async::runtime::block_in_place(|| {
-                            target.cond_touch_chunk(&info.digest, false)
-                        })?;
-                        if chunk_exists {
-                            guard.mark_touched(&info.digest);
-                            //info!("chunk {} exists {}", pos, hex::encode(digest));
-                            return Ok::<_, Error>(());
-                        }
+        Ok::<_, Error>(async move {
+            {
+                // limit guard scope
+                let mut guard = encountered_chunks.lock().unwrap();
+                if let Some(touched) = guard.check_reusable(&info.digest) {
+                    if touched {
+                        return Ok::<_, Error>(());
+                    }
+                    let chunk_exists = proxmox_async::runtime::block_in_place(|| {
+                        target.cond_touch_chunk(&info.digest, false)
+                    })?;
+                    if chunk_exists {
+                        guard.mark_touched(&info.digest);
+                        //info!("chunk {} exists {}", pos, hex::encode(digest));
+                        return Ok::<_, Error>(());
                     }
-                    // mark before actually downloading the chunk, so this happens only once
-                    guard.mark_reusable(&info.digest);
-                    guard.mark_touched(&info.digest);
                 }
+                // mark before actually downloading the chunk, so this happens only once
+                guard.mark_reusable(&info.digest);
+                guard.mark_touched(&info.digest);
+            }
 
-                //info!("sync {} chunk {}", pos, hex::encode(digest));
-                let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
-                let raw_size = chunk.raw_size() as usize;
+            //info!("sync {} chunk {}", pos, hex::encode(digest));
+            let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+            let raw_size = chunk.raw_size() as usize;
 
-                // decode, verify and write in a separate threads to maximize throughput
-                proxmox_async::runtime::block_in_place(|| {
-                    verify_and_write_channel.send((chunk, info.digest, info.size()))
-                })?;
+            // decode, verify and write in a separate threads to maximize throughput
+            proxmox_async::runtime::block_in_place(|| {
+                verify_and_write_channel.send((chunk, info.digest, info.size()))
+            })?;
 
-                bytes.fetch_add(raw_size, Ordering::SeqCst);
-                chunk_count.fetch_add(1, Ordering::SeqCst);
+            bytes.fetch_add(raw_size, Ordering::SeqCst);
+            chunk_count.fetch_add(1, Ordering::SeqCst);
 
-                Ok(())
-            })
+            Ok(())
         })
-        .try_buffer_unordered(20)
-        .try_for_each(|_res| futures::future::ok(()))
-        .await?;
+    });
+
+    if let DecryptedIndexWriter::None = decrypted_index_writer {
+        stream
+            .try_buffer_unordered(20)
+            .try_for_each(|_res| futures::future::ok(()))
+            .await?;
+    } else {
+        // must keep chunk order to correctly rewrite index file
+        stream.try_for_each(|item| item).await?;
+    }
 
     drop(verify_and_write_channel);
 
@@ -330,9 +337,15 @@ async fn pull_single_archive<'a>(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+            if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) {
                 info!("skipping chunk sync for same datastore");
             } else {
+                let new_index_writer = if crypt_config.is_some() {
+                    let writer = DynamicIndexWriter::create(&path)?;
+                    DecryptedIndexWriter::Dynamic(Arc::new(Mutex::new(writer)))
+                } else {
+                    DecryptedIndexWriter::None
+                };
                 let stats = pull_index_chunks(
                     reader
                         .chunk_reader(crypt_config.clone(), archive_info.crypt_mode)
@@ -341,8 +354,16 @@ async fn pull_single_archive<'a>(
                     index,
                     encountered_chunks,
                     backend,
+                    new_index_writer.clone(),
                 )
                 .await?;
+                if let DecryptedIndexWriter::Dynamic(index) = &new_index_writer {
+                    let csum = index.lock().unwrap().close()?;
+
+                    // Overwrite current tmp file so it will be persisted instead
+                    std::fs::rename(&path, &tmp_path)?;
+                }
+
                 sync_stats.add(stats);
             }
         }
@@ -353,9 +374,16 @@ async fn pull_single_archive<'a>(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+            if crypt_config.is_none() && reader.skip_chunk_sync(snapshot.datastore().name()) {
                 info!("skipping chunk sync for same datastore");
             } else {
+                let new_index_writer = if crypt_config.is_some() {
+                    let writer =
+                        FixedIndexWriter::create(&path, Some(size), index.chunk_size as u32)?;
+                    DecryptedIndexWriter::Fixed(Arc::new(Mutex::new(writer)))
+                } else {
+                    DecryptedIndexWriter::None
+                };
                 let stats = pull_index_chunks(
                     reader
                         .chunk_reader(crypt_config.clone(), archive_info.crypt_mode)
@@ -364,8 +392,16 @@ async fn pull_single_archive<'a>(
                     index,
                     encountered_chunks,
                     backend,
+                    new_index_writer.clone(),
                 )
                 .await?;
+                if let DecryptedIndexWriter::Fixed(index) = &new_index_writer {
+                    let csum = index.lock().unwrap().close()?;
+
+                    // Overwrite current tmp file so it will be persisted instead
+                    std::fs::rename(&path, &tmp_path)?;
+                }
+
                 sync_stats.add(stats);
             }
         }
@@ -1280,3 +1316,10 @@ impl EncounteredChunks {
         self.chunk_set.clear();
     }
 }
+
+#[derive(Clone)]
+enum DecryptedIndexWriter {
+    Fixed(Arc<Mutex<FixedIndexWriter>>),
+    Dynamic(Arc<Mutex<DynamicIndexWriter>>),
+    None,
+}
-- 
2.47.3
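
Illustration only, not part of the patch: a small std-only sketch of why
pull_index_chunks has to process chunks in order once an index is being
rewritten. The Entry layout (end offset plus digest) and the use of
DefaultHasher as checksum are assumptions made for this example, they
are not the on-disk index format or the checksum used by the datastore.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical index entry: end offset of the chunk plus its digest.
#[derive(Hash, Clone, Copy, PartialEq, Debug)]
struct Entry {
    end_offset: u64,
    digest: [u8; 32],
}

/// Append chunks in the order given, accumulating end offsets.
fn build_index(chunks: &[(u64, [u8; 32])]) -> Vec<Entry> {
    let mut offset = 0u64;
    chunks
        .iter()
        .map(|(size, digest)| {
            offset += size;
            Entry { end_offset: offset, digest: *digest }
        })
        .collect()
}

/// Stand-in checksum over all entries, sensitive to their order.
fn index_checksum(index: &[Entry]) -> u64 {
    let mut hasher = DefaultHasher::new();
    index.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let in_order: [(u64, [u8; 32]); 3] =
        [(4096, [0xaa; 32]), (1024, [0xbb; 32]), (2048, [0xcc; 32])];
    // The same chunks in an order they might complete in when buffered unordered.
    let completion_order = [in_order[1], in_order[0], in_order[2]];

    let expected = build_index(&in_order);
    let reordered = build_index(&completion_order);

    println!("indexes identical: {}", expected == reordered);
    println!(
        "checksums identical: {}",
        index_checksum(&expected) == index_checksum(&reordered)
    );
}
```

Appending entries in completion order yields different offsets and a
different entry sequence, so an unordered stream is only safe as long as
no index is written.
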




