all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v6 proxmox-backup 02/29] client: backup writer: factor out merged chunk stream upload
Date: Thu, 31 Oct 2024 13:14:52 +0100	[thread overview]
Message-ID: <20241031121519.434337-3-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241031121519.434337-1-c.ebner@proxmox.com>

In preparation for implementing push support for sync jobs.

Factor out the upload stream for merged chunks, which can be reused
to upload the local chunks to a remote target datastore during a
snapshot sync operation in push direction.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since version 5:
- refactor to adapt to UploadCounter changes

 pbs-client/src/backup_writer.rs | 88 +++++++++++++++++++++------------
 1 file changed, 56 insertions(+), 32 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 8b9afdb95..f1bad4128 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::time::Instant;
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
 use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
@@ -648,42 +649,14 @@ impl BackupWriter {
         archive: &str,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let mut counters = UploadCounters::new();
-        let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
         let counters_readonly = counters.clone();
 
-        let append_chunk_path = format!("{}_index", prefix);
-        let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
-
-        let start_time = std::time::Instant::now();
-
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
-        let progress_handle = if archive.ends_with(".img")
-            || archive.ends_with(".pxar")
-            || archive.ends_with(".ppxar")
-        {
-            let counters = counters.clone();
-            Some(tokio::spawn(async move {
-                loop {
-                    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-
-                    let size = HumanByte::from(counters.total_stream_len());
-                    let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
-                    let elapsed = TimeSpan::from(start_time.elapsed());
-
-                    log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
-                }
-            }))
-        } else {
-            None
-        };
-
-        stream
+        let stream = stream
             .inject_reused_chunks(injections, counters.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
@@ -749,7 +722,58 @@ impl BackupWriter {
                     future::ok(res)
                 }
             })
-            .merge_known_chunks()
+            .merge_known_chunks();
+
+        Self::upload_merged_chunk_stream(
+            h2,
+            wid,
+            archive,
+            prefix,
+            stream,
+            index_csum_2,
+            counters_readonly,
+        )
+    }
+
+    fn upload_merged_chunk_stream(
+        h2: H2Client,
+        wid: u64,
+        archive: &str,
+        prefix: &str,
+        stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+        index_csum: Arc<Mutex<Option<Sha256>>>,
+        counters: UploadCounters,
+    ) -> impl Future<Output = Result<UploadStats, Error>> {
+        let append_chunk_path = format!("{prefix}_index");
+        let upload_chunk_path = format!("{prefix}_chunk");
+
+        let start_time = std::time::Instant::now();
+        let uploaded_len = Arc::new(AtomicUsize::new(0));
+
+        let (upload_queue, upload_result) =
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
+
+        let progress_handle = if archive.ends_with(".img")
+            || archive.ends_with(".pxar")
+            || archive.ends_with(".ppxar")
+        {
+            let counters = counters.clone();
+            Some(tokio::spawn(async move {
+                loop {
+                    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+                    let size = HumanByte::from(counters.total_stream_len());
+                    let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
+                    let elapsed = TimeSpan::from(start_time.elapsed());
+
+                    log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
+                }
+            }))
+        } else {
+            None
+        };
+
+        stream
             .try_for_each(move |merged_chunk_info| {
                 let upload_queue = upload_queue.clone();
 
@@ -813,14 +837,14 @@ impl BackupWriter {
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let mut guard = index_csum_2.lock().unwrap();
+                let mut guard = index_csum.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
                 if let Some(handle) = progress_handle {
                     handle.abort();
                 }
 
-                futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
+                futures::future::ok(counters.to_upload_stats(csum, start_time.elapsed()))
             })
     }
 
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2024-10-31 12:16 UTC|newest]

Thread overview: 51+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-10-31 12:14 [pbs-devel] [PATCH v6 proxmox-backup 00/29] fix #3044: push datastore to remote target Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 01/29] client: backup writer: refactor backup and upload stats counters Christian Ebner
2024-10-31 12:14 ` Christian Ebner [this message]
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 03/29] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 04/29] config: acl: refactor acl path component check for datastore Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 05/29] config: acl: allow namespace components for remote datastores Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 06/29] api types: add remote acl path method for `BackupNamespace` Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 07/29] api types: implement remote acl path method for sync job Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 08/29] api types: define remote permissions and roles for push sync Christian Ebner
2024-11-06 11:58   ` Fabian Grünbichler
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 09/29] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 10/29] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 11/29] datastore: increment deleted group counter when removing group Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 12/29] api/api-types: refactor api endpoint version, add api types Christian Ebner
2024-11-06 11:57   ` Fabian Grünbichler
2024-11-20 16:27     ` Thomas Lamprecht
2024-11-20 17:34       ` Christian Ebner
2024-11-21  9:23         ` Thomas Lamprecht
2024-11-21  9:38           ` Fabian Grünbichler
2024-11-21  9:58           ` Christian Ebner
2024-11-21 16:01             ` Thomas Lamprecht
2024-11-21 16:15               ` Christian Ebner
2024-11-22 12:42                 ` Thomas Lamprecht
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 13/29] fix #3044: server: implement push support for sync operations Christian Ebner
2024-11-06 11:57   ` Fabian Grünbichler
2024-11-07  9:27     ` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 14/29] api types/config: add `sync-push` config type for push sync jobs Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 15/29] api: push: implement endpoint for sync in push direction Christian Ebner
2024-11-06 15:10   ` Fabian Grünbichler
2024-11-07  9:18     ` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 16/29] api: sync: move sync job invocation to server sync module Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 17/29] api: config: Require PRIV_DATASTORE_AUDIT to modify sync job Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 18/29] api: config: factor out sync job owner check Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 19/29] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-11-06 15:20   ` Fabian Grünbichler
2024-11-07  9:10     ` Christian Ebner
2024-11-07  9:40       ` Fabian Grünbichler
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 20/29] api: admin: avoid duplicate name for list sync jobs api method Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 21/29] bin: manager: add datastore push cli command Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 22/29] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 23/29] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 24/29] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 25/29] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 26/29] ui: sync view: set proxy on view instead of model Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 27/29] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-11-21  9:27   ` Thomas Lamprecht
2024-11-21 10:00     ` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 28/29] api: version: add 'prune-delete-stats' as supported feature Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 29/29] docs: add section for sync jobs in push direction Christian Ebner
2024-11-21 16:05   ` Maximiliano Sandoval
2024-11-11 15:46 ` [pbs-devel] [PATCH v6 proxmox-backup 00/29] fix #3044: push datastore to remote target Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20241031121519.434337-3-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal