From mboxrd@z Thu Jan 1 00:00:00 1970
Date: Mon, 23 Mar 2026 13:36:37 +0100
From: Fabian Grünbichler <f.gruenbichler@proxmox.com>
Subject: Re: [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel
To: Christian Ebner, pbs-devel@lists.proxmox.com
References: <20260309162050.1047341-1-c.ebner@proxmox.com> <20260309162050.1047341-8-c.ebner@proxmox.com>
In-Reply-To: <20260309162050.1047341-8-c.ebner@proxmox.com>
Message-Id: <1774267967.mydro9oosc.astroid@yuna.none>
List-Id: Proxmox Backup Server development discussion
On March 9, 2026 5:20 pm, Christian Ebner wrote:
> Currently, a sync job sequentially pulls the backup groups and the
> snapshots contained within them. It is therefore limited in download
> speed by the single HTTP/2 connection of the source reader instance
> in case of remote syncs. For high latency networks, this suffers from
> limited download speed due to head of line blocking.
>
> Improve the throughput by allowing up to a configured number of
> backup groups to be pulled in parallel, by creating a tokio task set
> which concurrently pulls from the remote source. Since these are
> dedicated tasks, they can run independently and in parallel on the
> tokio runtime.
>
> Store progress output is now prefixed by the group, as it depends on
> the group being pulled since the snapshot count differs. To update
> the output on a per-group level, the shared group progress count is
> passed as an atomic counter, with store progress accounted both
> globally and per group.
>
> Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
> Signed-off-by: Christian Ebner
> ---
>  src/server/pull.rs | 69 +++++++++++++++++++++++++-------------
>  src/server/sync.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 128 insertions(+), 23 deletions(-)
>
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 3d7d47b9c..b11e93e6c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs

[..]

> diff --git a/src/server/sync.rs b/src/server/sync.rs
> index 9e6aeb9b0..17d736c41 100644
> --- a/src/server/sync.rs
> +++ b/src/server/sync.rs
> @@ -1,9 +1,11 @@
>  //! Sync datastore contents from source to target, either in push or pull direction
>
>  use std::collections::HashMap;
> +use std::future::Future;
>  use std::io::{Seek, Write};
>  use std::ops::Deref;
>  use std::path::{Path, PathBuf};
> +use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::Duration;
>
> @@ -12,9 +14,11 @@ use futures::{future::FutureExt, select};
>  use hyper::http::StatusCode;
>  use pbs_config::BackupLockGuard;
>  use serde_json::json;
> +use tokio::task::JoinSet;
>  use tracing::{info, warn};
>
>  use proxmox_human_byte::HumanByte;
> +use proxmox_log::LogContext;
>  use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>
> @@ -792,3 +796,81 @@ pub(super) fn exclude_not_verified_or_encrypted(
>
>      false
>  }
> +
> +/// Process up to preconfigured number of group sync tasks concurrently,
> +/// running on different threads when possible.
> +pub(crate) struct GroupWorkerSet {
> +    capacity: usize,
> +    workers: JoinSet,
> +}
> +
> +impl GroupWorkerSet {
> +    /// Create a new worker set which allows to run up to `capacity` concurrent tasks.
> +    pub(crate) fn with_capacity(capacity: usize) -> Self {
> +        Self {
> +            capacity,
> +            workers: JoinSet::new(),
> +        }
> +    }

capacity is a bit confusing here, as it could also mean "initial" or
expected number of workers, not a hard limit.. the capacity is also not
an optional aspect used for optimization, but a required part of the
setup.. why not call this `new`, let it take a `max_tasks` parameter,
and rename the field accordingly?

> +
> +    /// Spawn the given task on the workers, waiting until there is capacity to do so.
> +    pub(crate) async fn spawn_task(&mut self, task: F) -> Vec
> +    where
> +        F: Future,
> +        F: Send + 'static,
> +    {
> +        let mut results = Vec::with_capacity(self.workers.len());
> +        while self.workers.len() >= self.capacity {
> +            // capacity reached, wait for an active task to complete
> +            if let Some(result) = self.workers.join_next().await {
> +                results.push(result.unwrap());

should we handle a JoinError here in a meaningful fashion?

> +            }
> +        }
> +
> +        match LogContext::current() {
> +            Some(context) => self.workers.spawn(context.scope(task)),
> +            None => self.workers.spawn(task),
> +        };
> +        results
> +    }
> +
> +    /// Wait on all active tasks to run to completion.
> +    pub(crate) async fn join_active(&mut self) -> Vec {
> +        let mut results = Vec::with_capacity(self.workers.len());
> +        while let Some(result) = self.workers.join_next().await {
> +            results.push(result.unwrap());

same here

> +        }
> +        results
> +    }
> +}

IMHO this could be a stand-alone patch, with some arguments/rationale
for why we need it ;) we already have very similar code using regular
threads in ParallelHandler.. also, none of the code above is sync
specific - this is a generic "JoinSet with capacity" wrapper, and as
such we could also put it in a more generic place *if* we really want
to have two implementations..
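for reference, the pattern under discussion (bounded set of workers,
spawn blocks once the limit is reached, a crashed worker surfaces as an
error instead of being unwrapped) can be sketched with regular threads,
similar in spirit to ParallelHandler. this is a std-only illustration,
not the proposed tokio implementation; the names (`BoundedWorkerSet`,
`max_tasks`) are made up here, and unlike `JoinSet::join_next()` a std
`Vec<JoinHandle>` has no "join whichever finishes first", so the sketch
simply joins the oldest handle:

```rust
use std::thread::{self, JoinHandle};

/// Illustrative std-only analogue of the proposed worker set:
/// at most `max_tasks` threads run at once.
struct BoundedWorkerSet<T> {
    max_tasks: usize,
    workers: Vec<JoinHandle<T>>,
}

impl<T: Send + 'static> BoundedWorkerSet<T> {
    fn new(max_tasks: usize) -> Self {
        Self { max_tasks, workers: Vec::new() }
    }

    /// Spawn `task`, first joining the oldest worker if the limit is
    /// reached. A panicked worker becomes an `Err` (the "handle the
    /// JoinError meaningfully" case) instead of propagating the panic.
    fn spawn_task<F>(&mut self, task: F) -> Vec<Result<T, String>>
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let mut results = Vec::new();
        while self.workers.len() >= self.max_tasks {
            // std threads have no join_next(); join the oldest handle
            let handle = self.workers.remove(0);
            results.push(handle.join().map_err(|_| "worker panicked".to_string()));
        }
        self.workers.push(thread::spawn(task));
        results
    }

    /// Join all remaining workers and collect their results.
    fn join_active(&mut self) -> Vec<Result<T, String>> {
        self.workers
            .drain(..)
            .map(|h| h.join().map_err(|_| "worker panicked".to_string()))
            .collect()
    }
}

fn main() {
    let mut set = BoundedWorkerSet::new(2);
    let mut results = Vec::new();
    for i in 0..4u32 {
        // at most 2 threads in flight at any time
        results.extend(set.spawn_task(move || i * 10));
    }
    results.extend(set.join_active());
    let mut values: Vec<u32> = results.into_iter().map(|r| r.unwrap()).collect();
    values.sort(); // completion order may vary
    println!("{values:?}");
}
```

the error-mapping in `spawn_task`/`join_active` is one way of answering
the JoinError question above: collect per-task failures instead of
unwrapping, and let the caller decide whether one failed group aborts
the whole sync.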
> +
> +/// Track group progress during parallel push/pull in sync jobs
> +pub(crate) struct SharedGroupProgress {
> +    done: AtomicUsize,
> +    total: usize,
> +}
> +
> +impl SharedGroupProgress {
> +    /// Create a new instance to track group progress with expected total number of groups
> +    pub(crate) fn with_total_groups(total: usize) -> Self {
> +        Self {
> +            done: AtomicUsize::new(0),
> +            total,
> +        }
> +    }
> +
> +    /// Return current counter value for done groups
> +    pub(crate) fn load_done(&self) -> u64 {
> +        self.done.load(Ordering::Acquire) as u64
> +    }
> +
> +    /// Increment counter for done groups and return new value
> +    pub(crate) fn increment_done(&self) -> u64 {
> +        self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
> +    }
> +
> +    /// Return the number of total backup groups
> +    pub(crate) fn total_groups(&self) -> u64 {
> +        self.total as u64
> +    }
> +}

see cover letter reply for this part..

> --
> 2.47.3
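as a side note on the counter semantics quoted above: `fetch_add`
returns the *previous* value, which is why the patch adds `+ 1` to
report the new count, and with atomic increments each concurrent caller
observes a distinct new value. a self-contained sketch mirroring the
quoted type (std only, extracted from the patch context for
illustration):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Shared "done groups" counter as quoted from the patch,
/// trimmed to a standalone example.
struct SharedGroupProgress {
    done: AtomicUsize,
    total: usize,
}

impl SharedGroupProgress {
    fn with_total_groups(total: usize) -> Self {
        Self { done: AtomicUsize::new(0), total }
    }

    fn load_done(&self) -> u64 {
        self.done.load(Ordering::Acquire) as u64
    }

    /// fetch_add returns the previous value, hence the `+ 1`
    /// so the caller sees the new count.
    fn increment_done(&self) -> u64 {
        self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
    }

    fn total_groups(&self) -> u64 {
        self.total as u64
    }
}

fn main() {
    // 8 worker threads each finish one "group" and bump the counter
    let progress = Arc::new(SharedGroupProgress::with_total_groups(8));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let p = Arc::clone(&progress);
            thread::spawn(move || p.increment_done())
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    println!("{}/{}", progress.load_done(), progress.total_groups());
}
```

because the increment is a single atomic read-modify-write, the eight
returned values are exactly 1 through 8 in some order - no two workers
can report the same "done" count.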