public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v3 proxmox-backup 4/7] fix #4182: server: sync: allow pulling groups concurrently
Date: Tue, 18 Mar 2025 13:24:20 +0100	[thread overview]
Message-ID: <20250318122423.385684-5-c.ebner@proxmox.com> (raw)
In-Reply-To: <20250318122423.385684-1-c.ebner@proxmox.com>

Currently, a sync job pulls backup groups and the snapshots they
contain sequentially. For remote syncs, download speed is therefore
bounded by the single http2 connection of the source reader instance,
and high-latency networks suffer particularly from this limitation.

Improve throughput by pulling up to a configured number of backup
groups concurrently, spawning tasks that each connect to and pull from
the remote source in parallel.
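
The bounded-window scheduling used below can be illustrated with a
stand-alone sketch. Note this is only an analogy: std threads and a
channel stand in for the patch's FuturesUnordered, and `pull_one_group`,
`pull_all` are hypothetical placeholders, not functions from this
series:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical placeholder for pulling one backup group; the real code
// awaits `pull_group_do` futures instead of joining threads.
fn pull_one_group(group: usize) -> Result<usize, ()> {
    Ok(group) // pretend each pull returns some sync stats
}

// Keep at most `parallel` pulls in flight; refill the window as pulls
// finish, mirroring the `puller.push()` / `puller.next().await` loop.
fn pull_all(groups: Vec<usize>, parallel: usize) -> (usize, bool) {
    let (tx, rx) = mpsc::channel();
    let mut iter = groups.into_iter();
    let mut in_flight = 0usize;
    let mut synced = 0usize;
    let mut errors = false;

    thread::scope(|s| {
        // seed the window, analogous to the initial push loop
        for _ in 0..parallel {
            if let Some(g) = iter.next() {
                let tx = tx.clone();
                s.spawn(move || tx.send(pull_one_group(g)).unwrap());
                in_flight += 1;
            }
        }
        // account for each finished pull, then start the next one
        while in_flight > 0 {
            match rx.recv().unwrap() {
                Ok(stats) => synced += stats,
                Err(()) => errors = true,
            }
            in_flight -= 1;
            if let Some(g) = iter.next() {
                let tx = tx.clone();
                s.spawn(move || tx.send(pull_one_group(g)).unwrap());
                in_flight += 1;
            }
        }
    });
    (synced, errors)
}
```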

Make the error handling and accounting logic for each group pull
reusable by moving it into its own helper function that returns the
future.

The store progress is placed behind an atomically reference-counted
mutex (`Arc<Mutex<StoreProgress>>`) so the concurrent group pulls can
update the shared status.
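
A minimal sketch of that sharing scheme, using a trimmed-down stand-in
for `StoreProgress` (the real struct in
pbs-datastore/src/store_progress.rs has more fields, and `finish_group`
/ `run_demo` are hypothetical names for illustration):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Trimmed-down stand-in for StoreProgress; deriving `Clone` is what
// lets each task keep a local copy for its own log lines.
#[derive(Clone, Debug, Default)]
struct StoreProgress {
    done_groups: u64,
}

// Each concurrent group pull bumps the shared counter under the lock,
// then clones the current state for its own progress output.
fn finish_group(progress: &Arc<Mutex<StoreProgress>>) -> StoreProgress {
    let mut guard = progress.lock().unwrap();
    guard.done_groups += 1;
    guard.clone()
}

// Spawn `parallel` workers that all update one shared progress value
// and return the final done_groups count.
fn run_demo(parallel: u64) -> u64 {
    let progress = Arc::new(Mutex::new(StoreProgress::default()));
    let mut handles = Vec::new();
    for _ in 0..parallel {
        let progress = Arc::clone(&progress);
        handles.push(thread::spawn(move || finish_group(&progress)));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let final_state = progress.lock().unwrap();
    final_state.done_groups
}
```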

Link to issue in bugtracker:
https://bugzilla.proxmox.com/show_bug.cgi?id=4182

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- no changes

 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 111 ++++++++++++++++++----------
 2 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0986bc5c8..27315f1ae 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use proxmox_human_byte::HumanByte;
 use tracing::info;
 
@@ -513,7 +515,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    store_progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -602,7 +604,8 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    let mut local_progress = store_progress.lock().unwrap().clone();
+    local_progress.group_snapshots = list.len() as u64;
 
     let mut sync_stats = SyncStats::default();
 
@@ -619,8 +622,11 @@ async fn pull_group(
         let result =
             pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        store_progress.lock().unwrap().done_snapshots += 1;
+        // Update done groups progress by other parallel running pulls
+        local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: {local_progress}");
 
         let stats = result?; // stop on error
         sync_stats.add(stats);
@@ -864,6 +870,48 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
     Ok(sync_stats)
 }
 
+async fn pull_group_do(
+    params: &PullParameters,
+    group: &BackupGroup,
+    namespace: &BackupNamespace,
+    target_namespace: &BackupNamespace,
+    progress: Arc<Mutex<StoreProgress>>,
+) -> Result<SyncStats, ()> {
+    let (owner, _lock_guard) =
+        match params
+            .target
+            .store
+            .create_locked_backup_group(target_namespace, group, &params.owner)
+        {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                info!("create_locked_backup_group failed");
+                return Err(());
+            }
+        };
+
+    if params.owner != owner {
+        // only the owner is allowed to create additional snapshots
+        info!(
+            "sync group {group} failed - owner check failed ({} != {owner})",
+            params.owner,
+        );
+        return Err(());
+    }
+
+    match pull_group(params, namespace, group, progress.clone()).await {
+        Ok(sync_stats) => {
+            progress.lock().unwrap().done_groups += 1;
+            Ok(sync_stats)
+        }
+        Err(err) => {
+            info!("sync group {group} failed - {err}");
+            Err(())
+        }
+    }
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -902,48 +950,29 @@ pub(crate) async fn pull_ns(
         new_groups.insert(group.clone());
     }
 
-    let mut progress = StoreProgress::new(list.len() as u64);
+    let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
     let mut sync_stats = SyncStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut puller = FuturesUnordered::new();
+    let mut group_futures_iter = list
+        .iter()
+        .map(|group| pull_group_do(params, group, namespace, &target_ns, progress.clone()));
 
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
+    for _ in 0..params.parallel_groups.unwrap_or(1) {
+        if let Some(future) = group_futures_iter.next() {
+            puller.push(future);
+        }
+    }
 
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => sync_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
+    while let Some(result) = puller.next().await {
+        match result {
+            Ok(stats) => sync_stats.add(stats),
+            Err(()) => errors |= true,
+        };
+        if let Some(future) = group_futures_iter.next() {
+            puller.push(future);
         }
     }
 
@@ -999,5 +1028,7 @@ pub(crate) async fn pull_ns(
         };
     }
 
+    let progress = progress.lock().unwrap().clone();
+
     Ok((progress, sync_stats, errors))
 }
-- 
2.39.5





Thread overview: 9+ messages
2025-03-18 12:24 [pbs-devel] [PATCH v3 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
2025-03-18 12:24 ` [pbs-devel] [PATCH v3 proxmox 1/7] pbs api types: add 'parallel-groups' to sync job config Christian Ebner
2025-03-18 12:24 ` [pbs-devel] [PATCH v3 proxmox-backup 2/7] client: backup writer: fix upload stats size and rate for push sync Christian Ebner
2025-03-18 12:24 ` [pbs-devel] [PATCH v3 proxmox-backup 3/7] api: config/sync: add optional `parallel-groups` property Christian Ebner
2025-03-18 12:24 ` Christian Ebner [this message]
2025-03-18 12:24 ` [pbs-devel] [PATCH v3 proxmox-backup 5/7] server: pull: prefix log messages and add error context Christian Ebner
2025-03-18 12:24 ` [pbs-devel] [PATCH v3 proxmox-backup 6/7] server: sync: allow pushing groups concurrently Christian Ebner
2025-03-18 12:24 ` [pbs-devel] [PATCH v3 proxmox-backup 7/7] server: push: prefix log messages and add additional logging Christian Ebner
2025-04-04 13:51 ` [pbs-devel] superseded: [PATCH v3 proxmox proxmox-backup 0/7] fix #4182: concurrent group pull/push support for sync jobs Christian Ebner
