From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 697391FF136 for ; Mon, 09 Mar 2026 17:22:17 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id C4FB340C0; Mon, 9 Mar 2026 17:22:06 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Subject: [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Date: Mon, 9 Mar 2026 17:20:45 +0100 Message-ID: <20260309162050.1047341-8-c.ebner@proxmox.com> X-Mailer: git-send-email 2.47.3 In-Reply-To: <20260309162050.1047341-1-c.ebner@proxmox.com> References: <20260309162050.1047341-1-c.ebner@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1773073229291 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.055 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Message-ID-Hash: TI4I4G4W67IWYI46E5TN77KH4CR2JIDM X-Message-ID-Hash: TI4I4G4W67IWYI46E5TN77KH4CR2JIDM X-MailFrom: c.ebner@proxmox.com X-Mailman-Rule-Misses: dmarc-mitigation; no-senders; approved; loop; banned-address; emergency; member-moderation; nonmember-moderation; administrivia; implicit-dest; max-recipients; max-size; news-moderation; no-subject; digests; suspicious-header X-Mailman-Version: 3.3.10 Precedence: list List-Id: Proxmox Backup Server development discussion List-Help: List-Owner: List-Post: List-Subscribe: List-Unsubscribe: Currently, a sync job sequentially pulls the backup groups and the snapshots contained within them. 
It is therefore limited in download speed by the single HTTP/2 connection of the source reader instance in case of remote syncs. For high latency networks, this suffers from limited download speed due to head of line blocking. Improve the throughput by allowing up to a configured number of backup groups to be pulled in parallel, by creating a tokio task set which concurrently pulls from the remote source. Since these are dedicated tasks, they can run independently and in parallel on the tokio runtime. Store progress output is now prefixed by the group as it depends on the group being pulled since the snapshot count differs. To update the output on a per group level, the shared group progress count is passed as an atomic counter, the store progress is accounted globally as well as per-group. Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182 Signed-off-by: Christian Ebner --- src/server/pull.rs | 69 +++++++++++++++++++++++++------------- src/server/sync.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 23 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 3d7d47b9c..b11e93e6c 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -30,10 +30,11 @@ use pbs_tools::sha::sha256; use super::sync::{ check_namespace_depth_limit, exclude_not_verified_or_encrypted, - ignore_not_verified_or_encrypted, LocalSource, RemoteSource, RemovedVanishedStats, SkipInfo, - SkipReason, SyncSource, SyncSourceReader, SyncStats, + ignore_not_verified_or_encrypted, GroupWorkerSet, LocalSource, RemoteSource, + RemovedVanishedStats, SkipInfo, SkipReason, SyncSource, SyncSourceReader, SyncStats, }; use crate::backup::{check_ns_modification_privs, check_ns_privs}; +use crate::server::sync::SharedGroupProgress; use crate::tools::parallel_handler::ParallelHandler; pub(crate) struct PullTarget { @@ -616,7 +617,7 @@ async fn pull_group( params: Arc, source_namespace: &BackupNamespace, group: &BackupGroup, - progress: &mut StoreProgress, + 
shared_group_progress: Arc, ) -> Result { let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced); let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast); @@ -779,7 +780,8 @@ async fn pull_group( } } - progress.group_snapshots = list.len() as u64; + let mut local_progress = StoreProgress::new(shared_group_progress.total_groups()); + local_progress.group_snapshots = list.len() as u64; let mut sync_stats = SyncStats::default(); @@ -802,8 +804,10 @@ async fn pull_group( ) .await; - progress.done_snapshots = pos as u64 + 1; - info!("percentage done: {progress}"); + // Update done groups progress by other parallel running pulls + local_progress.done_groups = shared_group_progress.load_done(); + local_progress.done_snapshots = pos as u64 + 1; + info!("percentage done: group {group}: {local_progress}"); let stats = result?; // stop on error sync_stats.add(stats); @@ -1055,7 +1059,7 @@ async fn lock_and_pull_group( group: &BackupGroup, namespace: &BackupNamespace, target_namespace: &BackupNamespace, - progress: &mut StoreProgress, + shared_group_progress: Arc, ) -> Result { let (owner, _lock_guard) = params .target @@ -1075,7 +1079,7 @@ async fn lock_and_pull_group( return Err(()); } - pull_group(params, namespace, group, progress) + pull_group(params, namespace, group, shared_group_progress) .await .map_err(|err| { info!("sync group {group} failed - {err:#}"); @@ -1125,25 +1129,44 @@ async fn pull_ns( let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?; - for (done, group) in list.into_iter().enumerate() { - progress.done_groups = done as u64; - progress.done_snapshots = 0; - progress.group_snapshots = 0; + let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len())); + let mut group_workers = GroupWorkerSet::with_capacity(params.worker_threads.unwrap_or(1)); - match lock_and_pull_group( - Arc::clone(¶ms), - &group, - namespace, - &target_ns, - &mut progress, - ) - .await - { - 
Ok(stats) => sync_stats.add(stats), - Err(_err) => errors = true, + let mut process_results = |results| { + for result in results { + match result { + Ok(stats) => { + sync_stats.add(stats); + progress.done_groups = shared_group_progress.increment_done(); + } + Err(_err) => errors = true, + } } + }; + + for group in list.into_iter() { + let namespace = namespace.clone(); + let target_ns = target_ns.clone(); + let params = Arc::clone(¶ms); + let group_progress_cloned = Arc::clone(&shared_group_progress); + let results = group_workers + .spawn_task(async move { + lock_and_pull_group( + Arc::clone(¶ms), + &group, + &namespace, + &target_ns, + group_progress_cloned, + ) + .await + }) + .await; + process_results(results); } + let results = group_workers.join_active().await; + process_results(results); + if params.remove_vanished { let result: Result<(), Error> = proxmox_lang::try_block!({ for local_group in params.target.store.iter_backup_groups(target_ns.clone())? { diff --git a/src/server/sync.rs b/src/server/sync.rs index 9e6aeb9b0..17d736c41 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -1,9 +1,11 @@ //! Sync datastore contents from source to target, either in push or pull direction use std::collections::HashMap; +use std::future::Future; use std::io::{Seek, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -12,9 +14,11 @@ use futures::{future::FutureExt, select}; use hyper::http::StatusCode; use pbs_config::BackupLockGuard; use serde_json::json; +use tokio::task::JoinSet; use tracing::{info, warn}; use proxmox_human_byte::HumanByte; +use proxmox_log::LogContext; use proxmox_rest_server::WorkerTask; use proxmox_router::HttpError; @@ -792,3 +796,81 @@ pub(super) fn exclude_not_verified_or_encrypted( false } + +/// Process up to preconfigured number of group sync tasks concurrently, +/// running on different threads when possible. 
+pub(crate) struct GroupWorkerSet { + capacity: usize, + workers: JoinSet, +} + +impl GroupWorkerSet { + /// Create a new worker set which allows to run up to `capacity` concurrent tasks. + pub(crate) fn with_capacity(capacity: usize) -> Self { + Self { + capacity, + workers: JoinSet::new(), + } + } + + /// Spawn the given task on the workers, waiting until there is capacity to do so. + pub(crate) async fn spawn_task(&mut self, task: F) -> Vec + where + F: Future, + F: Send + 'static, + { + let mut results = Vec::with_capacity(self.workers.len()); + while self.workers.len() >= self.capacity { + // capacity reached, wait for an active task to complete + if let Some(result) = self.workers.join_next().await { + results.push(result.unwrap()); + } + } + + match LogContext::current() { + Some(context) => self.workers.spawn(context.scope(task)), + None => self.workers.spawn(task), + }; + results + } + + /// Wait on all active tasks to run to completion. + pub(crate) async fn join_active(&mut self) -> Vec { + let mut results = Vec::with_capacity(self.workers.len()); + while let Some(result) = self.workers.join_next().await { + results.push(result.unwrap()); + } + results + } +} + +/// Track group progress during parallel push/pull in sync jobs +pub(crate) struct SharedGroupProgress { + done: AtomicUsize, + total: usize, +} + +impl SharedGroupProgress { + /// Create a new instance to track group progress with expected total number of groups + pub(crate) fn with_total_groups(total: usize) -> Self { + Self { + done: AtomicUsize::new(0), + total, + } + } + + /// Return current counter value for done groups + pub(crate) fn load_done(&self) -> u64 { + self.done.load(Ordering::Acquire) as u64 + } + + /// Increment counter for done groups and return new value + pub(crate) fn increment_done(&self) -> u64 { + self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1 + } + + /// Return the number of total backup groups + pub(crate) fn total_groups(&self) -> u64 { + self.total as u64 + } +} 
-- 2.47.3