public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup v2 26/27] sync: pull: decrypt chunks and rewrite index file for matching key
Date: Fri, 10 Apr 2026 18:54:53 +0200	[thread overview]
Message-ID: <20260410165454.1578501-27-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260410165454.1578501-1-c.ebner@proxmox.com>

Once the matching decryption key is provided, use it to decrypt
the chunks on pull and rewrite the index file based on the decrypted
chunk digests and offsets.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 135 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 114 insertions(+), 21 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index ce32afcd7..40e5353dd 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -3,7 +3,7 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::io::{BufReader, Read, Seek, Write};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
@@ -20,7 +20,7 @@ use pbs_api_types::{
 };
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
-use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::data_blob::{DataBlob, DataChunkBuilder};
 use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
 use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter};
 use pbs_datastore::index::IndexFile;
@@ -180,7 +180,16 @@ async fn pull_index_chunks<I: IndexFile>(
             .filter(|info| {
                 let guard = encountered_chunks.lock().unwrap();
                 match guard.check_reusable(&info.digest) {
-                    Some((touched, _decrypted_chunk)) => !touched, // reusable and already touched, can always skip
+                    Some((touched, mapped_digest)) => {
+                        if mapped_digest.is_some() {
+                            // if there is a mapping, then the chunk digest must be rewritten to
+                            // the index, cannot skip here but optimized when processing the stream
+                            true
+                        } else {
+                            // reusable and already touched, can always skip
+                            !touched
+                        }
+                    }
                     None => true,
                 }
             }),
@@ -202,6 +211,7 @@ async fn pull_index_chunks<I: IndexFile>(
     let verify_and_write_channel = verify_pool.channel();
 
     let bytes = Arc::new(AtomicUsize::new(0));
+    let offset = Arc::new(AtomicU64::new(0));
     let chunk_count = Arc::new(AtomicUsize::new(0));
 
     let stream = stream.map(|info| {
@@ -211,36 +221,119 @@ async fn pull_index_chunks<I: IndexFile>(
         let chunk_count = Arc::clone(&chunk_count);
         let verify_and_write_channel = verify_and_write_channel.clone();
         let encountered_chunks = Arc::clone(&encountered_chunks);
+        let offset = Arc::clone(&offset);
+        let decrypted_index_writer = decrypted_index_writer.clone();
 
         Ok::<_, Error>(async move {
-            {
-                // limit guard scope
-                let mut guard = encountered_chunks.lock().unwrap();
-                if let Some((touched, _decrypted_digest)) = guard.check_reusable(&info.digest) {
-                    if touched {
+            //info!("sync {} chunk {}", pos, hex::encode(digest));
+            let (chunk, digest, size) = match decrypted_index_writer {
+                DecryptedIndexWriter::Fixed(index) => {
+                    if let Some((_touched, Some(decrypted_digest))) = encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .check_reusable(&info.digest)
+                    {
+                        // already got the decrypted digest and chunk has been written,
+                        // no need to process again
+                        let size = info.size();
+                        let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+
+                        index.lock().unwrap().add_chunk(
+                            start_offset,
+                            size as u32,
+                            decrypted_digest,
+                        )?;
+
                         return Ok::<_, Error>(());
                     }
-                    let chunk_exists = proxmox_async::runtime::block_in_place(|| {
-                        target.cond_touch_chunk(&info.digest, false)
-                    })?;
-                    if chunk_exists {
-                        guard.mark_touched(&info.digest, None);
-                        //info!("chunk {} exists {}", pos, hex::encode(digest));
+
+                    let chunk_data = chunk_reader.read_chunk(&info.digest).await?;
+                    let (chunk, digest) =
+                        DataChunkBuilder::new(&chunk_data).compress(true).build()?;
+
+                    let size = chunk_data.len() as u64;
+                    let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+
+                    index
+                        .lock()
+                        .unwrap()
+                        .add_chunk(start_offset, size as u32, &digest)?;
+
+                    encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .mark_reusable(&info.digest, Some(digest));
+
+                    (chunk, digest, size)
+                }
+                DecryptedIndexWriter::Dynamic(index) => {
+                    if let Some((_touched, Some(decrypted_digest))) = encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .check_reusable(&info.digest)
+                    {
+                        // already got the decrypted digest and chunk has been written,
+                        // no need to process again
+                        let size = info.size();
+                        let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+                        let end_offset = start_offset + size;
+
+                        index
+                            .lock()
+                            .unwrap()
+                            .add_chunk(end_offset, decrypted_digest)?;
+
                         return Ok::<_, Error>(());
                     }
+
+                    let chunk_data = chunk_reader.read_chunk(&info.digest).await?;
+                    let (chunk, digest) =
+                        DataChunkBuilder::new(&chunk_data).compress(true).build()?;
+
+                    let size = chunk_data.len() as u64;
+                    let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+                    let end_offset = start_offset + size;
+
+                    index.lock().unwrap().add_chunk(end_offset, &digest)?;
+
+                    encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .mark_reusable(&info.digest, Some(digest));
+
+                    (chunk, digest, size)
                 }
-                // mark before actually downloading the chunk, so this happens only once
-                guard.mark_reusable(&info.digest, None);
-                guard.mark_touched(&info.digest, None);
-            }
+                DecryptedIndexWriter::None => {
+                    {
+                        // limit guard scope
+                        let mut guard = encountered_chunks.lock().unwrap();
+                        if let Some((touched, _mapped)) = guard.check_reusable(&info.digest) {
+                            if touched {
+                                return Ok::<_, Error>(());
+                            }
+                            let chunk_exists = proxmox_async::runtime::block_in_place(|| {
+                                target.cond_touch_chunk(&info.digest, false)
+                            })?;
+                            if chunk_exists {
+                                guard.mark_touched(&info.digest, None);
+                                //info!("chunk {} exists {}", pos, hex::encode(digest));
+                                return Ok::<_, Error>(());
+                            }
+                        }
+                        // mark before actually downloading the chunk, so this happens only once
+                        guard.mark_reusable(&info.digest, None);
+                        guard.mark_touched(&info.digest, None);
+                    }
 
-            //info!("sync {} chunk {}", pos, hex::encode(digest));
-            let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+                    let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+                    (chunk, info.digest, info.size())
+                }
+            };
             let raw_size = chunk.raw_size() as usize;
 
             // decode, verify and write in a separate threads to maximize throughput
             proxmox_async::runtime::block_in_place(|| {
-                verify_and_write_channel.send((chunk, info.digest, info.size()))
+                verify_and_write_channel.send((chunk, digest, size))
             })?;
 
             bytes.fetch_add(raw_size, Ordering::SeqCst);
-- 
2.47.3





  parent reply	other threads:[~2026-04-10 16:55 UTC|newest]

Thread overview: 28+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-10 16:54 [PATCH proxmox{,-backup} v2 00/27] fix #7251: implement server side encryption support for push sync jobs Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox v2 01/27] pbs-api-types: define en-/decryption key type and schema Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox v2 02/27] pbs-api-types: sync job: add optional cryptographic keys to config Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 03/27] datastore: blob: implement async reader for data blobs Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 04/27] datastore: manifest: add helper for change detection fingerprint Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 05/27] pbs-key-config: introduce store_with() for KeyConfig Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 06/27] pbs-config: implement encryption key config handling Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 07/27] pbs-config: acls: add 'encryption-keys' as valid 'system' subpath Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 08/27] ui: expose 'encryption-keys' as acl subpath for 'system' Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 09/27] sync: add helper to check encryption key acls and load key Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 10/27] api: config: add endpoints for encryption key manipulation Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 11/27] api: config: check sync owner has access to en-/decryption keys Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 12/27] api: config: allow encryption key manipulation for sync job Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 13/27] sync: push: rewrite manifest instead of pushing pre-existing one Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 14/27] api: push sync: expose optional encryption key for push sync Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 15/27] sync: push: optionally encrypt data blob on upload Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 16/27] sync: push: optionally encrypt client log on upload if key is given Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 17/27] sync: push: add helper for loading known chunks from previous snapshot Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 18/27] fix #7251: api: push: encrypt snapshots using configured encryption key Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 19/27] ui: define and expose encryption key management menu item and windows Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 20/27] ui: expose assigning encryption key to sync jobs Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 21/27] sync: pull: load encryption key if given in job config Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 22/27] sync: expand source chunk reader trait by crypt config Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 23/27] sync: pull: introduce and use decrypt index writer if " Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 24/27] sync: pull: extend encountered chunk by optional decrypted digest Christian Ebner
2026-04-10 16:54 ` [PATCH proxmox-backup v2 25/27] sync: pull: decrypt blob files on pull if encryption key is configured Christian Ebner
2026-04-10 16:54 ` Christian Ebner [this message]
2026-04-10 16:54 ` [PATCH proxmox-backup v2 27/27] sync: pull: decrypt snapshots with matching encryption key fingerprint Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260410165454.1578501-27-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal