From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Tue, 18 Mar 2025 13:24:20 +0100
Message-Id: <20250318122423.385684-5-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.5
In-Reply-To: <20250318122423.385684-1-c.ebner@proxmox.com>
References: <20250318122423.385684-1-c.ebner@proxmox.com>
MIME-Version: 1.0
Subject: [pbs-devel] [PATCH v3 proxmox-backup 4/7] fix #4182: server: sync:
 allow pulling groups concurrently

Currently, a sync job pulls the backup groups and the snapshots they
contain sequentially, and is therefore limited in download speed by
the single http2 connection of the source reader instance in case of
remote syncs. This is especially noticeable on high-latency networks.

Improve throughput by allowing up to a configured number of backup
groups to be pulled concurrently, creating tasks which connect to and
pull from the remote source in parallel.
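
The scheduling boils down to keeping a bounded set of in-flight
futures and refilling it whenever one pull completes. A minimal
standalone sketch of that pattern (pull_one/pull_all and the item
type are placeholders here, not the actual pull code below):

    use futures::stream::{FuturesUnordered, StreamExt};

    // Stand-in for pulling a single backup group (placeholder only).
    async fn pull_one(item: u64) -> Result<u64, ()> {
        Ok(item)
    }

    async fn pull_all(items: Vec<u64>, parallel: usize) {
        let mut pending = FuturesUnordered::new();
        let mut iter = items.into_iter().map(pull_one);

        // Seed the set with up to `parallel` in-flight pulls.
        for _ in 0..parallel.max(1) {
            if let Some(fut) = iter.next() {
                pending.push(fut);
            }
        }

        // Whenever one pull finishes, start the next, so at most
        // `parallel` pulls are running at any time.
        while let Some(_result) = pending.next().await {
            if let Some(fut) = iter.next() {
                pending.push(fut);
            }
        }
    }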

Make the error handling and accounting logic for each group pull
reusable by moving it into its own helper function which returns the
future.

The store progress is placed behind an atomically reference-counted
mutex (Arc<Mutex<StoreProgress>>) to allow for concurrent status
updates.
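
A reduced sketch of that sharing, using a simplified progress struct
with only the counters touched here instead of the real StoreProgress:

    use std::sync::{Arc, Mutex};

    // Simplified stand-in for StoreProgress (illustration only).
    #[derive(Clone, Default)]
    struct Progress {
        done_groups: u64,
        done_snapshots: u64,
    }

    fn main() {
        let progress = Arc::new(Mutex::new(Progress::default()));

        // Each concurrent group pull clones the Arc and updates the
        // shared counters under the lock ...
        let shared = Arc::clone(&progress);
        shared.lock().unwrap().done_snapshots += 1;

        // ... and takes a cheap local clone for logging, so the lock
        // is only held for the copy, never across an await point.
        let local = progress.lock().unwrap().clone();
        println!("done snapshots: {}", local.done_snapshots);
    }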

Link to issue in bugtracker:
https://bugzilla.proxmox.com/show_bug.cgi?id=4182

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- no changes

 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 111 ++++++++++++++++++----------
 2 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 0986bc5c8..27315f1ae 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use proxmox_human_byte::HumanByte;
 use tracing::info;
 
@@ -513,7 +515,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    store_progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<SyncStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -602,7 +604,8 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    let mut local_progress = store_progress.lock().unwrap().clone();
+    local_progress.group_snapshots = list.len() as u64;
 
     let mut sync_stats = SyncStats::default();
 
@@ -619,8 +622,11 @@ async fn pull_group(
         let result =
             pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone(), corrupt).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        store_progress.lock().unwrap().done_snapshots += 1;
+        // Update done groups progress by other parallel running pulls
+        local_progress.done_groups = store_progress.lock().unwrap().done_groups;
+        local_progress.done_snapshots = pos as u64 + 1;
+        info!("Percentage done: {local_progress}");
 
         let stats = result?; // stop on error
         sync_stats.add(stats);
@@ -864,6 +870,48 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<SyncStats,
     Ok(sync_stats)
 }
 
+async fn pull_group_do(
+    params: &PullParameters,
+    group: &BackupGroup,
+    namespace: &BackupNamespace,
+    target_namespace: &BackupNamespace,
+    progress: Arc<Mutex<StoreProgress>>,
+) -> Result<SyncStats, ()> {
+    let (owner, _lock_guard) =
+        match params
+            .target
+            .store
+            .create_locked_backup_group(target_namespace, group, &params.owner)
+        {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                info!("create_locked_backup_group failed");
+                return Err(());
+            }
+        };
+
+    if params.owner != owner {
+        // only the owner is allowed to create additional snapshots
+        info!(
+            "sync group {group} failed - owner check failed ({} != {owner})",
+            params.owner,
+        );
+        return Err(());
+    }
+
+    match pull_group(params, namespace, group, progress.clone()).await {
+        Ok(sync_stats) => {
+            progress.lock().unwrap().done_groups += 1;
+            Ok(sync_stats)
+        }
+        Err(err) => {
+            info!("sync group {group} failed - {err}");
+            Err(())
+        }
+    }
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -902,48 +950,29 @@ pub(crate) async fn pull_ns(
         new_groups.insert(group.clone());
     }
 
-    let mut progress = StoreProgress::new(list.len() as u64);
+    let progress = Arc::new(Mutex::new(StoreProgress::new(list.len() as u64)));
     let mut sync_stats = SyncStats::default();
 
     let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-    for (done, group) in list.into_iter().enumerate() {
-        progress.done_groups = done as u64;
-        progress.done_snapshots = 0;
-        progress.group_snapshots = 0;
+    let mut puller = FuturesUnordered::new();
+    let mut group_futures_iter = list
+        .iter()
+        .map(|group| pull_group_do(params, group, namespace, &target_ns, progress.clone()));
 
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
+    for _ in 0..params.parallel_groups.unwrap_or(1) {
+        if let Some(future) = group_futures_iter.next() {
+            puller.push(future);
+        }
+    }
 
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => sync_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
+    while let Some(result) = puller.next().await {
+        match result {
+            Ok(stats) => sync_stats.add(stats),
+            Err(()) => errors |= true,
+        };
+        if let Some(future) = group_futures_iter.next() {
+            puller.push(future);
         }
     }
 
@@ -999,5 +1028,7 @@ pub(crate) async fn pull_ns(
         };
     }
 
+    let progress = progress.lock().unwrap().clone();
+
     Ok((progress, sync_stats, errors))
 }
-- 
2.39.5


