From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <pbs-devel-bounces@lists.proxmox.com>
Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9])
	by lore.proxmox.com (Postfix) with ESMTPS id 7F6791FF15D
	for <inbox@lore.proxmox.com>; Thu, 25 Jul 2024 12:20:20 +0200 (CEST)
Received: from firstgate.proxmox.com (localhost [127.0.0.1])
	by firstgate.proxmox.com (Proxmox) with ESMTP id E696A352D;
	Thu, 25 Jul 2024 12:20:20 +0200 (CEST)
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Thu, 25 Jul 2024 12:19:20 +0200
Message-Id: <20240725101922.231053-3-c.ebner@proxmox.com>
X-Mailer: git-send-email 2.39.2
In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com>
References: <20240725101922.231053-1-c.ebner@proxmox.com>
MIME-Version: 1.0
X-SPAM-LEVEL: Spam detection results:  0
 AWL 0.021 Adjusted score from AWL reputation of From: address
 BAYES_00                 -1.9 Bayes spam probability is 0 to 1%
 DMARC_MISSING             0.1 Missing DMARC policy
 KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment
 SPF_HELO_NONE           0.001 SPF: HELO does not publish an SPF Record
 SPF_PASS               -0.001 SPF: sender matches SPF record
Subject: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group
 pull task into helper
X-BeenThere: pbs-devel@lists.proxmox.com
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: Proxmox Backup Server development discussion
 <pbs-devel.lists.proxmox.com>
List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe>
List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/>
List-Post: <mailto:pbs-devel@lists.proxmox.com>
List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help>
List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, 
 <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe>
Reply-To: Proxmox Backup Server development discussion
 <pbs-devel@lists.proxmox.com>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: pbs-devel-bounces@lists.proxmox.com
Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com>

Factor the error handling and accounting logic for each group pull
task out into a dedicated helper function, which returns the task as a
boxed future together with the updated progress, pull statistics and
error state, so it can be reused.
The store progress is placed behind a reference-counted mutex to allow
concurrent status updates.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 102 +++++++++++++++++-----------
 2 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
 //! Sync datastore by pulling contents from remote server
 
 use std::collections::{HashMap, HashSet};
+use std::future::Future;
 use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    {
+        let mut progress = progress.lock().unwrap();
+        progress.group_snapshots = list.len() as u64;
+    }
 
     let mut pull_stats = PullStats::default();
 
@@ -1095,8 +1100,11 @@ async fn pull_group(
             .await?;
         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        {
+            let mut progress = progress.lock().unwrap();
+            progress.done_snapshots = pos as u64 + 1;
+            info!("percentage done: {progress}");
+        }
 
         let stats = result?; // stop on error
         pull_stats.add(stats);
@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     Ok(pull_stats)
 }
 
+fn pull_group_task<'future>(
+    params: &'future PullParameters,
+    group: &'future BackupGroup,
+    namespace: &'future BackupNamespace,
+    target_namespace: &'future BackupNamespace,
+    progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+    Box::pin(async move {
+        let progress = Arc::new(Mutex::new(progress));
+        let mut pull_stats = PullStats::default();
+        let mut errors = false;
+
+        let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+            target_namespace,
+            group,
+            &params.owner,
+        ) {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                errors = true;
+                // do not stop here, instead continue
+                info!("create_locked_backup_group failed");
+                return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+            }
+        };
+
+        // permission check
+        if params.owner != owner {
+            // only the owner is allowed to create additional snapshots
+            info!(
+                "sync group {group} failed - owner check failed ({} != {owner})",
+                params.owner,
+            );
+            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(params, namespace, group, progress.clone()).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    info!("sync group {group} failed - {err}");
+                    errors = true; // do not bail here, instead continue
+                }
+            }
+        }
+
+        let progress = progress.lock().unwrap().clone();
+        Ok((progress, pull_stats, errors))
+    })
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns(
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
-
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
-
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
-        }
+        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
     }
 
     if params.remove_vanished {
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel