From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id D8DFD1FF2D5 for ; Mon, 15 Jul 2024 12:16:31 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B1DD037EBE; Mon, 15 Jul 2024 12:16:55 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Mon, 15 Jul 2024 12:15:48 +0200 Message-Id: <20240715101602.274244-11-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240715101602.274244-1-c.ebner@proxmox.com> References: <20240715101602.274244-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.021 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" In preparation for implementing push support for sync jobs, factor out the upload stream for merged chunks so that it can be reused to upload the local chunks to a remote target datastore during a snapshot sync operation in the push direction. 
Signed-off-by: Christian Ebner --- pbs-client/src/backup_writer.rs | 47 ++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index a67b471a7..6daad9fde 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt}; use futures::stream::{Stream, StreamExt, TryStreamExt}; +use openssl::sha::Sha256; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; @@ -675,19 +676,12 @@ impl BackupWriter { stream_len: stream_len.clone(), }; - let append_chunk_path = format!("{}_index", prefix); - let upload_chunk_path = format!("{}_chunk", prefix); let is_fixed_chunk_size = prefix == "fixed"; - let (upload_queue, upload_result) = - Self::append_chunk_queue(h2.clone(), wid, append_chunk_path); - - let start_time = std::time::Instant::now(); - let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new()))); let index_csum_2 = index_csum.clone(); - stream + let stream = stream .inject_reused_chunks(injections, stream_len.clone()) .and_then(move |chunk_info| match chunk_info { InjectedChunksInfo::Known(chunks) => { @@ -758,7 +752,28 @@ impl BackupWriter { } } }) - .merge_known_chunks() + .merge_known_chunks(); + + Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters) + } + + fn upload_merged_chunk_stream( + h2: H2Client, + wid: u64, + prefix: &str, + stream: impl Stream>, + index_csum: Arc>>, + counters: UploadStatsCounters, + ) -> impl Future> { + let append_chunk_path = format!("{prefix}_index"); + let upload_chunk_path = format!("{prefix}_chunk"); + + let (upload_queue, upload_result) = + Self::append_chunk_queue(h2.clone(), wid, append_chunk_path); + + let start_time = std::time::Instant::now(); + + stream 
.try_for_each(move |merged_chunk_info| { let upload_queue = upload_queue.clone(); @@ -768,10 +783,8 @@ impl BackupWriter { let digest_str = hex::encode(digest); log::trace!( - "upload new chunk {} ({} bytes, offset {})", - digest_str, - chunk_info.chunk_len, - offset + "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})", + chunk_len = chunk_info.chunk_len, ); let chunk_data = chunk_info.chunk.into_inner(); @@ -800,9 +813,7 @@ impl BackupWriter { upload_queue .send((new_info, Some(response))) .await - .map_err(|err| { - format_err!("failed to send to upload queue: {}", err) - }) + .map_err(|err| format_err!("failed to send to upload queue: {err}")) }, )) } else { @@ -810,13 +821,13 @@ impl BackupWriter { upload_queue .send((merged_chunk_info, None)) .await - .map_err(|err| format_err!("failed to send to upload queue: {}", err)) + .map_err(|err| format_err!("failed to send to upload queue: {err}")) }) } }) .then(move |result| async move { upload_result.await?.and(result) }.boxed()) .and_then(move |_| { - let mut guard = index_csum_2.lock().unwrap(); + let mut guard = index_csum.lock().unwrap(); let csum = guard.take().unwrap().finish(); futures::future::ok(UploadStats { -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel