From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 10/24] client: backup writer: factor out merged chunk stream upload
Date: Mon, 15 Jul 2024 12:15:48 +0200
Message-ID: <20240715101602.274244-11-c.ebner@proxmox.com>
In-Reply-To: <20240715101602.274244-1-c.ebner@proxmox.com>
In preparation for implementing push support for sync jobs, factor out
the upload stream for merged chunks into a dedicated helper,
upload_merged_chunk_stream(). The helper can then be reused to upload
local chunks to a remote target datastore during a snapshot sync
operation in push direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_writer.rs | 47 ++++++++++++++++++++-------------
1 file changed, 29 insertions(+), 18 deletions(-)
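Note for readers: the shape of this refactoring can be sketched in a few lines. The sketch below is only an illustration under stated assumptions, not the code from this patch. It swaps the asynchronous `futures` stream for a synchronous `Iterator` so that it builds with the standard library alone, and `MergedChunk`, `ToyCsum`, `build_merged_stream` and `upload_merged_chunks` are hypothetical stand-ins for `MergedChunkInfo`, `Sha256`, the chunk-producing stage and `upload_merged_chunk_stream`. It shows the producer and the consumer as separate functions that share only the stream and the checksum handle, which is what makes the consumer reusable.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for MergedChunkInfo: a new chunk carrying data,
// or a run of chunks the target already knows.
enum MergedChunk {
    New(Vec<u8>),
    Known(usize),
}

#[derive(Debug, PartialEq)]
struct UploadStats {
    chunk_count: usize,
    bytes_uploaded: usize,
    csum: u64,
}

// Toy byte-sum checksum, used here instead of openssl::sha::Sha256
// so that the sketch has no external dependencies.
#[derive(Default)]
struct ToyCsum(u64);

impl ToyCsum {
    fn update(&mut self, data: &[u8]) {
        for b in data {
            self.0 = self.0.wrapping_add(u64::from(*b));
        }
    }

    fn finish(self) -> u64 {
        self.0
    }
}

// Producer stage: turns raw chunks into a lazy stream of merged chunk
// items and feeds the shared checksum while doing so.
fn build_merged_stream(
    chunks: Vec<Vec<u8>>,
    csum: Arc<Mutex<Option<ToyCsum>>>,
) -> impl Iterator<Item = Result<MergedChunk, String>> {
    chunks
        .into_iter()
        .map(move |data| -> Result<MergedChunk, String> {
            let mut guard = csum.lock().unwrap();
            let hasher = guard.as_mut().ok_or("checksum already finished")?;
            hasher.update(&data);
            Ok(MergedChunk::New(data))
        })
}

// Consumer stage: accepts any stream of merged chunks, "uploads" them
// (only counted here) and finalizes the shared checksum at the end.
fn upload_merged_chunks(
    stream: impl Iterator<Item = Result<MergedChunk, String>>,
    csum: Arc<Mutex<Option<ToyCsum>>>,
) -> Result<UploadStats, String> {
    let mut chunk_count = 0;
    let mut bytes_uploaded = 0;
    for item in stream {
        // The first error aborts the upload, similar to try_for_each.
        match item? {
            MergedChunk::New(data) => {
                chunk_count += 1;
                bytes_uploaded += data.len();
            }
            MergedChunk::Known(count) => chunk_count += count,
        }
    }
    // Take the hasher out of the shared slot exactly once.
    let digest = csum
        .lock()
        .unwrap()
        .take()
        .ok_or("checksum already taken")?
        .finish();
    Ok(UploadStats {
        chunk_count,
        bytes_uploaded,
        csum: digest,
    })
}
```

Because the consumer only requires a stream of merged chunk items, a second caller can hand it a stream built from a different source, for example chunks read from a local datastore, without going through the producer stage at all.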
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index a67b471a7..6daad9fde 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::sync::{Arc, Mutex};
use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
@@ -675,19 +676,12 @@ impl BackupWriter {
stream_len: stream_len.clone(),
};
- let append_chunk_path = format!("{}_index", prefix);
- let upload_chunk_path = format!("{}_chunk", prefix);
let is_fixed_chunk_size = prefix == "fixed";
- let (upload_queue, upload_result) =
- Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
-
- let start_time = std::time::Instant::now();
-
let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
let index_csum_2 = index_csum.clone();
- stream
+ let stream = stream
.inject_reused_chunks(injections, stream_len.clone())
.and_then(move |chunk_info| match chunk_info {
InjectedChunksInfo::Known(chunks) => {
@@ -758,7 +752,28 @@ impl BackupWriter {
}
}
})
- .merge_known_chunks()
+ .merge_known_chunks();
+
+ Self::upload_merged_chunk_stream(h2, wid, prefix, stream, index_csum_2, counters)
+ }
+
+ fn upload_merged_chunk_stream(
+ h2: H2Client,
+ wid: u64,
+ prefix: &str,
+ stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+ index_csum: Arc<Mutex<Option<Sha256>>>,
+ counters: UploadStatsCounters,
+ ) -> impl Future<Output = Result<UploadStats, Error>> {
+ let append_chunk_path = format!("{prefix}_index");
+ let upload_chunk_path = format!("{prefix}_chunk");
+
+ let (upload_queue, upload_result) =
+ Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+
+ let start_time = std::time::Instant::now();
+
+ stream
.try_for_each(move |merged_chunk_info| {
let upload_queue = upload_queue.clone();
@@ -768,10 +783,8 @@ impl BackupWriter {
let digest_str = hex::encode(digest);
log::trace!(
- "upload new chunk {} ({} bytes, offset {})",
- digest_str,
- chunk_info.chunk_len,
- offset
+ "upload new chunk {digest_str} ({chunk_len} bytes, offset {offset})",
+ chunk_len = chunk_info.chunk_len,
);
let chunk_data = chunk_info.chunk.into_inner();
@@ -800,9 +813,7 @@ impl BackupWriter {
upload_queue
.send((new_info, Some(response)))
.await
- .map_err(|err| {
- format_err!("failed to send to upload queue: {}", err)
- })
+ .map_err(|err| format_err!("failed to send to upload queue: {err}"))
},
))
} else {
@@ -810,13 +821,13 @@ impl BackupWriter {
upload_queue
.send((merged_chunk_info, None))
.await
- .map_err(|err| format_err!("failed to send to upload queue: {}", err))
+ .map_err(|err| format_err!("failed to send to upload queue: {err}"))
})
}
})
.then(move |result| async move { upload_result.await?.and(result) }.boxed())
.and_then(move |_| {
- let mut guard = index_csum_2.lock().unwrap();
+ let mut guard = index_csum.lock().unwrap();
let csum = guard.take().unwrap().finish();
futures::future::ok(UploadStats {
--
2.39.2