From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
Date: Thu, 25 Jul 2024 12:19:20 +0200
Message-ID: <20240725101922.231053-3-c.ebner@proxmox.com>
In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com>

Make the error handling and accounting logic for each group pull task
reusable by moving it into its own helper function, which returns the
pull task as a boxed future.
The store progress is placed behind a reference-counted mutex to
allow concurrent status updates.
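
For illustration only, a minimal self-contained sketch of the pattern
(simplified types and names, not the actual pbs-datastore/pull API):
the helper wraps the progress tracker in an Arc<Mutex<_>> and returns
a pinned, boxed future instead of being an `async fn`, so callers can
collect several such tasks before awaiting them.

    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};

    #[derive(Clone, Debug, Default)]
    struct StoreProgress {
        done_snapshots: u64,
        group_snapshots: u64,
    }

    // Hypothetical helper sketching the shape of pull_group_task.
    fn group_task<'a>(
        snapshots: &'a [&'a str],
        progress: StoreProgress,
    ) -> Pin<Box<dyn Future<Output = Result<StoreProgress, String>> + Send + 'a>> {
        Box::pin(async move {
            // Shared, mutable progress; Arc clones can be handed to inner helpers.
            let progress = Arc::new(Mutex::new(progress));
            progress.lock().unwrap().group_snapshots = snapshots.len() as u64;

            for (pos, _snapshot) in snapshots.iter().enumerate() {
                // ... pull one snapshot here ...
                progress.lock().unwrap().done_snapshots = pos as u64 + 1;
            }

            // Return a snapshot of the final progress state to the caller.
            let final_progress = progress.lock().unwrap().clone();
            Ok(final_progress)
        })
    }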

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 102 +++++++++++++++++-----------
 2 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
 //! Sync datastore by pulling contents from remote server
 
 use std::collections::{HashMap, HashSet};
+use std::future::Future;
 use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    {
+        let mut progress = progress.lock().unwrap();
+        progress.group_snapshots = list.len() as u64;
+    }
 
     let mut pull_stats = PullStats::default();
 
@@ -1095,8 +1100,11 @@ async fn pull_group(
             .await?;
         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        {
+            let mut progress = progress.lock().unwrap();
+            progress.done_snapshots = pos as u64 + 1;
+            info!("percentage done: {progress}");
+        }
 
         let stats = result?; // stop on error
         pull_stats.add(stats);
@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     Ok(pull_stats)
 }
 
+fn pull_group_task<'future>(
+    params: &'future PullParameters,
+    group: &'future BackupGroup,
+    namespace: &'future BackupNamespace,
+    target_namespace: &'future BackupNamespace,
+    progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+    Box::pin(async move {
+        let progress = Arc::new(Mutex::new(progress));
+        let mut pull_stats = PullStats::default();
+        let mut errors = false;
+
+        let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+            target_namespace,
+            group,
+            &params.owner,
+        ) {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                errors = true;
+                // do not stop here, instead continue
+                info!("create_locked_backup_group failed");
+                return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+            }
+        };
+
+        // permission check
+        if params.owner != owner {
+            // only the owner is allowed to create additional snapshots
+            info!(
+                "sync group {group} failed - owner check failed ({} != {owner})",
+                params.owner,
+            );
+            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(params, namespace, group, progress.clone()).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    info!("sync group {group} failed - {err}");
+                    errors = true; // do not bail here, instead continue
+                }
+            }
+        }
+
+        let progress = progress.lock().unwrap().clone();
+        Ok((progress, pull_stats, errors))
+    })
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns(
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
-
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
-
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
-        }
+        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
     }
 
     if params.remove_vanished {
-- 
2.39.2
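
As context for the design choice (an illustration, not part of this patch):
because the helper hands back a boxed future rather than awaiting it right
away, a caller can first collect one task per group and only then decide how
to drive them - sequentially for now, or concurrently as the rest of this
series aims to do. A sketch reusing the simplified group_task helper from the
note above, assuming the futures crate's join_all:

    use futures::future::join_all;

    async fn pull_all_groups(groups: Vec<Vec<&str>>) -> Result<(), String> {
        // Build one boxed future per group without awaiting anything yet.
        let tasks: Vec<_> = groups
            .iter()
            .map(|snapshots| group_task(snapshots, StoreProgress::default()))
            .collect();

        // join_all polls all tasks to completion; results come back in order.
        for result in join_all(tasks).await {
            let progress = result?;
            println!("group done: {progress:?}");
        }
        Ok(())
    }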






Thread overview: 9+ messages
2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
2024-07-25 10:19 ` Christian Ebner [this message]
2024-07-30 15:56   ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Gabriel Goller
2024-07-31  7:38     ` Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
2024-07-30 15:54   ` Gabriel Goller
2024-07-31  7:35     ` Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output Christian Ebner
