From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Wed, 9 Oct 2024 11:20:31 +0200
Message-Id: <20241009092031.97601-1-c.ebner@proxmox.com>
Subject: [pbs-devel] [PATCH v2 proxmox-backup] partial fix #5560: client: periodically show backup progress

Spawn a new tokio task which displays the cumulative progress of the
backup for pxar, ppxar or img archive streams about once a minute.
Catalog and metadata archive streams are excluded from the output for
better readability, and because the catalog upload lives for the whole
duration of the backup, which can lead to temporal misalignment in the
output. The actual payload data is written via the other streams
anyway.

Add accounting for uploaded chunks, to distinguish them from chunks
that are merely queued for upload but not actually uploaded yet.

Example output in the backup task log:

```
...
INFO: root.pxar: elapsed 60.00 s, new: 191.446 MiB, reused: 0 B, total: 191.446 MiB, uploaded: 13.021 MiB (compressed 5.327 MiB, average: 222.221 KiB/s)
INFO: root.pxar: elapsed 120.00 s, new: 191.446 MiB, reused: 0 B, total: 191.446 MiB, uploaded: 27.068 MiB (compressed 11.583 MiB, average: 230.977 KiB/s)
INFO: root.pxar: elapsed 180.00 s, new: 191.446 MiB, reused: 0 B, total: 191.446 MiB, uploaded: 36.138 MiB (compressed 14.987 MiB, average: 205.58 KiB/s)
...
```
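
For context, the change boils down to sharing atomic byte counters
between the chunk upload path and a detached reporter task that is
aborted once the upload stream finishes. Below is a minimal,
self-contained sketch of that pattern, with illustrative names, a
shortened interval and a plain println, assuming only the tokio crate
(rt, macros and time features); it is not the exact backup_writer.rs
code from the diff further down.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Counters shared between the upload path and the reporter task.
    let stream_len = Arc::new(AtomicUsize::new(0)); // bytes seen in the input stream
    let uploaded_len = Arc::new(AtomicUsize::new(0)); // bytes whose upload response arrived

    let start_time = Instant::now();
    let progress_handle = {
        let stream_len = stream_len.clone();
        let uploaded_len = uploaded_len.clone();
        tokio::spawn(async move {
            loop {
                // The patch uses a 60 s interval; shortened here for demonstration.
                tokio::time::sleep(Duration::from_secs(1)).await;
                let elapsed = start_time.elapsed().as_secs_f64();
                let total = stream_len.load(Ordering::SeqCst);
                let uploaded = uploaded_len.load(Ordering::SeqCst);
                println!(
                    "elapsed {elapsed:.2} s, total: {total} B, uploaded: {uploaded} B, average: {:.0} B/s",
                    uploaded as f64 / elapsed
                );
            }
        })
    };

    // Stand-in for the chunk upload loop: a chunk is accounted as uploaded
    // only once its response has arrived, not when it is queued.
    for _ in 0..3 {
        stream_len.fetch_add(4096, Ordering::SeqCst);
        tokio::time::sleep(Duration::from_millis(700)).await;
        uploaded_len.fetch_add(4096, Ordering::SeqCst);
    }

    // Upload stream finished: stop the reporter so it does not outlive the backup.
    progress_handle.abort();
}
```

Aborting the handle is sufficient cleanup here, since the reporter
only reads counters and logs; it holds no state that could be left
inconsistent.
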
This partially fixes issue 5560:
https://bugzilla.proxmox.com/show_bug.cgi?id=5560

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
Changes since version 1, thanks Gabriel for comments and testing:
- Abort progress output task when upload stream is finished
- Limit output to pxar, ppxar or img archives for cleaner output
- Adapted commit title and message

 pbs-client/src/backup_writer.rs | 98 +++++++++++++++++++++++++++++----
 1 file changed, 86 insertions(+), 12 deletions(-)

diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index d63c09b5a..ae9e8983a 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -65,7 +65,13 @@ struct UploadStats {
     csum: [u8; 32],
 }
 
-type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<h2::client::ResponseFuture>)>;
+struct ChunkUploadResponse {
+    future: h2::client::ResponseFuture,
+    size: usize,
+    size_encoded: usize,
+}
+
+type UploadQueueSender = mpsc::Sender<(MergedChunkInfo, Option<ChunkUploadResponse>)>;
 type UploadResultReceiver = oneshot::Receiver<Result<(), Error>>;
 
 impl BackupWriter {
@@ -332,6 +338,12 @@ impl BackupWriter {
             .as_u64()
             .unwrap();
 
+        let archive = if log::log_enabled!(log::Level::Debug) {
+            archive_name
+        } else {
+            pbs_tools::format::strip_server_file_extension(archive_name)
+        };
+
         let upload_stats = Self::upload_chunk_info_stream(
             self.h2.clone(),
             wid,
@@ -345,16 +357,12 @@ impl BackupWriter {
             },
             options.compress,
             injections,
+            archive,
         )
         .await?;
 
         let size_dirty = upload_stats.size - upload_stats.size_reused;
         let size: HumanByte = upload_stats.size.into();
-        let archive = if log::log_enabled!(log::Level::Debug) {
-            archive_name
-        } else {
-            pbs_tools::format::strip_server_file_extension(archive_name)
-        };
 
         if upload_stats.chunk_injected > 0 {
             log::info!(
@@ -462,6 +470,8 @@ impl BackupWriter {
         h2: H2Client,
         wid: u64,
         path: String,
+        uploaded: Arc<AtomicUsize>,
+        uploaded_compressed: Arc<AtomicUsize>,
     ) -> (UploadQueueSender, UploadResultReceiver) {
         let (verify_queue_tx, verify_queue_rx) = mpsc::channel(64);
         let (verify_result_tx, verify_result_rx) = oneshot::channel();
@@ -470,15 +480,23 @@ impl BackupWriter {
         tokio::spawn(
             ReceiverStream::new(verify_queue_rx)
                 .map(Ok::<_, Error>)
-                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<h2::client::ResponseFuture>)| {
+                .and_then(move |(merged_chunk_info, response): (MergedChunkInfo, Option<ChunkUploadResponse>)| {
                     match (response, merged_chunk_info) {
                         (Some(response), MergedChunkInfo::Known(list)) => {
                             Either::Left(
                                 response
+                                    .future
                                     .map_err(Error::from)
                                     .and_then(H2Client::h2api_response)
-                                    .and_then(move |_result| {
-                                        future::ok(MergedChunkInfo::Known(list))
+                                    .and_then({
+                                        let uploaded = uploaded.clone();
+                                        let uploaded_compressed = uploaded_compressed.clone();
+                                        move |_result| {
+                                            // account for uploaded bytes for progress output
+                                            uploaded.fetch_add(response.size, Ordering::SeqCst);
+                                            uploaded_compressed.fetch_add(response.size_encoded, Ordering::SeqCst);
+                                            future::ok(MergedChunkInfo::Known(list))
+                                        }
                                     })
                             )
                         }
@@ -636,6 +654,7 @@ impl BackupWriter {
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
         injections: Option<std::sync::mpsc::Receiver<InjectChunks>>,
+        archive: &str,
     ) -> impl Future<Output = Result<UploadStats, Error>> {
         let total_chunks = Arc::new(AtomicUsize::new(0));
         let total_chunks2 = total_chunks.clone();
@@ -646,25 +665,67 @@ impl BackupWriter {
 
         let stream_len = Arc::new(AtomicUsize::new(0));
         let stream_len2 = stream_len.clone();
+        let stream_len3 = stream_len.clone();
         let compressed_stream_len = Arc::new(AtomicU64::new(0));
         let compressed_stream_len2 = compressed_stream_len.clone();
         let reused_len = Arc::new(AtomicUsize::new(0));
         let reused_len2 = reused_len.clone();
+        let reused_len3 = reused_len.clone();
         let injected_len = Arc::new(AtomicUsize::new(0));
         let injected_len2 = injected_len.clone();
+        let uploaded_len = Arc::new(AtomicUsize::new(0));
+        let uploaded_len_compressed = Arc::new(AtomicUsize::new(0));
 
         let append_chunk_path = format!("{}_index", prefix);
         let upload_chunk_path = format!("{}_chunk", prefix);
         let is_fixed_chunk_size = prefix == "fixed";
 
-        let (upload_queue, upload_result) =
-            Self::append_chunk_queue(h2.clone(), wid, append_chunk_path);
+        let (upload_queue, upload_result) = Self::append_chunk_queue(
+            h2.clone(),
+            wid,
+            append_chunk_path,
+            uploaded_len.clone(),
+            uploaded_len_compressed.clone(),
+        );
 
         let start_time = std::time::Instant::now();
 
         let index_csum = Arc::new(Mutex::new(Some(openssl::sha::Sha256::new())));
         let index_csum_2 = index_csum.clone();
 
+        let progress_handle = if archive.ends_with(".img")
+            || archive.ends_with(".pxar")
+            || archive.ends_with(".ppxar")
+        {
+            let archive = archive.to_string();
+            Some(tokio::spawn(async move {
+                loop {
+                    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+                    let size = stream_len3.load(Ordering::SeqCst);
+                    let size_uploaded = uploaded_len.load(Ordering::SeqCst);
+                    let size_reused = reused_len3.load(Ordering::SeqCst);
+                    let size_compressed = uploaded_len_compressed.load(Ordering::SeqCst);
+                    let size_new = size - size_reused;
+                    let elapsed = start_time.elapsed();
+                    let speed = size_uploaded * 1_000_000 / elapsed.as_micros() as usize;
+
+                    log::info!(
+                        " {archive}: elapsed {:.2} s, new: {}, reused: {}, total: {}, uploaded: {} (compressed {}, average: {}/s)",
+                        elapsed.as_secs_f64(),
+                        HumanByte::from(size_new),
+                        HumanByte::from(size_reused),
+                        HumanByte::from(size),
+                        HumanByte::from(size_uploaded),
+                        HumanByte::from(size_compressed),
+                        HumanByte::from(speed),
+                    );
+                }
+            }))
+        } else {
+            None
+        };
+
         stream
             .inject_reused_chunks(injections, stream_len.clone())
             .and_then(move |chunk_info| match chunk_info {
@@ -753,6 +814,8 @@ impl BackupWriter {
                     );
 
                     let chunk_data = chunk_info.chunk.into_inner();
+                    let size = chunk_info.chunk_len as usize;
+                    let size_encoded = chunk_data.len();
                     let param = json!({
                         "wid": wid,
                         "digest": digest_str,
@@ -776,7 +839,14 @@ impl BackupWriter {
                     Either::Left(h2.send_request(request, upload_data).and_then(
                         move |response| async move {
                             upload_queue
-                                .send((new_info, Some(response)))
+                                .send((
+                                    new_info,
+                                    Some(ChunkUploadResponse {
+                                        future: response,
+                                        size,
+                                        size_encoded,
+                                    }),
+                                ))
                                 .await
                                 .map_err(|err| {
                                     format_err!("failed to send to upload queue: {}", err)
@@ -806,6 +876,10 @@ impl BackupWriter {
             let mut guard = index_csum_2.lock().unwrap();
             let csum = guard.take().unwrap().finish();
 
+            if let Some(handle) = progress_handle {
+                handle.abort();
+            }
+
             futures::future::ok(UploadStats {
                 chunk_count,
                 chunk_reused,
-- 
2.39.5


_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel