From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v5 proxmox-backup 02/31] client: backup writer: factor out merged chunk stream upload
Date: Fri, 18 Oct 2024 10:42:13 +0200 [thread overview]
Message-ID: <20241018084242.144010-3-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241018084242.144010-1-c.ebner@proxmox.com>
In preparation for implementing push support for sync jobs.
Factor out the upload stream for merged chunks, which can be reused
to upload the local chunks to a remote target datastore during a
snapshot sync operation in push direction.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 4:
- Rebased onto current master
changes since version 3:
- adapted to refactored upload stat counters
pbs-client/src/backup_writer.rs | 88 +++++++++++++++++++++------------
1 file changed, 56 insertions(+), 32 deletions(-)
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index f08a65153..1ec181f99 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -7,6 +7,7 @@ use std::time::Instant;
use anyhow::{bail, format_err, Error};
use futures::future::{self, AbortHandle, Either, FutureExt, TryFutureExt};
use futures::stream::{Stream, StreamExt, TryStreamExt};
+use openssl::sha::Sha256;
use serde_json::{json, Value};
use tokio::io::AsyncReadExt;
use tokio::sync::{mpsc, oneshot};
@@ -648,42 +649,14 @@ impl BackupWriter {
archive: &str,
) -> impl Future<Output = Result<UploadStats, Error>> {
let mut counters = UploadCounters::new();
- let total_stream_len = counters.total_stream_len_counter();
- let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counters_readonly = counters.clone();
- let append_chunk_path = format!("{}_index", prefix);
- let upload_chunk_path = format!("{}_chunk", prefix);
let is_fixed_chunk_size = prefix == "fixed";
- let (upload_queue, upload_result) =
- Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
-
- let start_time = std::time::Instant::now();
-
let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
let index_csum_2 = index_csum.clone();
- let progress_handle = if archive.ends_with(".img")
- || archive.ends_with(".pxar")
- || archive.ends_with(".ppxar")
- {
- Some(tokio::spawn(async move {
- loop {
- tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
-
- let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
- let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
- let elapsed = TimeSpan::from(start_time.elapsed());
-
- log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
- }
- }))
- } else {
- None
- };
-
- stream
+ let stream = stream
.inject_reused_chunks(injections, counters.total_stream_len_counter())
.and_then(move |chunk_info| match chunk_info {
InjectedChunksInfo::Known(chunks) => {
@@ -753,7 +726,58 @@ impl BackupWriter {
}
}
})
- .merge_known_chunks()
+ .merge_known_chunks();
+
+ Self::upload_merged_chunk_stream(
+ h2,
+ wid,
+ archive,
+ prefix,
+ stream,
+ index_csum_2,
+ counters_readonly,
+ )
+ }
+
+ fn upload_merged_chunk_stream(
+ h2: H2Client,
+ wid: u64,
+ archive: &str,
+ prefix: &str,
+ stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
+ index_csum: Arc<Mutex<Option<Sha256>>>,
+ counters: UploadCounters,
+ ) -> impl Future<Output = Result<UploadStats, Error>> {
+ let append_chunk_path = format!("{prefix}_index");
+ let upload_chunk_path = format!("{prefix}_chunk");
+
+ let start_time = std::time::Instant::now();
+ let total_stream_len = counters.total_stream_len_counter();
+ let uploaded_len = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+
+ let (upload_queue, upload_result) =
+ Self::append_chunk_queue(h2.clone(), wid, append_chunk_path, uploaded_len.clone());
+
+ let progress_handle = if archive.ends_with(".img")
+ || archive.ends_with(".pxar")
+ || archive.ends_with(".ppxar")
+ {
+ Some(tokio::spawn(async move {
+ loop {
+ tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+ let size = HumanByte::from(total_stream_len.load(Ordering::SeqCst));
+ let size_uploaded = HumanByte::from(uploaded_len.load(Ordering::SeqCst));
+ let elapsed = TimeSpan::from(start_time.elapsed());
+
+ log::info!("processed {size} in {elapsed}, uploaded {size_uploaded}");
+ }
+ }))
+ } else {
+ None
+ };
+
+ stream
.try_for_each(move |merged_chunk_info| {
let upload_queue = upload_queue.clone();
@@ -817,14 +841,14 @@ impl BackupWriter {
})
.then(move |result| async move { upload_result.await?.and(result) }.boxed())
.and_then(move |_| {
- let mut guard = index_csum_2.lock().unwrap();
+ let mut guard = index_csum.lock().unwrap();
let csum = guard.take().unwrap().finish();
if let Some(handle) = progress_handle {
handle.abort();
}
- futures::future::ok(counters_readonly.to_upload_stats(csum, start_time.elapsed()))
+ futures::future::ok(counters.to_upload_stats(csum, start_time.elapsed()))
})
}
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-10-18 8:42 UTC|newest]
Thread overview: 68+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-10-18 8:42 [pbs-devel] [PATCH v5 proxmox-backup 00/31] fix #3044: push datastore to remote target Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 01/31] client: backup writer: refactor backup and upload stats counters Christian Ebner
2024-10-25 10:20 ` Fabian Grünbichler
2024-10-18 8:42 ` Christian Ebner [this message]
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 03/31] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 04/31] config: acl: refactor acl path component check for datastore Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 05/31] config: acl: allow namespace components for remote datastores Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 06/31] api types: implement remote acl path method for sync job Christian Ebner
2024-10-25 11:44 ` Fabian Grünbichler
2024-10-25 12:46 ` Christian Ebner
2024-10-28 11:04 ` Fabian Grünbichler
2024-10-28 15:13 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 07/31] api types: define remote permissions and roles for push sync Christian Ebner
2024-10-25 10:15 ` Fabian Grünbichler
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 08/31] fix #3044: server: implement push support for sync operations Christian Ebner
2024-10-25 10:10 ` Fabian Grünbichler
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 09/31] api types/config: add `sync-push` config type for push sync jobs Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 10/31] api: push: implement endpoint for sync in push direction Christian Ebner
2024-10-25 11:45 ` Fabian Grünbichler
2024-10-30 13:48 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 11/31] api: sync: move sync job invocation to server sync module Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 12/31] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 13/31] api: admin: avoid duplicate name for list sync jobs api method Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 14/31] api: config: Require PRIV_DATASTORE_AUDIT to modify sync job Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 15/31] api: config: factor out sync job owner check Christian Ebner
2024-10-25 10:16 ` Fabian Grünbichler
2024-10-28 15:17 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 16/31] api: config: extend read access check by sync direction Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 17/31] api: config: extend modify " Christian Ebner
2024-10-25 10:17 ` Fabian Grünbichler
2024-10-25 13:24 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 18/31] bin: manager: add datastore push cli command Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 19/31] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-10-25 10:32 ` Dominik Csapak
2024-10-28 15:37 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 20/31] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 21/31] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-10-25 10:39 ` Dominik Csapak
2024-10-28 15:52 ` Christian Ebner
2024-10-29 6:22 ` Dominik Csapak
2024-10-29 7:26 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 22/31] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 23/31] ui: sync: pass sync-direction to allow removing push jobs Christian Ebner
2024-10-25 10:42 ` Dominik Csapak
2024-10-30 13:23 ` Christian Ebner
2024-10-30 13:33 ` Fabian Grünbichler
2024-10-30 13:50 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 24/31] ui: sync view: do not use data model proxy for store Christian Ebner
2024-10-25 10:44 ` Dominik Csapak
2024-10-30 13:29 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 25/31] ui: sync view: set sync direction when invoking run task via api Christian Ebner
2024-10-25 10:44 ` Dominik Csapak
2024-10-30 13:30 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 26/31] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 27/31] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 28/31] api/api-types: refactor api endpoint version, add api types Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 29/31] datastore: increment deleted group counter when removing group Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 30/31] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-10-25 10:10 ` Fabian Grünbichler
2024-10-30 13:37 ` Christian Ebner
2024-10-30 13:42 ` Fabian Grünbichler
2024-10-31 9:43 ` Christian Ebner
2024-10-31 12:12 ` Fabian Grünbichler
2024-10-31 12:26 ` Christian Ebner
2024-10-18 8:42 ` [pbs-devel] [PATCH v5 proxmox-backup 31/31] server: sync job: use delete stats provided by the api Christian Ebner
2024-10-25 10:17 ` Fabian Grünbichler
2024-10-30 13:44 ` Christian Ebner
2024-10-31 12:20 ` [pbs-devel] [PATCH v5 proxmox-backup 00/31] fix #3044: push datastore to remote target Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241018084242.144010-3-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal