From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 826FA1FF15E
	for <inbox@lore.proxmox.com>; Fri, 18 Oct 2024 10:42:24 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 036DB1DDA0;
	Fri, 18 Oct 2024 10:42:57 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Fri, 18 Oct 2024 10:42:14 +0200
Message-Id: <20241018084242.144010-4-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20241018084242.144010-1-c.ebner@proxmox.com>
References: <20241018084242.144010-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.027 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [PATCH v5 proxmox-backup 03/31] client: backup writer:
 allow push uploading index and chunks
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

Add a method `upload_index_chunk_info` to be used for uploading an
existing index and the corresponding chunk stream.
Instead of taking an input stream of raw bytes like
`upload_stream`, this takes a stream of `MergedChunkInfo` objects
provided by the local chunk reader of the sync job's source.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 4:
- rebased onto current master

changes since version 3:
- known chunks are now handled by the caller (so reading them can be
  avoided)
- adapt for new upload stat counters

 pbs-client/src/backup_writer.rs | 96 +++++++++++++++++++++++++++++++++
 pbs-client/src/lib.rs           |  1 +
 2 files changed, 97 insertions(+)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 1ec181f99..a09757486 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -266,6 +266,102 @@ impl BackupWriter {
             .await
     }
 
+    /// Upload an existing index and its chunks from a stream of [`MergedChunkInfo`].
+    ///
+    /// Creates a fixed index if `options.fixed_size` is set, a dynamic one otherwise. Known
+    /// chunks are only accounted, their length is rewritten to the stream offset. Fails if
+    /// encryption is requested without a crypt config or no valid writer id is returned.
+    pub async fn upload_index_chunk_info(
+        &self,
+        archive_name: &str,
+        stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+        options: UploadOptions,
+    ) -> Result<BackupStats, Error> {
+        let mut param = json!({ "archive-name": archive_name });
+        let prefix = if let Some(size) = options.fixed_size {
+            param["size"] = size.into();
+            "fixed"
+        } else {
+            "dynamic"
+        };
+
+        if options.encrypt && self.crypt_config.is_none() {
+            bail!("requested encryption without a crypt config");
+        }
+
+        let wid = self
+            .h2
+            .post(&format!("{prefix}_index"), Some(param))
+            .await?
+            .as_u64()
+            .ok_or_else(|| anyhow::format_err!("invalid writer id returned by {prefix}_index"))?;
+
+        let mut counters = UploadCounters::new();
+        let counters_readonly = counters.clone();
+        let is_fixed_chunk_size = prefix == "fixed";
+
+        let index_csum = Arc::new(Mutex::new(Some(Sha256::new())));
+        let index_csum_2 = index_csum.clone();
+
+        let stream = stream
+            .and_then(move |mut merged_chunk_info| {
+                let mut guard = index_csum.lock().unwrap(); // lock once per item, not per chunk
+                let csum = guard.as_mut().unwrap();
+                match merged_chunk_info {
+                    MergedChunkInfo::New(ref chunk_info) => {
+                        counters.inc_total_chunks(1);
+                        let chunk_len = chunk_info.chunk_len;
+                        let offset = counters.inc_total_stream_len(chunk_len as usize);
+                        let end_offset = offset as u64 + chunk_len;
+                        if !is_fixed_chunk_size {
+                            csum.update(&end_offset.to_le_bytes());
+                        }
+                        csum.update(&chunk_info.digest);
+                    }
+                    MergedChunkInfo::Known(ref mut known_chunk_list) => {
+                        for (chunk_len, digest) in known_chunk_list {
+                            counters.inc_total_chunks(1);
+                            counters.inc_known_chunks(1);
+                            counters.inc_reused_stream_len(*chunk_len as usize);
+                            let offset = counters.inc_total_stream_len(*chunk_len as usize);
+                            let end_offset = offset as u64 + *chunk_len;
+                            if !is_fixed_chunk_size {
+                                csum.update(&end_offset.to_le_bytes());
+                            }
+                            csum.update(digest);
+                            // Replace size with offset, expected by further stream
+                            *chunk_len = offset as u64;
+                        }
+                    }
+                }
+                future::ok(merged_chunk_info)
+            })
+            .merge_known_chunks();
+
+        let upload_stats = Self::upload_merged_chunk_stream(
+            self.h2.clone(),
+            wid,
+            archive_name,
+            prefix,
+            stream,
+            index_csum_2,
+            counters_readonly,
+        )
+        .await?;
+
+        let param = json!({
+            "wid": wid,
+            "chunk-count": upload_stats.chunk_count,
+            "size": upload_stats.size,
+            "csum": hex::encode(upload_stats.csum),
+        });
+        self.h2
+            .post(&format!("{prefix}_close"), Some(param))
+            .await?;
+
+        Ok(upload_stats.to_backup_stats())
+    }
+
     pub async fn upload_stream(
         &self,
         archive_name: &str,
diff --git a/pbs-client/src/lib.rs b/pbs-client/src/lib.rs
index b875347bb..4b8e4e4f4 100644
--- a/pbs-client/src/lib.rs
+++ b/pbs-client/src/lib.rs
@@ -9,6 +9,7 @@ pub mod tools;
 
 mod inject_reused_chunks;
 mod merge_known_chunks;
+pub use merge_known_chunks::MergedChunkInfo;
 pub mod pipe_to_stream;
 
 mod http_client;
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel