From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 3E14F1FF16B for ; Thu, 17 Oct 2024 15:27:35 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id E920C100EF; Thu, 17 Oct 2024 15:28:09 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 17 Oct 2024 15:26:47 +0200 Message-Id: <20241017132716.385234-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20241017132716.385234-1-c.ebner@proxmox.com> References: <20241017132716.385234-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.026 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. 
SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v4 proxmox 02/31] client: backup writer: factor out merged chunk stream upload X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" In preparation for implementing push support for sync jobs. Factor out the upload stream for merged chunks, which can be reused to upload the local chunks to a remote target datastore during a snapshot sync operation in push direction. Signed-off-by: Christian Ebner --- changes since version 3: - adapted to refactored upload stat counters pbs-client/src/backup_writer.rs | 43 +++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index 5ccfcc9b3..27d1c73b1 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -6,6 +6,7 @@ use std::time::Instant; use anyhow::{bail, format_err, Error}; use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt}; use futures::stream::{Stream, StreamExt, TryStreamExt}; +use openssl::sha::Sha256; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; @@ -633,19 +634,12 @@ impl BackupWriter { let mut counters = UploadCounters::new(); let counters_readonly = counters.clone(); - let append_chunk_path = format!("{}_index", prefix); - let upload_chunk_path = format!("{}_chunk", prefix); let is_fixed_chunk_size = prefix == "fixed"; - let (upload_queue, upload_result) = - Self::append_chunk_queue(h2.clone(), wid, append_chunk_path); - - let 
start_time = std::time::Instant::now(); - let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new()))); let index_csum_2 = index_csum.clone(); - stream + let stream = stream .inject_reused_chunks(injections, counters.total_stream_len_counter()) .and_then(move |chunk_info| match chunk_info { InjectedChunksInfo::Known(chunks) => { @@ -715,7 +709,28 @@ impl BackupWriter { } } }) - .merge_known_chunks() + .merge_known_chunks(); + + Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters_readonly) + } + + fn upload_merged_chunk_stream( + h2: H2Client, + wid: u64, + prefix: &str, + stream: impl Stream<Item = Result<MergedChunkInfo, Error>>, + index_csum: Arc<Mutex<Option<Sha256>>>, + counters: UploadCounters, + ) -> impl Future<Output = Result<UploadStats, Error>> { + let append_chunk_path = format!("{prefix}_index"); + let upload_chunk_path = format!("{prefix}_chunk"); + + let (upload_queue, upload_result) = + Self::append_chunk_queue(h2.clone(), wid, append_chunk_path); + + let start_time = std::time::Instant::now(); + + stream .try_for_each(move |merged_chunk_info| { let upload_queue = upload_queue.clone(); @@ -725,10 +740,8 @@ impl BackupWriter { let digest_str = hex::encode(digest); log::trace!( - "upload new chunk {} ({} bytes, offset {})", - digest_str, - chunk_info.chunk_len, - offset + "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})", + chunk_len = chunk_info.chunk_len, ); let chunk_data = chunk_info.chunk.into_inner(); @@ -773,10 +786,10 @@ impl BackupWriter { }) .then(move |result| async move { upload_result.await?.and(result) }.boxed()) .and_then(move |_| { - let mut guard = index_csum_2.lock().unwrap(); + let mut guard = index_csum.lock().unwrap(); let csum = guard.take().unwrap().finish(); - futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed())) + futures::future::ok(counters.to_upload_stats(csum, start_time.elapsed())) }) } -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com 
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel