public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [PATCH proxmox-backup 19/20] sync: pull: decrypt chunks and rewrite index file for matching key
Date: Wed,  1 Apr 2026 09:55:20 +0200	[thread overview]
Message-ID: <20260401075521.176354-20-c.ebner@proxmox.com> (raw)
In-Reply-To: <20260401075521.176354-1-c.ebner@proxmox.com>

Once the matching decryption key is provided, use it to decrypt
the chunks on pull and rewrite the index file based on the decrypted
chunk digests and offsets.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 src/server/pull.rs | 135 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 114 insertions(+), 21 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index ccf349c92..05152d0dd 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -3,7 +3,7 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::io::{BufReader, Read, Seek, Write};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
@@ -20,7 +20,7 @@ use pbs_api_types::{
 };
 use pbs_client::BackupRepository;
 use pbs_config::CachedUserInfo;
-use pbs_datastore::data_blob::DataBlob;
+use pbs_datastore::data_blob::{DataBlob, DataChunkBuilder};
 use pbs_datastore::dynamic_index::{DynamicIndexReader, DynamicIndexWriter};
 use pbs_datastore::fixed_index::{FixedIndexReader, FixedIndexWriter};
 use pbs_datastore::index::IndexFile;
@@ -169,7 +169,16 @@ async fn pull_index_chunks<I: IndexFile>(
             .filter(|info| {
                 let guard = encountered_chunks.lock().unwrap();
                 match guard.check_reusable(&info.digest) {
-                    Some((touched, _decrypted_chunk)) => !touched, // reusable and already touched, can always skip
+                    Some((touched, mapped_digest)) => {
+                        if mapped_digest.is_some() {
+                            // if there is a mapping, then the chunk digest must be rewritten to
+                            // the index; cannot skip here, but this is optimized when processing the stream
+                            true
+                        } else {
+                            // reusable and already touched, can always skip
+                            !touched
+                        }
+                    }
                     None => true,
                 }
             }),
@@ -191,6 +200,7 @@ async fn pull_index_chunks<I: IndexFile>(
     let verify_and_write_channel = verify_pool.channel();
 
     let bytes = Arc::new(AtomicUsize::new(0));
+    let offset = Arc::new(AtomicU64::new(0));
     let chunk_count = Arc::new(AtomicUsize::new(0));
 
     let stream = stream.map(|info| {
@@ -200,36 +210,119 @@ async fn pull_index_chunks<I: IndexFile>(
         let chunk_count = Arc::clone(&chunk_count);
         let verify_and_write_channel = verify_and_write_channel.clone();
         let encountered_chunks = Arc::clone(&encountered_chunks);
+        let offset = Arc::clone(&offset);
+        let decrypted_index_writer = decrypted_index_writer.clone();
 
         Ok::<_, Error>(async move {
-            {
-                // limit guard scope
-                let mut guard = encountered_chunks.lock().unwrap();
-                if let Some((touched, _decrypted_digest)) = guard.check_reusable(&info.digest) {
-                    if touched {
+            //info!("sync {} chunk {}", pos, hex::encode(digest));
+            let (chunk, digest, size) = match decrypted_index_writer {
+                DecryptedIndexWriter::Fixed(index) => {
+                    if let Some((_touched, Some(decrypted_digest))) = encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .check_reusable(&info.digest)
+                    {
+                        // already got the decrypted digest and chunk has been written,
+                        // no need to process again
+                        let size = info.size();
+                        let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+
+                        index.lock().unwrap().add_chunk(
+                            start_offset,
+                            size as u32,
+                            decrypted_digest,
+                        )?;
+
                         return Ok::<_, Error>(());
                     }
-                    let chunk_exists = proxmox_async::runtime::block_in_place(|| {
-                        target.cond_touch_chunk(&info.digest, false)
-                    })?;
-                    if chunk_exists {
-                        guard.mark_touched(&info.digest, None);
-                        //info!("chunk {} exists {}", pos, hex::encode(digest));
+
+                    let chunk_data = chunk_reader.read_chunk(&info.digest).await?;
+                    let (chunk, digest) =
+                        DataChunkBuilder::new(&chunk_data).compress(true).build()?;
+
+                    let size = chunk_data.len() as u64;
+                    let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+
+                    index
+                        .lock()
+                        .unwrap()
+                        .add_chunk(start_offset, size as u32, &digest)?;
+
+                    encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .mark_reusable(&info.digest, Some(digest));
+
+                    (chunk, digest, size)
+                }
+                DecryptedIndexWriter::Dynamic(index) => {
+                    if let Some((_touched, Some(decrypted_digest))) = encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .check_reusable(&info.digest)
+                    {
+                        // already got the decrypted digest and chunk has been written,
+                        // no need to process again
+                        let size = info.size();
+                        let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+                        let end_offset = start_offset + size;
+
+                        index
+                            .lock()
+                            .unwrap()
+                            .add_chunk(end_offset, decrypted_digest)?;
+
                         return Ok::<_, Error>(());
                     }
+
+                    let chunk_data = chunk_reader.read_chunk(&info.digest).await?;
+                    let (chunk, digest) =
+                        DataChunkBuilder::new(&chunk_data).compress(true).build()?;
+
+                    let size = chunk_data.len() as u64;
+                    let start_offset = offset.fetch_add(size, Ordering::SeqCst);
+                    let end_offset = start_offset + size;
+
+                    index.lock().unwrap().add_chunk(end_offset, &digest)?;
+
+                    encountered_chunks
+                        .lock()
+                        .unwrap()
+                        .mark_reusable(&info.digest, Some(digest));
+
+                    (chunk, digest, size)
                 }
-                // mark before actually downloading the chunk, so this happens only once
-                guard.mark_reusable(&info.digest, None);
-                guard.mark_touched(&info.digest, None);
-            }
+                DecryptedIndexWriter::None => {
+                    {
+                        // limit guard scope
+                        let mut guard = encountered_chunks.lock().unwrap();
+                        if let Some((touched, _mapped)) = guard.check_reusable(&info.digest) {
+                            if touched {
+                                return Ok::<_, Error>(());
+                            }
+                            let chunk_exists = proxmox_async::runtime::block_in_place(|| {
+                                target.cond_touch_chunk(&info.digest, false)
+                            })?;
+                            if chunk_exists {
+                                guard.mark_touched(&info.digest, None);
+                                //info!("chunk {} exists {}", pos, hex::encode(digest));
+                                return Ok::<_, Error>(());
+                            }
+                        }
+                        // mark before actually downloading the chunk, so this happens only once
+                        guard.mark_reusable(&info.digest, None);
+                        guard.mark_touched(&info.digest, None);
+                    }
 
-            //info!("sync {} chunk {}", pos, hex::encode(digest));
-            let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+                    let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+                    (chunk, info.digest, info.size())
+                }
+            };
             let raw_size = chunk.raw_size() as usize;
 
             // decode, verify and write in a separate threads to maximize throughput
             proxmox_async::runtime::block_in_place(|| {
-                verify_and_write_channel.send((chunk, info.digest, info.size()))
+                verify_and_write_channel.send((chunk, digest, size))
             })?;
 
             bytes.fetch_add(raw_size, Ordering::SeqCst);
-- 
2.47.3





  parent reply	other threads:[~2026-04-01  7:55 UTC|newest]

Thread overview: 32+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-04-01  7:55 [PATCH proxmox{,-backup} 00/20] fix #7251: implement server side encryption support for push sync jobs Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox 01/20] pbs-api-types: define encryption key type and schema Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox 02/20] pbs-api-types: sync job: add optional encryption key to config Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 03/20] pbs-key-config: introduce store_with() for KeyConfig Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 04/20] pbs-config: implement encryption key config handling Christian Ebner
2026-04-01 23:27   ` Thomas Lamprecht
2026-04-02  7:09     ` Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 05/20] pbs-config: acls: add 'encryption-keys' as valid 'system' subpath Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 06/20] ui: expose 'encryption-keys' as acl subpath for 'system' Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 07/20] api: config: add endpoints for encryption key manipulation Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 08/20] api: config: allow encryption key manipulation for sync job Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 09/20] sync: push: rewrite manifest instead of pushing pre-existing one Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 10/20] sync: add helper to check encryption key acls and load key Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 11/20] fix #7251: api: push: encrypt snapshots using configured encryption key Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 12/20] ui: define and expose encryption key management menu item and windows Christian Ebner
2026-04-01 23:09   ` Thomas Lamprecht
2026-04-03  8:35     ` Dominik Csapak
2026-04-01 23:10   ` Thomas Lamprecht
2026-04-03 12:16   ` Dominik Csapak
2026-04-01  7:55 ` [PATCH proxmox-backup 13/20] ui: expose assigning encryption key to sync jobs Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 14/20] sync: pull: load encryption key if given in job config Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 15/20] sync: expand source chunk reader trait by crypt config Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 16/20] sync: pull: introduce and use decrypt index writer if " Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 17/20] sync: pull: extend encountered chunk by optional decrypted digest Christian Ebner
2026-04-01  7:55 ` [PATCH proxmox-backup 18/20] sync: pull: decrypt blob files on pull if encryption key is configured Christian Ebner
2026-04-01  7:55 ` Christian Ebner [this message]
2026-04-01  7:55 ` [PATCH proxmox-backup 20/20] sync: pull: decrypt snapshots with matching encryption key fingerprint Christian Ebner
2026-04-02  0:25 ` [PATCH proxmox{,-backup} 00/20] fix #7251: implement server side encryption support for push sync jobs Thomas Lamprecht
2026-04-02  7:37   ` Christian Ebner
2026-04-03  8:39 ` Dominik Csapak
2026-04-03  8:50   ` Christian Ebner
2026-04-03  9:00     ` Dominik Csapak

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260401075521.176354-20-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal