From mboxrd@z Thu Jan  1 00:00:00 1970
From: Adam Kalisz
To: pve-devel@lists.proxmox.com
Cc: Adam Kalisz, Daniel Škarda, Matt Neuforth, Václav Svátek
Date: Fri, 27 Jun 2025 21:32:19 +0200
Message-ID: <20250627193219.584773-1-adam.kalisz@notnullmakers.com>
List-Id: Proxmox VE development discussion
Subject: [pve-devel] [PATCH PBS restore 1/1] Speedup PBS restore with concurrent fetching of chunks
X-Mailer: git-send-email 2.47.2
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Speed up PBS restore by fetching chunks concurrently using a
configurable number of threads. Four threads are used by default; the
environment variable PBS_RESTORE_CONCURRENCY should be between 1 and 32
inclusive. The upper limit is max_blocking_threads in BackupClient.

Signed-off-by: Adam Kalisz
Co-authored-by: Daniel Škarda
Tested-by: Matt Neuforth
Financed-by: Václav Svátek
---
 src/restore.rs | 101 ++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 84 insertions(+), 17 deletions(-)

diff --git a/src/restore.rs b/src/restore.rs
index 5a5a398..c75bf0d 100644
--- a/src/restore.rs
+++ b/src/restore.rs
@@ -1,7 +1,11 @@
 use std::convert::TryInto;
-use std::sync::{Arc, Mutex};
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc, Mutex,
+};
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::StreamExt;
 use once_cell::sync::OnceCell;
 use tokio::runtime::Runtime;
@@ -69,7 +73,7 @@ impl RestoreTask {
         let runtime = get_runtime_with_builder(|| {
             let mut builder = tokio::runtime::Builder::new_multi_thread();
             builder.enable_all();
-            builder.max_blocking_threads(2);
+            builder.max_blocking_threads(32);
             builder.worker_threads(4);
             builder.thread_name("proxmox-restore-worker");
             builder
@@ -149,9 +153,7 @@ impl RestoreTask {
         )?;
 
         let most_used = index.find_most_used_chunks(8);
-
         let file_info = manifest.lookup_file_info(&archive_name)?;
-
         let chunk_reader = RemoteChunkReader::new(
             Arc::clone(&client),
             self.crypt_config.clone(),
@@ -162,13 +164,43 @@ impl RestoreTask {
         let mut per = 0;
         let mut bytes = 0;
         let mut zeroes = 0;
-
+        let mut storage_nonzero_write_time = std::time::Duration::new(0, 0);
+        let mut storage_nonzero_writes: u64 = 0;
+        let mut chunk_fetch_time = std::time::Duration::new(0, 0);
+        let chunks_fetched = Arc::new(AtomicU64::new(0));
         let start_time = std::time::Instant::now();
 
+        // Should be lower than max_blocking_threads in BackupSetup
+        let mut concurrent_requests: usize = 4;
+
+        if let Ok(val_str) = std::env::var("PBS_RESTORE_CONCURRENCY") {
+            match val_str.parse::<usize>() {
+                Ok(num_threads) if num_threads > 0 => {
+                    if verbose {
+                        eprintln!(
+                            "Using custom concurrency level from environment ({} threads)",
+                            num_threads
+                        );
+                    }
+                    concurrent_requests = num_threads;
+                }
+                _ => {
+                    if verbose {
+                        eprintln!(
+                            "Using default concurrency level ({} threads)",
+                            concurrent_requests
+                        );
+                    }
+                }
+            }
+        }
+
+        let mut chunk_futures = Vec::new();
+
         for pos in 0..index.index_count() {
-            let digest = index.index_digest(pos).unwrap();
+            let digest = index.index_digest(pos).unwrap().clone();
             let offset = (pos * index.chunk_size) as u64;
-            if digest == &zero_chunk_digest {
+
+            if digest == zero_chunk_digest {
                 let res = write_zero_callback(offset, index.chunk_size as u64);
                 if res < 0 {
                     bail!("write_zero_callback failed ({})", res);
@@ -176,22 +208,54 @@ impl RestoreTask {
                 bytes += index.chunk_size;
                 zeroes += index.chunk_size;
             } else {
-                let raw_data = ReadChunk::read_chunk(&chunk_reader, digest)?;
-                let res = write_data_callback(offset, &raw_data);
-                if res < 0 {
-                    bail!("write_data_callback failed ({})", res);
-                }
-                bytes += raw_data.len();
+                let chunk_reader = chunk_reader.clone();
+                let chunks_fetched_clone = Arc::clone(&chunks_fetched);
+
+                let future = async move {
+                    tokio::task::spawn_blocking(move || {
+                        let start_chunk_fetch_time = std::time::Instant::now();
+                        let raw_data = ReadChunk::read_chunk(&chunk_reader, &digest)?;
+                        let fetch_elapsed = start_chunk_fetch_time.elapsed();
+                        chunks_fetched_clone.fetch_add(1, Ordering::Relaxed);
+                        Ok::<_, Error>((offset, raw_data, fetch_elapsed))
+                    })
+                    .await
+                    .unwrap()
+                };
+                chunk_futures.push(future);
+            }
+        }
+
+        let mut stream = futures::stream::iter(chunk_futures).buffer_unordered(concurrent_requests);
+
+        while let Some(result) = stream.next().await {
+            let (offset, raw_data, fetch_elapsed_time) = result?;
+            let start_storage_write_time = std::time::Instant::now();
+            let res = write_data_callback(offset, &raw_data);
+            let storage_write_elapsed = start_storage_write_time.elapsed();
+            if res < 0 {
+                bail!("write_data_callback failed ({})", res);
             }
+            storage_nonzero_write_time = storage_nonzero_write_time
+                .checked_add(storage_write_elapsed)
+                .unwrap_or_default();
+            storage_nonzero_writes += 1;
+            chunk_fetch_time += fetch_elapsed_time;
+            let chunk_len = raw_data.len();
+            bytes += chunk_len;
+
             if verbose {
-                let next_per = ((pos + 1) * 100) / index.index_count();
+                let next_per = (bytes * 100) / (index.index_count() * index.chunk_size);
                 if per != next_per {
                     eprintln!(
-                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), duration {} sec)",
+                        "progress {}% (read {} bytes, zeroes = {}% ({} bytes), \
+                         nonzero writes = {}, chunks fetched = {}, duration {} sec)",
                         next_per,
                         bytes,
                         zeroes * 100 / bytes,
                         zeroes,
+                        storage_nonzero_writes,
+                        chunks_fetched.load(Ordering::Relaxed),
                         start_time.elapsed().as_secs()
                     );
                     per = next_per;
@@ -202,12 +266,15 @@ impl RestoreTask {
         let end_time = std::time::Instant::now();
         let elapsed = end_time.duration_since(start_time);
         eprintln!(
-            "restore image complete (bytes={}, duration={:.2}s, speed={:.2}MB/s)",
+            "restore image complete (bytes={}, avg fetch time={:.4}ms, avg time per nonzero write={:.4}ms, \
+             storage nonzero total write time={:.3}s, duration={:.2}s, speed={:.2}MB/s)",
             bytes,
+            chunk_fetch_time.as_nanos() as f64 / (chunks_fetched.load(Ordering::Relaxed) as f64 * 1_000_000.0),
+            storage_nonzero_write_time.as_nanos() as f64 / (storage_nonzero_writes as f64 * 1_000_000.0),
+            storage_nonzero_write_time.as_secs_f64(),
             elapsed.as_secs_f64(),
             bytes as f64 / (1024.0 * 1024.0 * elapsed.as_secs_f64())
         );
-
         Ok(())
     }
-- 
2.47.2

_______________________________________________
pve-devel mailing list
pve-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel
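[Editor's note] The commit message's PBS_RESTORE_CONCURRENCY handling can be sketched in isolation. This is not code from the patch: `concurrency_from_env` is a hypothetical helper, and unlike the patch (which accepts any value greater than zero), this sketch also enforces the documented upper bound of 32.

```rust
// Minimal sketch of the env-var logic described in the commit message:
// default to 4 threads, accept only values in 1..=32 (the upper bound
// imposed by max_blocking_threads). Helper name is illustrative only.
fn concurrency_from_env(raw: Option<&str>) -> usize {
    const DEFAULT: usize = 4;
    match raw.and_then(|s| s.parse::<usize>().ok()) {
        Some(n) if (1..=32).contains(&n) => n,
        _ => DEFAULT, // unset, unparsable, or out of range: fall back
    }
}

fn main() {
    let raw = std::env::var("PBS_RESTORE_CONCURRENCY").ok();
    println!("concurrency: {}", concurrency_from_env(raw.as_deref()));
}
```

Keeping the parsed value strictly below max_blocking_threads matters because each in-flight fetch occupies one blocking thread in the tokio runtime.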
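[Editor's note] The patch's core pattern is `futures::stream::iter(...).buffer_unordered(n)` over `spawn_blocking` fetches, with writes applied in completion order. A std-only approximation of that idea is sketched below; it processes fetches in batches of `limit` rather than keeping a sliding window, and `fetch` is a stand-in for `ReadChunk::read_chunk` returning an (offset, data) pair.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for ReadChunk::read_chunk: returns (offset, chunk bytes).
fn fetch(pos: usize, chunk_size: usize) -> (usize, Vec<u8>) {
    (pos * chunk_size, vec![0u8; chunk_size])
}

// Fetch `chunks` chunks with at most `limit` fetches in flight per batch,
// consuming each result as it arrives (out of order), like the patch's
// write loop over buffer_unordered.
fn restore_concurrently(chunks: usize, chunk_size: usize, limit: usize) -> usize {
    let mut bytes = 0;
    let positions: Vec<usize> = (0..chunks).collect();
    for batch in positions.chunks(limit) {
        let (tx, rx) = mpsc::channel();
        for &pos in batch {
            let tx = tx.clone();
            thread::spawn(move || tx.send(fetch(pos, chunk_size)).unwrap());
        }
        drop(tx); // rx iteration ends once all batch senders are done
        for (_offset, data) in rx {
            bytes += data.len(); // "write" step: here we only count bytes
        }
    }
    bytes
}

fn main() {
    println!("restored {} bytes", restore_concurrently(10, 4096, 4));
}
```

The real buffer_unordered keeps the window full by starting a new fetch as soon as any one completes, so it overlaps I/O better than this batched sketch; the ordering property (results arrive out of order, offsets carried alongside data) is the same.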