From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68])
	by lore.proxmox.com (Postfix) with ESMTPS id 1E4A81FF17D
	for <inbox@lore.proxmox.com>; Thu,  1 Aug 2024 09:44:52 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id 76ABF3071D;
	Thu,  1 Aug 2024 09:44:55 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Thu,  1 Aug 2024 09:43:38 +0200
Message-Id: <20240801074403.36229-7-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240801074403.36229-1-c.ebner@proxmox.com>
References: <20240801074403.36229-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.021 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [PATCH v2 proxmox-backup 06/31] client: backup writer:
 factor out merged chunk stream upload
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

In preparation for implementing push support for sync jobs.

Factor out the upload stream for merged chunks so that it can be reused
to upload local chunks to a remote target datastore during a snapshot
sync operation in the push direction.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 1:
- no changes

 pbs-client/src/backup_writer.rs | 47 ++++++++++++++++++++-------------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 34ac47beb..dceda1548 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
 use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
@@ -666,19 +667,12 @@ impl BackupWriter {
             stream_len: stream_len.clone(),
         };
 
-        let append_chunk_path = format!("{}_index", prefix);
-        let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
-
-        let start_time = std::time::Instant::now();
-
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
-        stream
+        let stream = stream
             .inject_reused_chunks(injections, stream_len.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
@@ -749,7 +743,28 @@ impl BackupWriter {
                     }
                 }
             })
-            .merge_known_chunks()
+            .merge_known_chunks();
+
+        Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters)
+    }
+
+    fn upload_merged_chunk_stream(
+        h2: H2Client,
+        wid: u64,
+        prefix: &str,
+        stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+        index_csum: Arc<Mutex<Option<Sha256>>>,
+        counters: UploadStatsCounters,
+    ) -> impl Future<Output = Result<UploadStats, Error>> {
+        let append_chunk_path = format!("{prefix}_index");
+        let upload_chunk_path = format!("{prefix}_chunk");
+
+        let (upload_queue, upload_result) =
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+
+        let start_time = std::time::Instant::now();
+
+        stream
             .try_for_each(move |merged_chunk_info| {
                 let upload_queue = upload_queue.clone();
 
@@ -759,10 +774,8 @@ impl BackupWriter {
                     let digest_str = hex::encode(digest);
 
                     log::trace!(
-                        "upload new chunk {} ({} bytes, offset {})",
-                        digest_str,
-                        chunk_info.chunk_len,
-                        offset
+                        "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})",
+                        chunk_len = chunk_info.chunk_len,
                     );
 
                     let chunk_data = chunk_info.chunk.into_inner();
@@ -791,9 +804,7 @@ impl BackupWriter {
                             upload_queue
                                 .send((new_info, Some(response)))
                                 .await
-                                .map_err(|err| {
-                                    format_err!("failed to send to upload queue: {}", err)
-                                })
+                                .map_err(|err| format_err!("failed to send to upload queue: {err}"))
                         },
                     ))
                 } else {
@@ -801,13 +812,13 @@ impl BackupWriter {
                         upload_queue
                             .send((merged_chunk_info, None))
                             .await
-                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
+                            .map_err(|err| format_err!("failed to send to upload queue: {err}"))
                     })
                 }
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let mut guard = index_csum_2.lock().unwrap();
+                let mut guard = index_csum.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
                 futures::future::ok(UploadStats {
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel