* [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs
@ 2026-03-09 16:20 Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
` (11 more replies)
0 siblings, 12 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Syncing contents from/to a remote source via a sync job suffers from
low throughput on high-latency networks because of limitations of the
HTTP/2 connection, as described in [0]. To improve this, it has been
suggested to sync multiple groups in parallel by establishing
multiple reader instances.

This patch series implements that functionality by adding the sync
job configuration property `worker-threads`, which defines the number
of group pull/push tokio tasks executed in parallel on the runtime
during each job.
Example configuration:
```
sync: s-8764c440-3a6c
...
store datastore
sync-direction push
worker-threads 4
```
Since log messages are now written concurrently, logs related to
groups, snapshots and archives are prefixed with their respective
context, and error messages gain additional context.

Further, logging is improved especially for sync jobs in push
direction, which displayed only limited information so far.
[0] https://bugzilla.proxmox.com/show_bug.cgi?id=4182
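As a purely illustrative aside (the exact prefix format below is an
assumption, not taken from this series): prefixing each log line with
its group context keeps interleaved output from concurrent tasks
attributable, e.g.:

```rust
// Hypothetical sketch of a context-prefixed log line, as needed when
// multiple group sync tasks write log output concurrently. The exact
// prefix format is an assumption, not the one used by the series.
fn prefixed(group: &str, msg: &str) -> String {
    format!("sync group {group}: {msg}")
}

fn main() {
    let line = prefixed("vm/100", "percentage done: 50%");
    assert_eq!(line, "sync group vm/100: percentage done: 50%");
    println!("{line}");
}
```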
Changes since version 4 (thanks @Max for review):
- Use dedicated tokio tasks to run in parallel on different runtime threads,
not just multiple concurrent futures on the same thread.
- Rework store progress accounting logic to avoid mutex locks when possible,
use atomic counters instead.
- Expose setting also in the sync job edit window, not just the config.
proxmox:
Christian Ebner (1):
pbs api types: add `worker-threads` to sync job config
pbs-api-types/src/jobs.rs | 11 +++++++++++
1 file changed, 11 insertions(+)
proxmox-backup:
Christian Ebner (11):
client: backup writer: fix upload stats size and rate for push sync
api: config/sync: add optional `worker-threads` property
sync: pull: revert avoiding reinstantiation for encountered chunks map
sync: pull: factor out backup group locking and owner check
sync: pull: prepare pull parameters to be shared across parallel tasks
fix #4182: server: sync: allow pulling backup groups in parallel
server: pull: prefix log messages and add error context
sync: push: prepare push parameters to be shared across parallel tasks
server: sync: allow pushing groups concurrently
server: push: prefix log messages and add additional logging
ui: expose group worker setting in sync job edit window
pbs-client/src/backup_stats.rs | 20 +--
pbs-client/src/backup_writer.rs | 4 +-
src/api2/config/sync.rs | 10 ++
src/api2/pull.rs | 9 +-
src/api2/push.rs | 8 +-
src/server/pull.rs | 246 +++++++++++++++++++-------------
src/server/push.rs | 178 +++++++++++++++++------
src/server/sync.rs | 90 +++++++++++-
www/window/SyncJobEdit.js | 11 ++
9 files changed, 411 insertions(+), 165 deletions(-)
Summary over all repositories:
10 files changed, 422 insertions(+), 165 deletions(-)
--
Generated by murpp 0.9.0
^ permalink raw reply [flat|nested] 13+ messages in thread
* [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
` (10 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Allow specifying the number of concurrent worker threads used to sync
groups for sync jobs. Values range from the current default of 1 up
to 32, although performance gains saturate at higher thread counts.
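As a hedged illustration (the real schema is built via the
`threads_schema` helper shown in the diff below is not reproduced;
only the bounds from the description are mirrored), the 1..=32 range
amounts to a check like:

```rust
// Standalone sketch of validating a `worker-threads` value against
// the 1..=32 range described above. The actual bounds enforcement in
// pbs-api-types happens through the schema, not this function.
fn validate_worker_threads(n: usize) -> Result<usize, String> {
    if (1..=32).contains(&n) {
        Ok(n)
    } else {
        Err(format!("worker-threads {n} out of range (1..=32)"))
    }
}

fn main() {
    assert_eq!(validate_worker_threads(4), Ok(4));
    assert!(validate_worker_threads(0).is_err());
    assert!(validate_worker_threads(33).is_err());
}
```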
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-api-types/src/jobs.rs | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 7e6dfb94..c4e6dda6 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -88,6 +88,11 @@ pub const VERIFY_JOB_VERIFY_THREADS_SCHEMA: Schema = threads_schema(
4,
);
+pub const SYNC_WORKER_THREADS_SCHEMA: Schema = threads_schema(
+ "The number of worker threads to process groups in parallel.",
+ 1,
+);
+
#[api(
properties: {
"next-run": {
@@ -664,6 +669,10 @@ pub const UNMOUNT_ON_SYNC_DONE_SCHEMA: Schema =
type: SyncDirection,
optional: true,
},
+ "worker-threads": {
+ schema: SYNC_WORKER_THREADS_SCHEMA,
+ optional: true,
+ },
}
)]
#[derive(Serialize, Deserialize, Clone, Updater, PartialEq)]
@@ -709,6 +718,8 @@ pub struct SyncJobConfig {
pub unmount_on_done: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_direction: Option<SyncDirection>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub worker_threads: Option<usize>,
}
impl SyncJobConfig {
--
2.47.3
* [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property Christian Ebner
` (9 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Currently, the logical size of the uploaded chunks is used for the
size and upload rate calculation of sync jobs in push direction,
leading to inflated values for the transferred size and rate.

Use the compressed chunk size instead. To get the required
information, return the more verbose `UploadStats` from
`upload_index_chunk_info` calls and use its compressed size for the
transferred `bytes` of `SyncStats`. Since `UploadStats` is now part
of a public API, widen its visibility accordingly.

This is then used to display the upload size and to calculate the
rate for the push sync job.
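To illustrate why the compressed size matters for the displayed rate,
a small sketch with invented numbers: dividing the logical size by
the elapsed time overstates throughput whenever chunks compress well.

```rust
use std::time::Duration;

// Sketch: the transfer rate should be derived from the bytes actually
// sent over the wire (compressed), not the logical chunk size, or the
// reported rate is inflated. Numbers below are invented for illustration.
fn rate_mib_s(bytes: usize, elapsed: Duration) -> f64 {
    bytes as f64 / (1024.0 * 1024.0) / elapsed.as_secs_f64()
}

fn main() {
    let logical = 400 * 1024 * 1024; // 400 MiB of logical chunk data
    let compressed = 100 * 1024 * 1024; // 100 MiB actually transferred
    let elapsed = Duration::from_secs(10);
    // Using the logical size reports 4x the real throughput here.
    assert_eq!(rate_mib_s(logical, elapsed), 40.0);
    assert_eq!(rate_mib_s(compressed, elapsed), 10.0);
}
```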
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
pbs-client/src/backup_stats.rs | 20 ++++++++++----------
pbs-client/src/backup_writer.rs | 4 ++--
src/server/push.rs | 4 ++--
3 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/pbs-client/src/backup_stats.rs b/pbs-client/src/backup_stats.rs
index f0563a001..edf7ef3c4 100644
--- a/pbs-client/src/backup_stats.rs
+++ b/pbs-client/src/backup_stats.rs
@@ -15,16 +15,16 @@ pub struct BackupStats {
}
/// Extended backup run statistics and archive checksum
-pub(crate) struct UploadStats {
- pub(crate) chunk_count: usize,
- pub(crate) chunk_reused: usize,
- pub(crate) chunk_injected: usize,
- pub(crate) size: usize,
- pub(crate) size_reused: usize,
- pub(crate) size_injected: usize,
- pub(crate) size_compressed: usize,
- pub(crate) duration: Duration,
- pub(crate) csum: [u8; 32],
+pub struct UploadStats {
+ pub chunk_count: usize,
+ pub chunk_reused: usize,
+ pub chunk_injected: usize,
+ pub size: usize,
+ pub size_reused: usize,
+ pub size_injected: usize,
+ pub size_compressed: usize,
+ pub duration: Duration,
+ pub csum: [u8; 32],
}
impl UploadStats {
diff --git a/pbs-client/src/backup_writer.rs b/pbs-client/src/backup_writer.rs
index 49aff3fdd..4a4391c8b 100644
--- a/pbs-client/src/backup_writer.rs
+++ b/pbs-client/src/backup_writer.rs
@@ -309,7 +309,7 @@ impl BackupWriter {
archive_name: &BackupArchiveName,
stream: impl Stream<Item = Result<MergedChunkInfo, Error>>,
options: UploadOptions,
- ) -> Result<BackupStats, Error> {
+ ) -> Result<UploadStats, Error> {
let mut param = json!({ "archive-name": archive_name });
let (prefix, archive_size) = options.index_type.to_prefix_and_size();
if let Some(size) = archive_size {
@@ -391,7 +391,7 @@ impl BackupWriter {
.post(&format!("{prefix}_close"), Some(param))
.await?;
- Ok(upload_stats.to_backup_stats())
+ Ok(upload_stats)
}
pub async fn upload_stream(
diff --git a/src/server/push.rs b/src/server/push.rs
index 7f93cc034..b7eeeffae 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -1058,8 +1058,8 @@ async fn push_index(
.await?;
Ok(SyncStats {
- chunk_count: upload_stats.chunk_count as usize,
- bytes: upload_stats.size as usize,
+ chunk_count: upload_stats.chunk_count,
+ bytes: upload_stats.size_compressed,
elapsed: upload_stats.duration,
removed: None,
})
--
2.47.3
* [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
` (8 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Allow configuring from 1 up to 32 worker threads to perform multiple
group syncs in parallel.

The property is exposed via the sync job config and passed to the
pull/push parameters, so the sync job can set up and execute the
thread pool accordingly.

Implements the schema definitions and adds the new property to
`SyncJobConfig`, `PullParameters` and `PushParameters`.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/api2/config/sync.rs | 10 ++++++++++
src/api2/pull.rs | 9 ++++++++-
src/api2/push.rs | 8 +++++++-
src/server/pull.rs | 4 ++++
src/server/push.rs | 4 ++++
src/server/sync.rs | 1 +
6 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index dff447cb6..e432c3db4 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -345,6 +345,8 @@ pub enum DeletableProperty {
UnmountOnDone,
/// Delete the sync_direction property,
SyncDirection,
+ /// Delete the worker_threads property,
+ WorkerThreads,
}
#[api(
@@ -471,6 +473,9 @@ pub fn update_sync_job(
DeletableProperty::SyncDirection => {
data.sync_direction = None;
}
+ DeletableProperty::WorkerThreads => {
+ data.worker_threads = None;
+ }
}
}
}
@@ -530,6 +535,10 @@ pub fn update_sync_job(
data.sync_direction = Some(sync_direction);
}
+ if let Some(worker_threads) = update.worker_threads {
+ data.worker_threads = Some(worker_threads);
+ }
+
if update.limit.rate_in.is_some() {
data.limit.rate_in = update.limit.rate_in;
}
@@ -705,6 +714,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
run_on_mount: None,
unmount_on_done: None,
sync_direction: None, // use default
+ worker_threads: None,
};
// should work without ACLs
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 4b1fd5e60..7cf165f91 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -11,7 +11,7 @@ use pbs_api_types::{
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
RESYNC_CORRUPT_SCHEMA, SYNC_ENCRYPTED_ONLY_SCHEMA, SYNC_VERIFIED_ONLY_SCHEMA,
- TRANSFER_LAST_SCHEMA,
+ SYNC_WORKER_THREADS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
use proxmox_rest_server::WorkerTask;
@@ -91,6 +91,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
sync_job.encrypted_only,
sync_job.verified_only,
sync_job.resync_corrupt,
+ sync_job.worker_threads,
)
}
}
@@ -148,6 +149,10 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
schema: RESYNC_CORRUPT_SCHEMA,
optional: true,
},
+ "worker-threads": {
+ schema: SYNC_WORKER_THREADS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -175,6 +180,7 @@ async fn pull(
encrypted_only: Option<bool>,
verified_only: Option<bool>,
resync_corrupt: Option<bool>,
+ worker_threads: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -215,6 +221,7 @@ async fn pull(
encrypted_only,
verified_only,
resync_corrupt,
+ worker_threads,
)?;
// fixme: set to_stdout to false?
diff --git a/src/api2/push.rs b/src/api2/push.rs
index e5edc13e0..f27f4ea1a 100644
--- a/src/api2/push.rs
+++ b/src/api2/push.rs
@@ -6,7 +6,7 @@ use pbs_api_types::{
GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
PRIV_DATASTORE_READ, PRIV_REMOTE_DATASTORE_BACKUP, PRIV_REMOTE_DATASTORE_PRUNE,
REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, SYNC_ENCRYPTED_ONLY_SCHEMA,
- SYNC_VERIFIED_ONLY_SCHEMA, TRANSFER_LAST_SCHEMA,
+ SYNC_VERIFIED_ONLY_SCHEMA, SYNC_WORKER_THREADS_SCHEMA, TRANSFER_LAST_SCHEMA,
};
use proxmox_rest_server::WorkerTask;
use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -108,6 +108,10 @@ fn check_push_privs(
schema: TRANSFER_LAST_SCHEMA,
optional: true,
},
+ "worker-threads": {
+ schema: SYNC_WORKER_THREADS_SCHEMA,
+ optional: true,
+ },
},
},
access: {
@@ -133,6 +137,7 @@ async fn push(
verified_only: Option<bool>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ worker_threads: Option<usize>,
rpcenv: &mut dyn RpcEnvironment,
) -> Result<String, Error> {
let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
@@ -164,6 +169,7 @@ async fn push(
verified_only,
limit,
transfer_last,
+ worker_threads,
)
.await?;
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 57c5ef323..ddb59db54 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -65,6 +65,8 @@ pub(crate) struct PullParameters {
verified_only: bool,
/// Whether to re-sync corrupted snapshots
resync_corrupt: bool,
+ /// Maximum number of worker threads to pull during sync job
+ worker_threads: Option<usize>,
}
impl PullParameters {
@@ -85,6 +87,7 @@ impl PullParameters {
encrypted_only: Option<bool>,
verified_only: Option<bool>,
resync_corrupt: Option<bool>,
+ worker_threads: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -134,6 +137,7 @@ impl PullParameters {
encrypted_only,
verified_only,
resync_corrupt,
+ worker_threads,
})
}
}
diff --git a/src/server/push.rs b/src/server/push.rs
index b7eeeffae..a0484ef62 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -83,6 +83,8 @@ pub(crate) struct PushParameters {
verified_only: bool,
/// How many snapshots should be transferred at most (taking the newest N snapshots)
transfer_last: Option<usize>,
+ /// Maximum number of worker threads for push during sync job
+ worker_threads: Option<usize>,
}
impl PushParameters {
@@ -102,6 +104,7 @@ impl PushParameters {
verified_only: Option<bool>,
limit: RateLimitConfig,
transfer_last: Option<usize>,
+ worker_threads: Option<usize>,
) -> Result<Self, Error> {
if let Some(max_depth) = max_depth {
ns.check_max_depth(max_depth)?;
@@ -164,6 +167,7 @@ impl PushParameters {
encrypted_only,
verified_only,
transfer_last,
+ worker_threads,
})
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index aedf4a271..9e6aeb9b0 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -675,6 +675,7 @@ pub fn do_sync_job(
sync_job.verified_only,
sync_job.limit.clone(),
sync_job.transfer_last,
+ sync_job.worker_threads,
)
.await?;
push_store(push_params).await?
--
2.47.3
* [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (2 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check Christian Ebner
` (7 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
While keeping a store-wide instance to avoid reinstantiation for each
group is desirable when processing groups iteratively, this cannot
work when syncing multiple groups in parallel.
This is in preparation for parallel group syncs and reverts commit
ecdec5bc ("sync: pull: avoid reinstantiation for encountered chunks
map").
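A sketch of the ownership change (types simplified; the real map is
`EncounteredChunks`, not a plain `HashSet`): instead of one store-wide
set cleared between groups, each group sync allocates its own fresh
set, so concurrent group tasks never share one.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Simplified stand-in for EncounteredChunks: a set of chunk digests.
type ChunkSet = Arc<Mutex<HashSet<[u8; 32]>>>;

// Each group sync allocates its own set, so parallel group tasks do
// not race on (or need to clear) a shared store-wide instance.
fn per_group_chunk_set() -> ChunkSet {
    // start with 65536 chunks (up to 256 GiB), as in the patch
    Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)))
}

fn main() {
    let a = per_group_chunk_set();
    let b = per_group_chunk_set();
    a.lock().unwrap().insert([0u8; 32]);
    // Independent instances: inserting into one does not affect the other.
    assert_eq!(a.lock().unwrap().len(), 1);
    assert!(b.lock().unwrap().is_empty());
}
```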
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 26 +++++---------------------
1 file changed, 5 insertions(+), 21 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index ddb59db54..254b36759 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -617,7 +617,6 @@ async fn pull_group(
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
- encountered_chunks: Arc<Mutex<EncounteredChunks>>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -718,6 +717,9 @@ async fn pull_group(
transfer_last_skip_info.reset();
}
+ // start with 65536 chunks (up to 256 GiB)
+ let encountered_chunks = Arc::new(Mutex::new(EncounteredChunks::with_capacity(1024 * 64)));
+
let backup_group = params
.target
.store
@@ -981,9 +983,6 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
let mut synced_ns = HashSet::with_capacity(namespaces.len());
let mut sync_stats = SyncStats::default();
- // start with 65536 chunks (up to 256 GiB)
- let encountered_chunks = Arc::new(Mutex::new(EncounteredChunks::with_capacity(1024 * 64)));
-
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1005,7 +1004,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
}
}
- match pull_ns(&namespace, &mut params, encountered_chunks.clone()).await {
+ match pull_ns(&namespace, &mut params).await {
Ok((ns_progress, ns_sync_stats, ns_errors)) => {
errors |= ns_errors;
@@ -1063,7 +1062,6 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
async fn pull_ns(
namespace: &BackupNamespace,
params: &mut PullParameters,
- encountered_chunks: Arc<Mutex<EncounteredChunks>>,
) -> Result<(StoreProgress, SyncStats, bool), Error> {
let list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
@@ -1122,16 +1120,7 @@ async fn pull_ns(
);
errors = true; // do not stop here, instead continue
} else {
- encountered_chunks.lock().unwrap().clear();
- match pull_group(
- params,
- namespace,
- &group,
- &mut progress,
- encountered_chunks.clone(),
- )
- .await
- {
+ match pull_group(params, namespace, &group, &mut progress).await {
Ok(stats) => sync_stats.add(stats),
Err(err) => {
info!("sync group {} failed - {err:#}", &group);
@@ -1252,9 +1241,4 @@ impl EncounteredChunks {
}
}
}
-
- /// Clear all entries
- fn clear(&mut self) {
- self.chunk_set.clear();
- }
}
--
2.47.3
* [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (3 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
` (6 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Creates a dedicated entry point for parallel group pulling and
simplifies the backup group loop logic.

While the locking and owner check could have been moved into
pull_group() as well, that function is already hard to parse as is.
Error logging is moved into the helper to ease parallel pulling.

This also drops the multi-line error message on group lock failures:
the extra line was redundant, and multi-line logging must be avoided
with parallel group syncs anyway, as ordering can no longer be
guaranteed.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 69 +++++++++++++++++++++++++---------------------
1 file changed, 37 insertions(+), 32 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 254b36759..c074c2b78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1047,6 +1047,40 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
Ok(sync_stats)
}
+/// Get an exclusive lock on the backup group, check ownership matches
+/// sync job owner and pull group contents.
+async fn lock_and_pull_group(
+ params: &PullParameters,
+ group: &BackupGroup,
+ namespace: &BackupNamespace,
+ target_namespace: &BackupNamespace,
+ progress: &mut StoreProgress,
+) -> Result<SyncStats, ()> {
+ let (owner, _lock_guard) = params
+ .target
+ .store
+ .create_locked_backup_group(target_namespace, group, ¶ms.owner)
+ .map_err(|err| {
+ info!("sync group {group} failed - group lock failed: {err}");
+ info!("create_locked_backup_group failed");
+ })?;
+
+ if params.owner != owner {
+ // only the owner is allowed to create additional snapshots
+ info!(
+ "sync group {group} failed - owner check failed ({} != {owner})",
+ params.owner
+ );
+ return Err(());
+ }
+
+ pull_group(params, namespace, group, progress)
+ .await
+ .map_err(|err| {
+ info!("sync group {group} failed - {err:#}");
+ })
+}
+
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
@@ -1095,38 +1129,9 @@ async fn pull_ns(
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- let (owner, _lock_guard) =
- match params
- .target
- .store
- .create_locked_backup_group(&target_ns, &group, ¶ms.owner)
- {
- Ok(result) => result,
- Err(err) => {
- info!("sync group {} failed - group lock failed: {err}", &group);
- errors = true;
- // do not stop here, instead continue
- info!("create_locked_backup_group failed");
- continue;
- }
- };
-
- // permission check
- if params.owner != owner {
- // only the owner is allowed to create additional snapshots
- info!(
- "sync group {} failed - owner check failed ({} != {owner})",
- &group, params.owner
- );
- errors = true; // do not stop here, instead continue
- } else {
- match pull_group(params, namespace, &group, &mut progress).await {
- Ok(stats) => sync_stats.add(stats),
- Err(err) => {
- info!("sync group {} failed - {err:#}", &group);
- errors = true; // do not stop here, instead continue
- }
- }
+ match lock_and_pull_group(params, &group, &namespace, &target_ns, &mut progress).await {
+ Ok(stats) => sync_stats.add(stats),
+ Err(_err) => errors = true,
}
}
--
2.47.3
* [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (4 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
` (5 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
When performing parallel group syncs, the pull parameters must be
shared between all tasks, which is not possible with regular
references due to lifetime and ownership issues. Pack them into an
atomically reference-counted pointer (`Arc`) instead, so they can
easily be cloned where required.
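The pattern, sketched here with plain threads (the series spawns
tokio tasks, but the ownership mechanics are the same): wrap the
read-only parameters once in an `Arc` and hand each task its own
clone of the handle. `Params` below is an illustrative stand-in for
`PullParameters`.

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for PullParameters: read-only configuration shared by tasks.
struct Params {
    worker_threads: usize,
}

fn main() {
    let params = Arc::new(Params { worker_threads: 4 });
    let handles: Vec<_> = (0..3)
        .map(|_| {
            // Each task owns a reference-counted handle, so no lifetime
            // ties the spawned work to the caller's stack frame.
            let params = Arc::clone(&params);
            thread::spawn(move || params.worker_threads)
        })
        .collect();
    for h in handles {
        assert_eq!(h.join().unwrap(), 4);
    }
}
```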
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 25 +++++++++++++++++--------
1 file changed, 17 insertions(+), 8 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index c074c2b78..3d7d47b9c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -377,7 +377,7 @@ async fn pull_single_archive<'a>(
/// -- if not, pull it from the remote
/// - Download log if not already existing
async fn pull_snapshot<'a>(
- params: &PullParameters,
+ params: Arc<PullParameters>,
reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
@@ -555,7 +555,7 @@ async fn pull_snapshot<'a>(
/// The `reader` is configured to read from the source backup directory, while the
/// `snapshot` is pointing to the local datastore and target namespace.
async fn pull_snapshot_from<'a>(
- params: &PullParameters,
+ params: Arc<PullParameters>,
reader: Arc<dyn SyncSourceReader + 'a>,
snapshot: &'a pbs_datastore::BackupDir,
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
@@ -613,7 +613,7 @@ async fn pull_snapshot_from<'a>(
/// - remote snapshot access is checked by remote (twice: query and opening the backup reader)
/// - local group owner is already checked by pull_store
async fn pull_group(
- params: &PullParameters,
+ params: Arc<PullParameters>,
source_namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
@@ -794,7 +794,7 @@ async fn pull_group(
.reader(source_namespace, &from_snapshot)
.await?;
let result = pull_snapshot_from(
- params,
+ Arc::clone(¶ms),
reader,
&to_snapshot,
encountered_chunks.clone(),
@@ -982,6 +982,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
let (mut groups, mut snapshots) = (0, 0);
let mut synced_ns = HashSet::with_capacity(namespaces.len());
let mut sync_stats = SyncStats::default();
+ let params = Arc::new(params);
for namespace in namespaces {
let source_store_ns_str = print_store_and_ns(params.source.get_store(), &namespace);
@@ -1004,7 +1005,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
}
}
- match pull_ns(&namespace, &mut params).await {
+ match pull_ns(&namespace, Arc::clone(¶ms)).await {
Ok((ns_progress, ns_sync_stats, ns_errors)) => {
errors |= ns_errors;
@@ -1050,7 +1051,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
/// Get and exclusive lock on the backup group, check ownership matches
/// sync job owner and pull group contents.
async fn lock_and_pull_group(
- params: &PullParameters,
+ params: Arc<PullParameters>,
group: &BackupGroup,
namespace: &BackupNamespace,
target_namespace: &BackupNamespace,
@@ -1095,7 +1096,7 @@ async fn lock_and_pull_group(
/// - owner check for vanished groups done here
async fn pull_ns(
namespace: &BackupNamespace,
- params: &mut PullParameters,
+ params: Arc<PullParameters>,
) -> Result<(StoreProgress, SyncStats, bool), Error> {
let list: Vec<BackupGroup> = params.source.list_groups(namespace, ¶ms.owner).await?;
@@ -1129,7 +1130,15 @@ async fn pull_ns(
progress.done_snapshots = 0;
progress.group_snapshots = 0;
- match lock_and_pull_group(params, &group, &namespace, &target_ns, &mut progress).await {
+ match lock_and_pull_group(
+ Arc::clone(¶ms),
+ &group,
+ namespace,
+ &target_ns,
+ &mut progress,
+ )
+ .await
+ {
Ok(stats) => sync_stats.add(stats),
Err(_err) => errors = true,
}
--
2.47.3
* [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (5 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context Christian Ebner
` (4 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Currently, a sync job sequentially pulls the backup groups and the
snapshots contained within them. For remote syncs it is therefore
limited in download speed by the single HTTP/2 connection of the
source reader instance; on high-latency networks this leads to poor
throughput due to head-of-line blocking.

Improve the throughput by allowing up to a configured number of
backup groups to be pulled in parallel, using a tokio task set which
concurrently pulls from the remote source. Since these are dedicated
tasks, they can run independently and in parallel on the tokio
runtime.

Store progress output is now prefixed by the group being pulled, as
the snapshot counts differ per group. To update the output on a
per-group level, the shared done-groups count is passed as an atomic
counter, and store progress is accounted both globally and per group.
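A minimal sketch of the shared done-groups counter, again with plain
threads instead of tokio tasks (the struct and method names below are
illustrative, not the series' actual `SharedGroupProgress` API):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

// Sketch: each worker increments the shared done-groups count
// atomically, avoiding a mutex around the progress state.
struct GroupProgress {
    done: AtomicU64,
}

impl GroupProgress {
    // Returns the new count after incrementing.
    fn increment_done(&self) -> u64 {
        self.done.fetch_add(1, Ordering::SeqCst) + 1
    }
}

fn main() {
    let progress = Arc::new(GroupProgress { done: AtomicU64::new(0) });
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let p = Arc::clone(&progress);
            thread::spawn(move || p.increment_done())
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // All eight increments are visible, with no lost updates.
    assert_eq!(progress.done.load(Ordering::SeqCst), 8);
}
```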
Fixes: https://bugzilla.proxmox.com/show_bug.cgi?id=4182
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 69 +++++++++++++++++++++++++-------------
src/server/sync.rs | 82 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 128 insertions(+), 23 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 3d7d47b9c..b11e93e6c 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -30,10 +30,11 @@ use pbs_tools::sha::sha256;
use super::sync::{
check_namespace_depth_limit, exclude_not_verified_or_encrypted,
- ignore_not_verified_or_encrypted, LocalSource, RemoteSource, RemovedVanishedStats, SkipInfo,
- SkipReason, SyncSource, SyncSourceReader, SyncStats,
+ ignore_not_verified_or_encrypted, GroupWorkerSet, LocalSource, RemoteSource,
+ RemovedVanishedStats, SkipInfo, SkipReason, SyncSource, SyncSourceReader, SyncStats,
};
use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::server::sync::SharedGroupProgress;
use crate::tools::parallel_handler::ParallelHandler;
pub(crate) struct PullTarget {
@@ -616,7 +617,7 @@ async fn pull_group(
params: Arc<PullParameters>,
source_namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -779,7 +780,8 @@ async fn pull_group(
}
}
- progress.group_snapshots = list.len() as u64;
+ let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+ local_progress.group_snapshots = list.len() as u64;
let mut sync_stats = SyncStats::default();
@@ -802,8 +804,10 @@ async fn pull_group(
)
.await;
- progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ // Update done groups progress by other parallel running pulls
+ local_progress.done_groups = shared_group_progress.load_done();
+ local_progress.done_snapshots = pos as u64 + 1;
+ info!("percentage done: group {group}: {local_progress}");
let stats = result?; // stop on error
sync_stats.add(stats);
@@ -1055,7 +1059,7 @@ async fn lock_and_pull_group(
group: &BackupGroup,
namespace: &BackupNamespace,
target_namespace: &BackupNamespace,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, ()> {
let (owner, _lock_guard) = params
.target
@@ -1075,7 +1079,7 @@ async fn lock_and_pull_group(
return Err(());
}
- pull_group(params, namespace, group, progress)
+ pull_group(params, namespace, group, shared_group_progress)
.await
.map_err(|err| {
info!("sync group {group} failed - {err:#}");
@@ -1125,25 +1129,44 @@ async fn pull_ns(
let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?;
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
+ let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
+ let mut group_workers = GroupWorkerSet::with_capacity(params.worker_threads.unwrap_or(1));
- match lock_and_pull_group(
- Arc::clone(¶ms),
- &group,
- namespace,
- &target_ns,
- &mut progress,
- )
- .await
- {
- Ok(stats) => sync_stats.add(stats),
- Err(_err) => errors = true,
+ let mut process_results = |results| {
+ for result in results {
+ match result {
+ Ok(stats) => {
+ sync_stats.add(stats);
+ progress.done_groups = shared_group_progress.increment_done();
+ }
+ Err(_err) => errors = true,
+ }
}
+ };
+
+ for group in list.into_iter() {
+ let namespace = namespace.clone();
+ let target_ns = target_ns.clone();
+ let params = Arc::clone(¶ms);
+ let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let results = group_workers
+ .spawn_task(async move {
+ lock_and_pull_group(
+ Arc::clone(¶ms),
+ &group,
+ &namespace,
+ &target_ns,
+ group_progress_cloned,
+ )
+ .await
+ })
+ .await;
+ process_results(results);
}
+ let results = group_workers.join_active().await;
+ process_results(results);
+
if params.remove_vanished {
let result: Result<(), Error> = proxmox_lang::try_block!({
for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 9e6aeb9b0..17d736c41 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -1,9 +1,11 @@
//! Sync datastore contents from source to target, either in push or pull direction
use std::collections::HashMap;
+use std::future::Future;
use std::io::{Seek, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -12,9 +14,11 @@ use futures::{future::FutureExt, select};
use hyper::http::StatusCode;
use pbs_config::BackupLockGuard;
use serde_json::json;
+use tokio::task::JoinSet;
use tracing::{info, warn};
use proxmox_human_byte::HumanByte;
+use proxmox_log::LogContext;
use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
@@ -792,3 +796,81 @@ pub(super) fn exclude_not_verified_or_encrypted(
false
}
+
+/// Process up to a preconfigured number of group sync tasks concurrently,
+/// running on different threads when possible.
+pub(crate) struct GroupWorkerSet<T> {
+ capacity: usize,
+ workers: JoinSet<T>,
+}
+
+impl<T: Send + 'static> GroupWorkerSet<T> {
+ /// Create a new worker set which allows running up to `capacity` concurrent tasks.
+ pub(crate) fn with_capacity(capacity: usize) -> Self {
+ Self {
+ capacity,
+ workers: JoinSet::new(),
+ }
+ }
+
+ /// Spawn the given task on the workers, waiting until there is capacity to do so.
+ pub(crate) async fn spawn_task<F>(&mut self, task: F) -> Vec<T>
+ where
+ F: Future<Output = T>,
+ F: Send + 'static,
+ {
+ let mut results = Vec::with_capacity(self.workers.len());
+ while self.workers.len() >= self.capacity {
+ // capacity reached, wait for an active task to complete
+ if let Some(result) = self.workers.join_next().await {
+ results.push(result.unwrap());
+ }
+ }
+
+ match LogContext::current() {
+ Some(context) => self.workers.spawn(context.scope(task)),
+ None => self.workers.spawn(task),
+ };
+ results
+ }
+
+ /// Wait for all active tasks to run to completion.
+ pub(crate) async fn join_active(&mut self) -> Vec<T> {
+ let mut results = Vec::with_capacity(self.workers.len());
+ while let Some(result) = self.workers.join_next().await {
+ results.push(result.unwrap());
+ }
+ results
+ }
+}
+
+/// Track group progress during parallel push/pull in sync jobs
+pub(crate) struct SharedGroupProgress {
+ done: AtomicUsize,
+ total: usize,
+}
+
+impl SharedGroupProgress {
+ /// Create a new instance to track group progress with expected total number of groups
+ pub(crate) fn with_total_groups(total: usize) -> Self {
+ Self {
+ done: AtomicUsize::new(0),
+ total,
+ }
+ }
+
+ /// Return current counter value for done groups
+ pub(crate) fn load_done(&self) -> u64 {
+ self.done.load(Ordering::Acquire) as u64
+ }
+
+ /// Increment counter for done groups and return new value
+ pub(crate) fn increment_done(&self) -> u64 {
+ self.done.fetch_add(1, Ordering::AcqRel) as u64 + 1
+ }
+
+ /// Return the number of total backup groups
+ pub(crate) fn total_groups(&self) -> u64 {
+ self.total as u64
+ }
+}
--
2.47.3
* [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (6 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
` (3 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Pulling groups and therefore also snapshots in parallel leads to
unordered log outputs, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix pull job log messages by the corresponding group or
snapshot and set the error context accordingly.
Also, reword some messages, inline variables in format strings and
start log lines with capital letters to get consistent output.
Example output for a sequential pull job:
```
...
Snapshot ct/100/2025-01-15T12:29:44Z: start sync
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive pct.conf.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive root.pxar.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive root.pxar.didx: downloaded 171.851 MiB (111.223 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive catalog.pcat1.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive catalog.pcat1.didx: downloaded 180.195 KiB (19.884 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: got backup log file client.log.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync done
...
```
Example output for a parallel pull job:
```
...
Snapshot ct/100/2025-01-15T12:29:44Z: start sync
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive pct.conf.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive root.pxar.didx
Snapshot vm/200/2025-01-15T12:30:06Z: start sync
Snapshot vm/200/2025-01-15T12:30:06Z: sync archive qemu-server.conf.blob
Snapshot vm/200/2025-01-15T12:30:06Z: sync archive drive-scsi0.img.fidx
Snapshot ct/100/2025-01-15T12:29:44Z: archive root.pxar.didx: downloaded 171.851 MiB (206.124 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: sync archive catalog.pcat1.didx
Snapshot ct/100/2025-01-15T12:29:44Z: archive catalog.pcat1.didx: downloaded 180.195 KiB (1.972 MiB/s)
Snapshot ct/100/2025-01-15T12:29:44Z: got backup log file client.log.blob
Snapshot ct/100/2025-01-15T12:29:44Z: sync done
...
```
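The prefix pattern behind this output is just a pre-formatted context string that every subsequent message interpolates; a minimal std-only sketch (the snapshot and archive names are made up, and `println!` stands in for the `tracing::info!` macro used by the real code):
```
// Sketch of the log-prefix pattern; the snapshot/archive names are
// hypothetical and `println!` replaces the real tracing::info! macro.
fn main() {
    let snapshot_dir = "ct/100/2025-01-15T12:29:44Z";
    let archive_name = "root.pxar.didx";

    // Context prefix computed once per snapshot ...
    let prefix = format!("Snapshot {snapshot_dir}");
    println!("{prefix}: start sync");
    println!("{prefix}: sync archive {archive_name}");

    // ... and narrowed to the archive once its chunks are downloaded,
    // so stats from concurrently running pulls stay attributable.
    let prefix = format!("Snapshot {snapshot_dir}: archive {archive_name}");
    println!("{prefix}: downloaded 171.851 MiB (111.223 MiB/s)");
}
```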
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/pull.rs | 93 +++++++++++++++++++++++++++-------------------
src/server/sync.rs | 7 ++--
2 files changed, 59 insertions(+), 41 deletions(-)
diff --git a/src/server/pull.rs b/src/server/pull.rs
index b11e93e6c..2ed78b840 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -149,6 +149,7 @@ async fn pull_index_chunks<I: IndexFile>(
index: I,
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
backend: &DatastoreBackend,
+ prefix: &str,
) -> Result<SyncStats, Error> {
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -244,7 +245,7 @@ async fn pull_index_chunks<I: IndexFile>(
let chunk_count = chunk_count.load(Ordering::SeqCst);
info!(
- "downloaded {} ({}/s)",
+ "{prefix}: downloaded {} ({}/s)",
HumanByte::from(bytes),
HumanByte::new_binary(bytes as f64 / elapsed.as_secs_f64()),
);
@@ -289,6 +290,8 @@ async fn pull_single_archive<'a>(
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
backend: &DatastoreBackend,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Snapshot {}", snapshot.dir());
+
let archive_name = &archive_info.filename;
let mut path = snapshot.full_path();
path.push(archive_name);
@@ -298,22 +301,29 @@ async fn pull_single_archive<'a>(
let mut sync_stats = SyncStats::default();
- info!("sync archive {archive_name}");
+ info!("{prefix}: sync archive {archive_name}");
+
+ let prefix = format!("Snapshot {}: archive {archive_name}", snapshot.dir());
reader.load_file_into(archive_name, &tmp_path).await?;
- let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
+ let mut tmpfile = std::fs::OpenOptions::new()
+ .read(true)
+ .open(&tmp_path)
+ .context(format!("archive {archive_name}"))?;
match ArchiveType::from_path(archive_name)? {
ArchiveType::DynamicIndex => {
let index = DynamicIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read dynamic index {:?} - {}", tmp_path, err)
+ format_err!(
+ "archive {archive_name}: unable to read dynamic index {tmp_path:?} - {err}"
+ )
})?;
let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).context(format!("archive {archive_name}"))?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ info!("{prefix}: skipping chunk sync for same datastore");
} else {
let stats = pull_index_chunks(
reader
@@ -323,6 +333,7 @@ async fn pull_single_archive<'a>(
index,
encountered_chunks,
backend,
+ &prefix,
)
.await?;
sync_stats.add(stats);
@@ -330,13 +341,15 @@ async fn pull_single_archive<'a>(
}
ArchiveType::FixedIndex => {
let index = FixedIndexReader::new(tmpfile).map_err(|err| {
- format_err!("unable to read fixed index '{:?}' - {}", tmp_path, err)
+ format_err!(
+ "archive {archive_name}: unable to read fixed index '{tmp_path:?}' - {err}"
+ )
})?;
let (csum, size) = index.compute_csum();
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).context(format!("archive {archive_name}"))?;
if reader.skip_chunk_sync(snapshot.datastore().name()) {
- info!("skipping chunk sync for same datastore");
+ info!("{prefix}: skipping chunk sync for same datastore");
} else {
let stats = pull_index_chunks(
reader
@@ -346,6 +359,7 @@ async fn pull_single_archive<'a>(
index,
encountered_chunks,
backend,
+ &prefix,
)
.await?;
sync_stats.add(stats);
@@ -354,11 +368,11 @@ async fn pull_single_archive<'a>(
ArchiveType::Blob => {
tmpfile.rewind()?;
let (csum, size) = sha256(&mut tmpfile)?;
- verify_archive(archive_info, &csum, size)?;
+ verify_archive(archive_info, &csum, size).context(prefix.clone())?;
}
}
if let Err(err) = std::fs::rename(&tmp_path, &path) {
- bail!("Atomic rename file {:?} failed - {}", path, err);
+ bail!("archive {archive_name}: Atomic rename file {path:?} failed - {err}");
}
backend
@@ -386,13 +400,14 @@ async fn pull_snapshot<'a>(
is_new: bool,
) -> Result<SyncStats, Error> {
if is_new {
- info!("sync snapshot {}", snapshot.dir());
+ info!("{}: start sync", snapshot.dir());
} else if corrupt {
info!("re-sync snapshot {} due to corruption", snapshot.dir());
} else {
info!("re-sync snapshot {}", snapshot.dir());
}
+ let prefix = format!("Snapshot {}", snapshot.dir());
let mut sync_stats = SyncStats::default();
let mut manifest_name = snapshot.full_path();
manifest_name.push(MANIFEST_BLOB_NAME.as_ref());
@@ -405,7 +420,8 @@ async fn pull_snapshot<'a>(
let tmp_manifest_blob;
if let Some(data) = reader
.load_file_into(MANIFEST_BLOB_NAME.as_ref(), &tmp_manifest_name)
- .await?
+ .await
+ .context(prefix.clone())?
{
tmp_manifest_blob = data;
} else {
@@ -415,21 +431,21 @@ async fn pull_snapshot<'a>(
if manifest_name.exists() && !corrupt {
let manifest_blob = proxmox_lang::try_block!({
let mut manifest_file = std::fs::File::open(&manifest_name).map_err(|err| {
- format_err!("unable to open local manifest {manifest_name:?} - {err}")
+ format_err!("{prefix}: unable to open local manifest {manifest_name:?} - {err}")
})?;
let manifest_blob = DataBlob::load_from_reader(&mut manifest_file)?;
Ok(manifest_blob)
})
.map_err(|err: Error| {
- format_err!("unable to read local manifest {manifest_name:?} - {err}")
+ format_err!("{prefix}: unable to read local manifest {manifest_name:?} - {err}")
})?;
if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
if !client_log_name.exists() {
reader.try_download_client_log(&client_log_name).await?;
};
- info!("no data changes");
+ info!("{prefix}: no data changes");
let _ = std::fs::remove_file(&tmp_manifest_name);
return Ok(sync_stats); // nothing changed
}
@@ -468,7 +484,7 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ info!("{prefix}: detected changed file {path:?} - {err}");
}
}
}
@@ -478,7 +494,7 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ info!("{prefix}: detected changed file {path:?} - {err}");
}
}
}
@@ -488,7 +504,7 @@ async fn pull_snapshot<'a>(
match manifest.verify_file(&filename, &csum, size) {
Ok(_) => continue,
Err(err) => {
- info!("detected changed file {path:?} - {err}");
+ info!("{prefix}: detected changed file {path:?} - {err}");
}
}
}
@@ -507,7 +523,7 @@ async fn pull_snapshot<'a>(
}
if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) {
- bail!("Atomic rename file {:?} failed - {}", manifest_name, err);
+ bail!("{prefix}: Atomic rename file {manifest_name:?} failed - {err}");
}
if let DatastoreBackend::S3(s3_client) = backend {
let object_key = pbs_datastore::s3::object_key_from_path(
@@ -546,7 +562,7 @@ async fn pull_snapshot<'a>(
};
snapshot
.cleanup_unreferenced_files(&manifest)
- .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
+ .map_err(|err| format_err!("{prefix}: failed to cleanup unreferenced files - {err}"))?;
Ok(sync_stats)
}
@@ -562,9 +578,12 @@ async fn pull_snapshot_from<'a>(
encountered_chunks: Arc<Mutex<EncounteredChunks>>,
corrupt: bool,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Snapshot {}", snapshot.dir());
+
let (_path, is_new, _snap_lock) = snapshot
.datastore()
- .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+ .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())
+ .context(prefix.clone())?;
let result = pull_snapshot(
params,
@@ -585,11 +604,11 @@ async fn pull_snapshot_from<'a>(
snapshot.as_ref(),
true,
) {
- info!("cleanup error - {cleanup_err}");
+ info!("{prefix}: cleanup error - {cleanup_err}");
}
return Err(err);
}
- Ok(_) => info!("sync snapshot {} done", snapshot.dir()),
+ Ok(_) => info!("{prefix}: sync done"),
}
}
@@ -619,6 +638,7 @@ async fn pull_group(
group: &BackupGroup,
shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Group {group}");
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -710,11 +730,11 @@ async fn pull_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ info!("{prefix}: {already_synced_skip_info}");
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ info!("{prefix}: {transfer_last_skip_info}");
transfer_last_skip_info.reset();
}
@@ -807,7 +827,7 @@ async fn pull_group(
// Update done groups progress by other parallel running pulls
local_progress.done_groups = shared_group_progress.load_done();
local_progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: group {group}: {local_progress}");
+ info!("{prefix}: percentage done: {local_progress}");
let stats = result?; // stop on error
sync_stats.add(stats);
@@ -826,12 +846,12 @@ async fn pull_group(
}
if snapshot.is_protected() {
info!(
- "don't delete vanished snapshot {} (protected)",
- snapshot.dir()
+ "{prefix}: don't delete vanished snapshot {} (protected)",
+ snapshot.dir(),
);
continue;
}
- info!("delete vanished snapshot {}", snapshot.dir());
+ info!("{prefix}: delete vanished snapshot {}", snapshot.dir());
params
.target
.store
@@ -1031,10 +1051,7 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
}
Err(err) => {
errors = true;
- info!(
- "Encountered errors while syncing namespace {} - {err}",
- &namespace,
- );
+ info!("Encountered errors while syncing namespace {namespace} - {err}");
}
};
}
@@ -1113,7 +1130,7 @@ async fn pull_ns(
list.sort_unstable();
info!(
- "found {} groups to sync (out of {unfiltered_count} total)",
+ "Found {} groups to sync (out of {unfiltered_count} total)",
list.len()
);
@@ -1182,7 +1199,7 @@ async fn pull_ns(
if !local_group.apply_filters(¶ms.group_filter) {
continue;
}
- info!("delete vanished group '{local_group}'");
+ info!("Delete vanished group '{local_group}'");
let delete_stats_result = params
.target
.store
@@ -1191,7 +1208,7 @@ async fn pull_ns(
match delete_stats_result {
Ok(stats) => {
if !stats.all_removed() {
- info!("kept some protected snapshots of group '{local_group}'");
+ info!("Kept some protected snapshots of group '{local_group}'");
sync_stats.add(SyncStats::from(RemovedVanishedStats {
snapshots: stats.removed_snapshots(),
groups: 0,
@@ -1214,7 +1231,7 @@ async fn pull_ns(
Ok(())
});
if let Err(err) = result {
- info!("error during cleanup: {err}");
+ info!("Error during cleanup: {err}");
errors = true;
};
}
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 17d736c41..416bc943d 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -138,13 +138,13 @@ impl SyncSourceReader for RemoteSourceReader {
Some(HttpError { code, message }) => match *code {
StatusCode::NOT_FOUND => {
info!(
- "skipping snapshot {} - vanished since start of sync",
+ "Snapshot {}: skipped because vanished since start of sync",
&self.dir
);
return Ok(None);
}
_ => {
- bail!("HTTP error {code} - {message}");
+ bail!("Snapshot {}: HTTP error {code} - {message}", &self.dir);
}
},
None => {
@@ -178,7 +178,8 @@ impl SyncSourceReader for RemoteSourceReader {
bail!("Atomic rename file {to_path:?} failed - {err}");
}
info!(
- "got backup log file {client_log_name}",
+ "Snapshot {snapshot}: got backup log file {client_log_name}",
+ snapshot = &self.dir,
client_log_name = client_log_name.deref()
);
}
--
2.47.3
* [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (7 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently Christian Ebner
` (2 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
When performing parallel group syncs, the push parameters must be
shared between all tasks, which is not possible with regular
references due to lifetime and ownership constraints. Pack them into
an atomically reference counted pointer (`Arc`) instead, so they can
be cloned cheaply when required.
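The ownership issue and its `Arc` fix can be reduced to a few lines; the sketch below uses OS threads in place of tokio tasks, and a hypothetical `Params` struct stands in for `PushParameters`:
```
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for PushParameters: immutable after
// construction, so read-only sharing across tasks needs no locking.
struct Params {
    store: String,
}

fn main() {
    // A plain &Params cannot be moved into spawned tasks, since the
    // reference would have to outlive the spawning function; an Arc
    // can, and cloning it only bumps a reference count instead of
    // copying the parameters.
    let params = Arc::new(Params {
        store: "datastore".to_string(),
    });

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let params = Arc::clone(&params);
            thread::spawn(move || format!("task {i}: pushing to {}", params.store))
        })
        .collect();

    for handle in handles {
        println!("{}", handle.join().unwrap());
    }
}
```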
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/push.rs | 28 +++++++++++++++++-----------
1 file changed, 17 insertions(+), 11 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index a0484ef62..5828f2ed1 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -404,6 +404,7 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
let (mut groups, mut snapshots) = (0, 0);
let mut stats = SyncStats::default();
+ let params = Arc::new(params);
for source_namespace in &source_namespaces {
let source_store_and_ns = print_store_and_ns(params.source.store.name(), source_namespace);
let target_namespace = params.map_to_target(source_namespace)?;
@@ -427,7 +428,7 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
continue;
}
- match push_namespace(source_namespace, ¶ms).await {
+ match push_namespace(source_namespace, Arc::clone(¶ms)).await {
Ok((sync_progress, sync_stats, sync_errors)) => {
errors |= sync_errors;
stats.add(sync_stats);
@@ -522,11 +523,11 @@ pub(crate) async fn push_store(mut params: PushParameters) -> Result<SyncStats,
/// Iterate over all backup groups in the namespace and push them to the target.
pub(crate) async fn push_namespace(
namespace: &BackupNamespace,
- params: &PushParameters,
+ params: Arc<PushParameters>,
) -> Result<(StoreProgress, SyncStats, bool), Error> {
let target_namespace = params.map_to_target(namespace)?;
// Check if user is allowed to perform backups on remote datastore
- check_ns_remote_datastore_privs(params, &target_namespace, PRIV_REMOTE_DATASTORE_BACKUP)
+ check_ns_remote_datastore_privs(¶ms, &target_namespace, PRIV_REMOTE_DATASTORE_BACKUP)
.context("Pushing to remote namespace not allowed")?;
let mut list: Vec<BackupGroup> = params
@@ -554,7 +555,7 @@ pub(crate) async fn push_namespace(
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
- fetch_target_groups(params, &target_namespace).await?;
+ fetch_target_groups(¶ms, &target_namespace).await?;
for (done, group) in list.into_iter().enumerate() {
progress.done_groups = done as u64;
@@ -570,7 +571,7 @@ pub(crate) async fn push_namespace(
}
synced_groups.insert(group.clone());
- match push_group(params, namespace, &group, &mut progress).await {
+ match push_group(Arc::clone(¶ms), namespace, &group, &mut progress).await {
Ok(sync_stats) => stats.add(sync_stats),
Err(err) => {
warn!("Encountered errors: {err:#}");
@@ -590,7 +591,7 @@ pub(crate) async fn push_namespace(
continue;
}
- match remove_target_group(params, &target_namespace, &target_group).await {
+ match remove_target_group(¶ms, &target_namespace, &target_group).await {
Ok(delete_stats) => {
info!("Removed vanished group {target_group} from remote");
if delete_stats.protected_snapshots() > 0 {
@@ -672,7 +673,7 @@ async fn forget_target_snapshot(
/// - Iterate the snapshot list and push each snapshot individually
/// - (Optional): Remove vanished groups if `remove_vanished` flag is set
pub(crate) async fn push_group(
- params: &PushParameters,
+ params: Arc<PushParameters>,
namespace: &BackupNamespace,
group: &BackupGroup,
progress: &mut StoreProgress,
@@ -691,7 +692,7 @@ pub(crate) async fn push_group(
}
let target_namespace = params.map_to_target(namespace)?;
- let mut target_snapshots = fetch_target_snapshots(params, &target_namespace, group).await?;
+ let mut target_snapshots = fetch_target_snapshots(¶ms, &target_namespace, group).await?;
target_snapshots.sort_unstable_by_key(|a| a.backup.time);
let last_snapshot_time = target_snapshots
@@ -748,8 +749,13 @@ pub(crate) async fn push_group(
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
- let result =
- push_snapshot(params, namespace, &source_snapshot, fetch_previous_manifest).await;
+ let result = push_snapshot(
+ ¶ms,
+ namespace,
+ &source_snapshot,
+ fetch_previous_manifest,
+ )
+ .await;
fetch_previous_manifest = true;
progress.done_snapshots = pos as u64 + 1;
@@ -772,7 +778,7 @@ pub(crate) async fn push_group(
);
continue;
}
- match forget_target_snapshot(params, &target_namespace, &snapshot.backup).await {
+ match forget_target_snapshot(¶ms, &target_namespace, &snapshot.backup).await {
Ok(()) => {
info!(
"Removed vanished snapshot {name} from remote",
--
2.47.3
* [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (8 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window Christian Ebner
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Improve the throughput of sync jobs in push direction by pushing up
to a configured number of backup groups concurrently, spawning a
task for each group which connects to and pushes the group to the
remote target.
The store progress and sync group housekeeping are placed behind an
atomically reference counted mutex to allow concurrent status
updates.
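The scheduling scheme — keep at most `capacity` group pushes in flight, collect each result as it completes, and count finished groups through a shared atomic — can be sketched with std threads in place of the tokio `JoinSet`; all names below are hypothetical analogues, not the series' actual types:
```
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical analogue of SharedGroupProgress: a lock-free counter.
struct Progress {
    done: AtomicUsize,
}

// Hypothetical analogue of GroupWorkerSet, with OS threads instead of
// tokio tasks: spawning blocks (by joining the oldest worker) once the
// configured capacity is reached.
struct WorkerSet {
    capacity: usize,
    workers: Vec<JoinHandle<u64>>,
}

impl WorkerSet {
    fn spawn<F: FnOnce() -> u64 + Send + 'static>(&mut self, task: F) -> Vec<u64> {
        let mut results = Vec::new();
        while self.workers.len() >= self.capacity {
            results.push(self.workers.remove(0).join().unwrap());
        }
        self.workers.push(thread::spawn(task));
        results
    }

    fn join_all(&mut self) -> Vec<u64> {
        self.workers.drain(..).map(|h| h.join().unwrap()).collect()
    }
}

fn main() {
    let progress = Arc::new(Progress { done: AtomicUsize::new(0) });
    let mut set = WorkerSet { capacity: 4, workers: Vec::new() };

    let mut synced_bytes = 0;
    for group in 0..10u64 {
        let progress = Arc::clone(&progress);
        for res in set.spawn(move || {
            // Pretend to push the group, then publish completion.
            progress.done.fetch_add(1, Ordering::AcqRel);
            group * 1024 // hypothetical synced byte count
        }) {
            synced_bytes += res;
        }
    }
    for res in set.join_all() {
        synced_bytes += res;
    }

    assert_eq!(progress.done.load(Ordering::Acquire), 10);
    println!("synced {synced_bytes} bytes across 10 groups");
}
```
Unlike the tokio `JoinSet` used by the series, which collects whichever task finishes first via `join_next`, this sketch joins the oldest worker; the capacity bound and the atomic progress counter behave the same way.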
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/push.rs | 97 ++++++++++++++++++++++++++++++++++------------
1 file changed, 72 insertions(+), 25 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index 5828f2ed1..e7d56cc2a 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -34,6 +34,7 @@ use super::sync::{
SyncSource, SyncStats,
};
use crate::api2::config::remote;
+use crate::server::sync::{GroupWorkerSet, SharedGroupProgress};
/// Target for backups to be pushed to
pub(crate) struct PushTarget {
@@ -550,41 +551,58 @@ pub(crate) async fn push_namespace(
let mut errors = false;
// Remember synced groups, remove others when the remove vanished flag is set
- let mut synced_groups = HashSet::new();
+ let synced_groups = Arc::new(Mutex::new(HashSet::new()));
let mut progress = StoreProgress::new(list.len() as u64);
let mut stats = SyncStats::default();
let (owned_target_groups, not_owned_target_groups) =
fetch_target_groups(¶ms, &target_namespace).await?;
+ let not_owned_target_groups = Arc::new(not_owned_target_groups);
- for (done, group) in list.into_iter().enumerate() {
- progress.done_groups = done as u64;
- progress.done_snapshots = 0;
- progress.group_snapshots = 0;
+ let mut group_workers = GroupWorkerSet::with_capacity(params.worker_threads.unwrap_or(1));
+ let shared_group_progress = Arc::new(SharedGroupProgress::with_total_groups(list.len()));
- if not_owned_target_groups.contains(&group) {
- warn!(
- "Group '{group}' not owned by remote user '{}' on target, skipping upload",
- params.target.remote_user(),
- );
- continue;
- }
- synced_groups.insert(group.clone());
-
- match push_group(Arc::clone(¶ms), namespace, &group, &mut progress).await {
- Ok(sync_stats) => stats.add(sync_stats),
- Err(err) => {
- warn!("Encountered errors: {err:#}");
- warn!("Failed to push group {group} to remote!");
- errors = true;
+ let mut process_results = |results| {
+ for result in results {
+ match result {
+ Ok(sync_stats) => {
+ stats.add(sync_stats);
+ progress.done_groups = shared_group_progress.increment_done();
+ }
+ Err(()) => errors = true,
}
}
+ };
+
+ for group in list.into_iter() {
+ let namespace = namespace.clone();
+ let params = Arc::clone(¶ms);
+ let not_owned_target_groups = Arc::clone(¬_owned_target_groups);
+ let synced_groups = Arc::clone(&synced_groups);
+ let group_progress_cloned = Arc::clone(&shared_group_progress);
+ let results = group_workers
+ .spawn_task(async move {
+ push_group_do(
+ params,
+ &namespace,
+ &group,
+ group_progress_cloned,
+ synced_groups,
+ not_owned_target_groups,
+ )
+ .await
+ })
+ .await;
+ process_results(results);
}
+ let results = group_workers.join_active().await;
+ process_results(results);
+
if params.remove_vanished {
// only ever allow to prune owned groups on target
for target_group in owned_target_groups {
- if synced_groups.contains(&target_group) {
+ if synced_groups.lock().unwrap().contains(&target_group) {
continue;
}
if !target_group.apply_filters(¶ms.group_filter) {
@@ -663,6 +681,32 @@ async fn forget_target_snapshot(
Ok(())
}
+async fn push_group_do(
+ params: Arc<PushParameters>,
+ namespace: &BackupNamespace,
+ group: &BackupGroup,
+ shared_group_progress: Arc<SharedGroupProgress>,
+ synced_groups: Arc<Mutex<HashSet<BackupGroup>>>,
+ not_owned_target_groups: Arc<HashSet<BackupGroup>>,
+) -> Result<SyncStats, ()> {
+ if not_owned_target_groups.contains(group) {
+ warn!(
+ "Group '{group}' not owned by remote user '{}' on target, skipping upload",
+ params.target.remote_user(),
+ );
+ shared_group_progress.increment_done();
+ return Ok(SyncStats::default());
+ }
+
+ synced_groups.lock().unwrap().insert(group.clone());
+ push_group(params, namespace, group, Arc::clone(&shared_group_progress))
+ .await
+ .map_err(|err| {
+ warn!("Group {group}: Encountered errors: {err:#}");
+ warn!("Failed to push group {group} to remote!");
+ })
+}
+
/// Push group including all snaphshots to target
///
/// Iterate over all snapshots in the group and push them to the target.
@@ -676,7 +720,7 @@ pub(crate) async fn push_group(
params: Arc<PushParameters>,
namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -744,7 +788,8 @@ pub(crate) async fn push_group(
transfer_last_skip_info.reset();
}
- progress.group_snapshots = snapshots.len() as u64;
+ let mut local_progress = StoreProgress::new(shared_group_progress.total_groups());
+ local_progress.group_snapshots = snapshots.len() as u64;
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
@@ -758,8 +803,10 @@ pub(crate) async fn push_group(
.await;
fetch_previous_manifest = true;
- progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: {progress}");
+ // Update done groups progress by other parallel running pushes
+ local_progress.done_groups = shared_group_progress.load_done();
+ local_progress.done_snapshots = pos as u64 + 1;
+ info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
--
2.47.3
* [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (9 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window Christian Ebner
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Pushing groups and therefore also snapshots in parallel leads to
unordered log outputs, making it mostly impossible to relate a log
message to a backup snapshot/group.
Therefore, prefix push job log messages by the corresponding group or
snapshot.
Also, be more verbose for push syncs, adding additional log output
for the groups, snapshots and archives being pushed.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
src/server/push.rs | 49 ++++++++++++++++++++++++++++++++++++----------
1 file changed, 39 insertions(+), 10 deletions(-)
diff --git a/src/server/push.rs b/src/server/push.rs
index e7d56cc2a..fe404b8a2 100644
--- a/src/server/push.rs
+++ b/src/server/push.rs
@@ -28,6 +28,8 @@ use pbs_datastore::index::IndexFile;
use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, StoreProgress};
+use proxmox_human_byte::HumanByte;
+
use super::sync::{
check_namespace_depth_limit, exclude_not_verified_or_encrypted,
ignore_not_verified_or_encrypted, LocalSource, RemovedVanishedStats, SkipInfo, SkipReason,
@@ -722,6 +724,7 @@ pub(crate) async fn push_group(
group: &BackupGroup,
shared_group_progress: Arc<SharedGroupProgress>,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Group {group}");
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -780,11 +783,11 @@ pub(crate) async fn push_group(
.collect();
if already_synced_skip_info.count > 0 {
- info!("{already_synced_skip_info}");
+ info!("{prefix}: {already_synced_skip_info}");
already_synced_skip_info.reset();
}
if transfer_last_skip_info.count > 0 {
- info!("{transfer_last_skip_info}");
+ info!("{prefix}: {transfer_last_skip_info}");
transfer_last_skip_info.reset();
}
@@ -794,6 +797,7 @@ pub(crate) async fn push_group(
let mut stats = SyncStats::default();
let mut fetch_previous_manifest = !target_snapshots.is_empty();
for (pos, source_snapshot) in snapshots.into_iter().enumerate() {
+ info!("Snapshot {source_snapshot}: start sync");
let result = push_snapshot(
&params,
namespace,
@@ -806,10 +810,11 @@ pub(crate) async fn push_group(
// Update done groups progress by other parallel running pushes
local_progress.done_groups = shared_group_progress.load_done();
local_progress.done_snapshots = pos as u64 + 1;
- info!("Percentage done: group {group}: {local_progress}");
// stop on error
let sync_stats = result?;
+ info!("Snapshot {source_snapshot}: sync done");
+ info!("Percentage done: group {group}: {local_progress}");
stats.add(sync_stats);
}
@@ -820,7 +825,7 @@ pub(crate) async fn push_group(
}
if snapshot.protected {
info!(
- "Kept protected snapshot {name} on remote",
+ "{prefix}: Kept protected snapshot {name} on remote",
name = snapshot.backup
);
continue;
@@ -828,14 +833,14 @@ pub(crate) async fn push_group(
match forget_target_snapshot(&params, &target_namespace, &snapshot.backup).await {
Ok(()) => {
info!(
- "Removed vanished snapshot {name} from remote",
+ "{prefix}: Removed vanished snapshot {name} from remote",
name = snapshot.backup
);
}
Err(err) => {
- warn!("Encountered errors: {err:#}");
+ warn!("{prefix}: Encountered errors: {err:#}");
warn!(
- "Failed to remove vanished snapshot {name} from remote!",
+ "{prefix}: Failed to remove vanished snapshot {name} from remote!",
name = snapshot.backup
);
}
@@ -863,6 +868,7 @@ pub(crate) async fn push_snapshot(
snapshot: &BackupDir,
fetch_previous_manifest: bool,
) -> Result<SyncStats, Error> {
+ let prefix = format!("Snapshot {snapshot}");
let mut stats = SyncStats::default();
let target_ns = params.map_to_target(namespace)?;
let backup_dir = params
@@ -878,8 +884,8 @@ pub(crate) async fn push_snapshot(
Ok((manifest, _raw_size)) => manifest,
Err(err) => {
// No manifest in snapshot or failed to read, warn and skip
- log::warn!("Encountered errors: {err:#}");
- log::warn!("Failed to load manifest for '{snapshot}'!");
+ warn!("{prefix}: Encountered errors: {err:#}");
+ warn!("{prefix}: Failed to load manifest!");
return Ok(stats);
}
};
@@ -913,7 +919,7 @@ pub(crate) async fn push_snapshot(
if fetch_previous_manifest {
match backup_writer.download_previous_manifest().await {
Ok(manifest) => previous_manifest = Some(Arc::new(manifest)),
- Err(err) => log::info!("Could not download previous manifest - {err}"),
+ Err(err) => info!("{prefix}: Could not download previous manifest - {err}"),
}
};
@@ -942,12 +948,21 @@ pub(crate) async fn push_snapshot(
path.push(&entry.filename);
if path.try_exists()? {
let archive_name = BackupArchiveName::from_path(&entry.filename)?;
+ info!("{prefix}: sync archive {archive_name}");
+ let prefix = format!("Snapshot {snapshot}: archive {archive_name}");
match archive_name.archive_type() {
ArchiveType::Blob => {
let file = std::fs::File::open(&path)?;
let backup_stats = backup_writer
.upload_blob(file, archive_name.as_ref())
.await?;
+ info!(
+ "{prefix}: uploaded {} ({}/s)",
+ HumanByte::from(backup_stats.size),
+ HumanByte::new_binary(
+ backup_stats.size as f64 / backup_stats.duration.as_secs_f64()
+ ),
+ );
stats.add(SyncStats {
chunk_count: backup_stats.chunk_count as usize,
bytes: backup_stats.size as usize,
@@ -979,6 +994,13 @@ pub(crate) async fn push_snapshot(
known_chunks.clone(),
)
.await?;
+ info!(
+ "{prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ );
stats.add(sync_stats);
}
ArchiveType::FixedIndex => {
@@ -1006,6 +1028,13 @@ pub(crate) async fn push_snapshot(
known_chunks.clone(),
)
.await?;
+ info!(
+ "{prefix}: uploaded {} ({}/s)",
+ HumanByte::from(sync_stats.bytes),
+ HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64()
+ ),
+ );
stats.add(sync_stats);
}
}
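The upload-rate log lines added above divide the byte count by the elapsed time and render both with `HumanByte`. A self-contained sketch of that computation, using a plain binary-unit formatter in place of `proxmox_human_byte::HumanByte` (the helper `human_binary` is illustrative, not part of the codebase):

```rust
// Format a byte count in binary units, approximating what
// HumanByte::new_binary produces for the log output.
fn human_binary(bytes: f64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];
    let mut value = bytes;
    let mut unit = 0;
    while value >= 1024.0 && unit < UNITS.len() - 1 {
        value /= 1024.0;
        unit += 1;
    }
    format!("{value:.2} {}", UNITS[unit])
}

fn main() {
    let size: u64 = 50 * 1024 * 1024; // 50 MiB uploaded
    let duration_secs = 2.0_f64; // upload took two seconds
    let rate = size as f64 / duration_secs;
    // Same shape as the patch's `{prefix}: uploaded {size} ({rate}/s)` line.
    println!("uploaded {} ({}/s)", human_binary(size as f64), human_binary(rate));
}
```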
--
2.47.3
* [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
` (10 preceding siblings ...)
2026-03-09 16:20 ` [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging Christian Ebner
@ 2026-03-09 16:20 ` Christian Ebner
11 siblings, 0 replies; 13+ messages in thread
From: Christian Ebner @ 2026-03-09 16:20 UTC (permalink / raw)
To: pbs-devel
Allows configuring the number of parallel group workers via the web
interface.
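The UI field below constrains the value to 1 through 32 with a default of 1. A hedged sketch of how the backend might resolve such an optional setting (the helper `resolve_worker_threads` is hypothetical; the actual API schema enforces its own bounds):

```rust
/// Resolve the optional `worker-threads` value, falling back to the
/// default of 1 and clamping to the 1..=32 range the UI advertises.
fn resolve_worker_threads(configured: Option<u64>) -> u64 {
    configured.unwrap_or(1).clamp(1, 32)
}

fn main() {
    println!("unset    -> {}", resolve_worker_threads(None));
    println!("explicit -> {}", resolve_worker_threads(Some(4)));
    println!("too big  -> {}", resolve_worker_threads(Some(100)));
}
```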
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
www/window/SyncJobEdit.js | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 074c7855a..26c82bc71 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -448,6 +448,17 @@ Ext.define('PBS.window.SyncJobEdit', {
deleteEmpty: '{!isCreate}',
},
},
+ {
+ xtype: 'proxmoxintegerfield',
+ name: 'worker-threads',
+ fieldLabel: gettext('# of Group Workers'),
+ emptyText: '1',
+ minValue: 1,
+ maxValue: 32,
+ cbind: {
+ deleteEmpty: '{!isCreate}',
+ },
+ },
{
xtype: 'proxmoxcheckbox',
fieldLabel: gettext('Re-sync Corrupt'),
--
2.47.3
2026-03-09 16:20 [PATCH proxmox{,-backup} v5 00/12] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox v5 1/1] pbs api types: add `worker-threads` to sync job config Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 01/11] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 02/11] api: config/sync: add optional `worker-threads` property Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 03/11] sync: pull: revert avoiding reinstantiation for encountered chunks map Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 04/11] sync: pull: factor out backup group locking and owner check Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 05/11] sync: pull: prepare pull parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 06/11] fix #4182: server: sync: allow pulling backup groups in parallel Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 07/11] server: pull: prefix log messages and add error context Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 08/11] sync: push: prepare push parameters to be shared across parallel tasks Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 09/11] server: sync: allow pushing groups concurrently Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 10/11] server: push: prefix log messages and add additional logging Christian Ebner
2026-03-09 16:20 ` [PATCH proxmox-backup v5 11/11] ui: expose group worker setting in sync job edit window Christian Ebner