From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload
Date: Mon, 15 Jul 2024 12:15:48 +0200
Message-ID: <20240715101602.274244-11-c.ebner@proxmox.com>
In-Reply-To: <20240715101602.274244-1-c.ebner@proxmox.com>

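In preparation for implementing push support for sync jobs, factor
out the upload of the merged chunk stream into a dedicated helper,
upload_merged_chunk_stream().

The helper can then be reused to upload local chunks to a remote
target datastore during a snapshot sync operation in push direction.

While at it, switch the touched format strings to inline format
arguments.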

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-client/src/backup_writer.rs | 47 ++++++++++++++++++++-------------
 1 file changed, 29 insertions(+), 18 deletions(-)
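
Note (not part of the commit): the split can be pictured with the
rough, synchronous sketch below. It is only an illustration under
stated assumptions, not the actual implementation: std iterators stand
in for futures::Stream, a toy checksum stands in for openssl's Sha256,
and all names (ToyCsum, Merged, Stats, build_merged, upload_merged)
are hypothetical. The producer feeds the shared checksum while items
are pulled, and the factored-out consumer only takes the checksum once
the sequence is fully drained.

```rust
use std::sync::{Arc, Mutex};

/// Toy stand-in for the SHA-256 index checksum (assumption, not openssl).
#[derive(Default)]
struct ToyCsum(u64);

impl ToyCsum {
    fn update(&mut self, data: &[u8]) {
        for b in data {
            self.0 = self.0.wrapping_mul(31).wrapping_add(u64::from(*b));
        }
    }

    fn finish(self) -> u64 {
        self.0
    }
}

/// Simplified counterpart of a merged chunk item: new data or a
/// count of already known chunks.
#[derive(Debug, PartialEq)]
enum Merged {
    New(Vec<u8>),
    Known(usize),
}

#[derive(Debug, PartialEq)]
struct Stats {
    chunk_path: String,
    new_chunks: usize,
    known_chunks: usize,
    csum: u64,
}

/// Producer: lazily yields merged items and updates the shared checksum.
fn build_merged(
    chunks: Vec<Vec<u8>>,
    csum: Arc<Mutex<Option<ToyCsum>>>,
) -> impl Iterator<Item = Result<Merged, String>> {
    chunks.into_iter().map(move |data| -> Result<Merged, String> {
        let mut guard = csum.lock().unwrap();
        guard
            .as_mut()
            .ok_or_else(|| "checksum already taken".to_string())?
            .update(&data);
        Ok(Merged::New(data))
    })
}

/// Factored-out consumer: accepts any producer of merged items.
fn upload_merged(
    prefix: &str,
    merged: impl Iterator<Item = Result<Merged, String>>,
    csum: Arc<Mutex<Option<ToyCsum>>>,
) -> Result<Stats, String> {
    let chunk_path = format!("{prefix}_chunk");
    let (mut new_chunks, mut known_chunks) = (0, 0);

    for item in merged {
        match item? {
            // A real uploader would send the chunk data here.
            Merged::New(_data) => new_chunks += 1,
            Merged::Known(count) => known_chunks += count,
        }
    }

    // The checksum is complete only after the sequence has been drained.
    let digest = csum
        .lock()
        .unwrap()
        .take()
        .ok_or("checksum already taken")?
        .finish();

    Ok(Stats { chunk_path, new_chunks, known_chunks, csum: digest })
}
```

A caller would create one Arc<Mutex<Option<ToyCsum>>>, pass a clone to
build_merged() and the original to upload_merged(). A different
producer, such as one reading chunks from a local datastore, could
feed the same consumer unchanged, which is the point of the split.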

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a67b471a7..6daad9fde 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
 use anyhow::{bail, format_err, Error};
 use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
 use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
 use serde_json::{json, Value};
 use tokio::io::AsyncReadExt;
 use tokio::sync::{mpsc, oneshot};
@@ -675,19 +676,12 @@ impl BackupWriter {
             stream_len: stream_len.clone(),
         };
 
-        let append_chunk_path = format!("{}_index", prefix);
-        let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
-
-        let start_time = std::time::Instant::now();
-
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
-        stream
+        let stream = stream
             .inject_reused_chunks(injections, stream_len.clone())
             .and_then(move |chunk_info| match chunk_info {
                 InjectedChunksInfo::Known(chunks) => {
@@ -758,7 +752,28 @@ impl BackupWriter {
                     }
                 }
             })
-            .merge_known_chunks()
+            .merge_known_chunks();
+
+        Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters)
+    }
+
+    fn upload_merged_chunk_stream(
+        h2: H2Client,
+        wid: u64,
+        prefix: &str,
+        stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+        index_csum: Arc<Mutex<Option<Sha256>>>,
+        counters: UploadStatsCounters,
+    ) -> impl Future<Output = Result<UploadStats, Error>> {
+        let append_chunk_path = format!("{prefix}_index");
+        let upload_chunk_path = format!("{prefix}_chunk");
+
+        let (upload_queue, upload_result) =
+            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+
+        let start_time = std::time::Instant::now();
+
+        stream
             .try_for_each(move |merged_chunk_info| {
                 let upload_queue = upload_queue.clone();
 
@@ -768,10 +783,8 @@ impl BackupWriter {
                     let digest_str = hex::encode(digest);
 
                     log::trace!(
-                        "upload new chunk {} ({} bytes, offset {})",
-                        digest_str,
-                        chunk_info.chunk_len,
-                        offset
+                        "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})",
+                        chunk_len = chunk_info.chunk_len,
                     );
 
                     let chunk_data = chunk_info.chunk.into_inner();
@@ -800,9 +813,7 @@ impl BackupWriter {
                             upload_queue
                                 .send((new_info, Some(response)))
                                 .await
-                                .map_err(|err| {
-                                    format_err!("failed to send to upload queue: {}", err)
-                                })
+                                .map_err(|err| format_err!("failed to send to upload queue: {err}"))
                         },
                     ))
                 } else {
@@ -810,13 +821,13 @@ impl BackupWriter {
                         upload_queue
                             .send((merged_chunk_info, None))
                             .await
-                            .map_err(|err| format_err!("failed to send to upload queue: {}", err))
+                            .map_err(|err| format_err!("failed to send to upload queue: {err}"))
                     })
                 }
             })
             .then(move |result| async move { upload_result.await?.and(result) }.boxed())
             .and_then(move |_| {
-                let mut guard = index_csum_2.lock().unwrap();
+                let mut guard = index_csum.lock().unwrap();
                 let csum = guard.take().unwrap().finish();
 
                 futures::future::ok(UploadStats {
-- 
2.39.2


