From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 2CD921FF16B for ; Thu, 31 Oct 2024 13:16:21 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id A31BF84CC; Thu, 31 Oct 2024 13:16:23 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 31 Oct 2024 13:14:52 +0100 Message-Id: <20241031121519.434337-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20241031121519.434337-1-c.ebner@proxmox.com> References: <20241031121519.434337-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.029 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v6 proxmox-backup 02/29] client: backup writer: factor out merged chunk stream upload X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" In preparation for implementing push support for sync jobs, factor out the upload stream for merged chunks so that it can be reused to upload the local chunks to a remote target datastore during a snapshot sync operation in the push direction. 
Signed-off-by: Christian Ebner --- Changes since version 5: - refactor to adapt to UploadCounter changes pbs-client/src/backup_writer.rs | 88 +++++++++++++++++++++------------ 1 file changed, 56 insertions(+), 32 deletions(-) diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs index 8b9afdb95..f1bad4128 100644 --- a/pbs-client/src/backup_writer.rs +++ b/pbs-client/src/backup_writer.rs @@ -7,6 +7,7 @@ use std::time::Instant; use anyhow::{bail, format_err, Error}; use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt}; use futures::stream::{Stream, StreamExt, TryStreamExt}; +use openssl::sha::Sha256; use serde_json::{json, Value}; use tokio::io::AsyncReadExt; use tokio::sync::{mpsc, oneshot}; @@ -648,42 +649,14 @@ impl BackupWriter { archive: &str, ) -> impl Future> { let mut counters = UploadCounters::new(); - let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0)); let counters_readonly = counters.clone(); - let append_chunk_path = format!("{}_index", prefix); - let upload_chunk_path = format!("{}_chunk", prefix); let is_fixed_chunk_size = prefix == "fixed"; - let (upload_queue, upload_result) = - Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone()); - - let start_time = std::time::Instant::now(); - let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new()))); let index_csum_2 = index_csum.clone(); - let progress_handle = if archive.ends_with(".img") - || archive.ends_with(".pxar") - || archive.ends_with(".ppxar") - { - let counters = counters.clone(); - Some(tokio::spawn(async move { - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - - let size = HumanByte::from(counters.total_stream_len()); - let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst)); - let elapsed = TimeSpan::from(start_time.elapsed()); - - log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}"); - } - })) - } else { - None - }; - - stream + 
let stream = stream .inject_reused_chunks(injections, counters.clone()) .and_then(move |chunk_info| match chunk_info { InjectedChunksInfo::Known(chunks) => { @@ -749,7 +722,58 @@ impl BackupWriter { future::ok(res) } }) - .merge_known_chunks() + .merge_known_chunks(); + + Self::upload_merged_chunk_stream( + h2, + wid, + archive, + prefix, + stream, + index_csum_2, + counters_readonly, + ) + } + + fn upload_merged_chunk_stream( + h2: H2Client, + wid: u64, + archive: &str, + prefix: &str, + stream: impl Stream>, + index_csum: Arc>>, + counters: UploadCounters, + ) -> impl Future> { + let append_chunk_path = format!("{prefix}_index"); + let upload_chunk_path = format!("{prefix}_chunk"); + + let start_time = std::time::Instant::now(); + let uploaded_len = Arc::new(AtomicUsize::new(0)); + + let (upload_queue, upload_result) = + Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone()); + + let progress_handle = if archive.ends_with(".img") + || archive.ends_with(".pxar") + || archive.ends_with(".ppxar") + { + let counters = counters.clone(); + Some(tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + + let size = HumanByte::from(counters.total_stream_len()); + let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst)); + let elapsed = TimeSpan::from(start_time.elapsed()); + + log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}"); + } + })) + } else { + None + }; + + stream .try_for_each(move |merged_chunk_info| { let upload_queue = upload_queue.clone(); @@ -813,14 +837,14 @@ impl BackupWriter { }) .then(move |result| async move { upload_result.await?.and(result) }.boxed()) .and_then(move |_| { - let mut guard = index_csum_2.lock().unwrap(); + let mut guard = index_csum.lock().unwrap(); let csum = guard.take().unwrap().finish(); if let Some(handle) = progress_handle { handle.abort(); } - futures::future::ok(counters_readonly.to_upload_stats(csum, 
start_time.elapsed())) + futures::future::ok(counters.to_upload_stats(csum, start_time.elapsed())) }) } -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel