public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v5 proxmox-backup 02/31] client: backup writer: factor out merged chunk stream upload
Date: Fri, 18 Oct 2024 10:42:13 +0200	[thread overview]
Message-ID: <20241018084242.144010-3-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241018084242.144010-1-c.ebner@proxmox.com>

In preparation for implementing push support for sync jobs, factor
out the upload stream for merged chunks into a separate helper, so
that it can be reused to upload local chunks to a remote target
datastore during a snapshot sync operation in the push direction.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 4:
- Rebased onto current master

changes since version 3:
- adapted to refactored upload stat counters

 pbs-client/src/backup_writer.rs | 88 +++++++++++++++++++++------------
 1 file changed, 56 insertions(+), 32 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index f08a65153..1ec181f99 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
 use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
@@ -648,42 +649,14 @@ impl BackupWriter {
         archive: &str,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let mut counters = UploadCounters::new();
-        let total_stream_len = counters.total_stream_len_counter();
-        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
         let counters_readonly = counters.clone();
 
-        let append_chunk_path = format!("{}_index", prefix);
-        let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
-
-        let start_time = std::time::Instant::now();
-
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
-        let progress_handle = if archive.ends_with(".img")
-            || archive.ends_with(".pxar")
-            || archive.ends_with(".ppxar")
-        {
-            Some(tokio::spawn(async move {
-                loop {
-                    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-
-                    let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
-                    let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
-                    let elapsed = TimeSpan::from(start_time.elapsed());
-
-                    log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
-                }
-            }))
-        } else {
-            None
-        };
-
-        stream
+        let stream = stream
             .inject_reused_chunks(injections, counters.total_stream_len_counter())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
@@ -753,7 +726,58 @@ impl BackupWriter {
                     }
                 }
             })
-            .merge_known_chunks()
+            .merge_known_chunks();
+
+        Self::upload_merged_chunk_stream(
+            h2,
+            wid,
+            archive,
+            prefix,
+            stream,
+            index_csum_2,
+            counters_readonly,
+        )
+    }
+
+    fn upload_merged_chunk_stream(
+        h2: H2Client,
+        wid: u64,
+        archive: &str,
+        prefix: &str,
+        stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+        index_csum: Arc<Mutex<Option<Sha256>>>,
+        counters: UploadCounters,
+    ) -> impl Future<Output = Result<UploadStats, Error>> {
+        let append_chunk_path = format!("{prefix}_index");
+        let upload_chunk_path = format!("{prefix}_chunk");
+
+        let start_time = std::time::Instant::now();
+        let total_stream_len = counters.total_stream_len_counter();
+        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+        let (upload_queue, upload_result) =
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
+
+        let progress_handle = if archive.ends_with(".img")
+            || archive.ends_with(".pxar")
+            || archive.ends_with(".ppxar")
+        {
+            Some(tokio::spawn(async move {
+                loop {
+                    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+                    let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
+                    let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
+                    let elapsed = TimeSpan::from(start_time.elapsed());
+
+                    log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
+                }
+            }))
+        } else {
+            None
+        };
+
+        stream
             .try_for_each(move |merged_chunk_info| {
                 let upload_queue = upload_queue.clone();
 
@@ -817,14 +841,14 @@ impl BackupWriter {
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let mut guard = index_csum_2.lock().unwrap();
+                let mut guard = index_csum.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
                 if let Some(handle) = progress_handle {
                     handle.abort();
                 }
 
-                futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
+                futures::future::ok(counters.to_upload_stats(csum, start_time.elapsed()))
             })
     }
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2024-10-18  8:42 UTC|newest]

Thread overview: 68+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-10-18  8:42 [pbs-devel] [PATCH v5 proxmox-backup 00/31] fix #3044: push datastore to remote target Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 01/31] client: backup writer: refactor backup and upload stats counters Christian Ebner
2024-10-25 10:20   ` Fabian Grünbichler
2024-10-18  8:42 ` Christian Ebner [this message]
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 03/31] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 04/31] config: acl: refactor acl path component check for datastore Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 05/31] config: acl: allow namespace components for remote datastores Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 06/31] api types: implement remote acl path method for sync job Christian Ebner
2024-10-25 11:44   ` Fabian Grünbichler
2024-10-25 12:46     ` Christian Ebner
2024-10-28 11:04       ` Fabian Grünbichler
2024-10-28 15:13         ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 07/31] api types: define remote permissions and roles for push sync Christian Ebner
2024-10-25 10:15   ` Fabian Grünbichler
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 08/31] fix #3044: server: implement push support for sync operations Christian Ebner
2024-10-25 10:10   ` Fabian Grünbichler
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 09/31] api types/config: add `sync-push` config type for push sync jobs Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 10/31] api: push: implement endpoint for sync in push direction Christian Ebner
2024-10-25 11:45   ` Fabian Grünbichler
2024-10-30 13:48     ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 11/31] api: sync: move sync job invocation to server sync module Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 12/31] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 13/31] api: admin: avoid duplicate name for list sync jobs api method Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 14/31] api: config: Require PRIV_DATASTORE_AUDIT to modify sync job Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 15/31] api: config: factor out sync job owner check Christian Ebner
2024-10-25 10:16   ` Fabian Grünbichler
2024-10-28 15:17     ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 16/31] api: config: extend read access check by sync direction Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 17/31] api: config: extend modify " Christian Ebner
2024-10-25 10:17   ` Fabian Grünbichler
2024-10-25 13:24     ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 18/31] bin: manager: add datastore push cli command Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 19/31] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-10-25 10:32   ` Dominik Csapak
2024-10-28 15:37     ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 20/31] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 21/31] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-10-25 10:39   ` Dominik Csapak
2024-10-28 15:52     ` Christian Ebner
2024-10-29  6:22       ` Dominik Csapak
2024-10-29  7:26         ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 22/31] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 23/31] ui: sync: pass sync-direction to allow removing push jobs Christian Ebner
2024-10-25 10:42   ` Dominik Csapak
2024-10-30 13:23     ` Christian Ebner
2024-10-30 13:33       ` Fabian Grünbichler
2024-10-30 13:50         ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 24/31] ui: sync view: do not use data model proxy for store Christian Ebner
2024-10-25 10:44   ` Dominik Csapak
2024-10-30 13:29     ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 25/31] ui: sync view: set sync direction when invoking run task via api Christian Ebner
2024-10-25 10:44   ` Dominik Csapak
2024-10-30 13:30     ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 26/31] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 27/31] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 28/31] api/api-types: refactor api endpoint version, add api types Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 29/31] datastore: increment deleted group counter when removing group Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 30/31] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-10-25 10:10   ` Fabian Grünbichler
2024-10-30 13:37     ` Christian Ebner
2024-10-30 13:42       ` Fabian Grünbichler
2024-10-31  9:43         ` Christian Ebner
2024-10-31 12:12           ` Fabian Grünbichler
2024-10-31 12:26             ` Christian Ebner
2024-10-18  8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 31/31] server: sync job: use delete stats provided by the api Christian Ebner
2024-10-25 10:17   ` Fabian Grünbichler
2024-10-30 13:44     ` Christian Ebner
2024-10-31 12:20 ` [pbs-devel] [PATCH v5 proxmox-backup 00/31] fix #3044: push datastore to remote target Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20241018084242.144010-3-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal