all lists on lists.proxmox.com
 help / color / mirror / Atom feed
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
Date: Thu, 25 Jul 2024 12:19:20 +0200	[thread overview]
Message-ID: <20240725101922.231053-3-c.ebner@proxmox.com> (raw)
In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com>

Make the error handling and accounting logic for each group pull task
reusable by moving it into its own helper function, returning the
future.
The store progress is placed behind a reference counted mutex to
allow for concurrent access of status updates.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
 pbs-datastore/src/store_progress.rs |   2 +-
 src/server/pull.rs                  | 102 +++++++++++++++++-----------
 2 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
 /// Tracker for progress of operations iterating over `Datastore` contents.
 pub struct StoreProgress {
     /// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
 //! Sync datastore by pulling contents from remote server
 
 use std::collections::{HashMap, HashSet};
+use std::future::Future;
 use std::io::{Seek, Write};
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
     params: &PullParameters,
     source_namespace: &BackupNamespace,
     group: &BackupGroup,
-    progress: &mut StoreProgress,
+    progress: Arc<Mutex<StoreProgress>>,
 ) -> Result<PullStats, Error> {
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
     // start with 65536 chunks (up to 256 GiB)
     let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-    progress.group_snapshots = list.len() as u64;
+    {
+        let mut progress = progress.lock().unwrap();
+        progress.group_snapshots = list.len() as u64;
+    }
 
     let mut pull_stats = PullStats::default();
 
@@ -1095,8 +1100,11 @@ async fn pull_group(
             .await?;
         let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
 
-        progress.done_snapshots = pos as u64 + 1;
-        info!("percentage done: {progress}");
+        {
+            let mut progress = progress.lock().unwrap();
+            progress.done_snapshots = pos as u64 + 1;
+            info!("percentage done: {progress}");
+        }
 
         let stats = result?; // stop on error
         pull_stats.add(stats);
@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
     Ok(pull_stats)
 }
 
+fn pull_group_task<'future>(
+    params: &'future PullParameters,
+    group: &'future BackupGroup,
+    namespace: &'future BackupNamespace,
+    target_namespace: &'future BackupNamespace,
+    progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+    Box::pin(async move {
+        let progress = Arc::new(Mutex::new(progress));
+        let mut pull_stats = PullStats::default();
+        let mut errors = false;
+
+        let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+            target_namespace,
+            group,
+            &params.owner,
+        ) {
+            Ok(result) => result,
+            Err(err) => {
+                info!("sync group {group} failed - group lock failed: {err}");
+                errors = true;
+                // do not stop here, instead continue
+                info!("create_locked_backup_group failed");
+                return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+            }
+        };
+
+        // permission check
+        if params.owner != owner {
+            // only the owner is allowed to create additional snapshots
+            info!(
+                "sync group {group} failed - owner check failed ({} != {owner})",
+                params.owner,
+            );
+            errors = true; // do not stop here, instead continue
+        } else {
+            match pull_group(params, namespace, group, progress.clone()).await {
+                Ok(stats) => pull_stats.add(stats),
+                Err(err) => {
+                    info!("sync group {group} failed - {err}");
+                    errors = true; // do not bail here, instead continue
+                }
+            }
+        }
+
+        let progress = progress.lock().unwrap().clone();
+        Ok((progress, pull_stats, errors))
+    })
+}
+
 /// Pulls a namespace according to `params`.
 ///
 /// Pulling a namespace consists of the following steps:
@@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns(
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
         progress.group_snapshots = 0;
-
-        let (owner, _lock_guard) =
-            match params
-                .target
-                .store
-                .create_locked_backup_group(&target_ns, &group, &params.owner)
-            {
-                Ok(result) => result,
-                Err(err) => {
-                    info!("sync group {} failed - group lock failed: {err}", &group);
-                    errors = true;
-                    // do not stop here, instead continue
-                    info!("create_locked_backup_group failed");
-                    continue;
-                }
-            };
-
-        // permission check
-        if params.owner != owner {
-            // only the owner is allowed to create additional snapshots
-            info!(
-                "sync group {} failed - owner check failed ({} != {owner})",
-                &group, params.owner
-            );
-            errors = true; // do not stop here, instead continue
-        } else {
-            match pull_group(params, namespace, &group, &mut progress).await {
-                Ok(stats) => pull_stats.add(stats),
-                Err(err) => {
-                    info!("sync group {} failed - {err}", &group);
-                    errors = true; // do not stop here, instead continue
-                }
-            }
-        }
+        pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
     }
 
     if params.remove_vanished {
-- 
2.39.2



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2024-07-25 10:20 UTC|newest]

Thread overview: 10+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-07-25 10:19 [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 1/4] api: config/sync: add optional group-sync-tasks property Christian Ebner
2024-07-25 10:19 ` Christian Ebner [this message]
2024-07-30 15:56   ` [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper Gabriel Goller
2024-07-31  7:38     ` Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 3/4] fix #4182: server: sync: allow pulling groups concurrently Christian Ebner
2024-07-30 15:54   ` Gabriel Goller
2024-07-31  7:35     ` Christian Ebner
2024-07-25 10:19 ` [pbs-devel] [RFC proxmox-backup 4/4] server: pull: conditionally buffer parallel tasks log output Christian Ebner
2025-01-20 10:57 ` [pbs-devel] [RFC proxmox-backup 0/4] concurrent group pull support for sync jobs Christian Ebner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240725101922.231053-3-c.ebner@proxmox.com \
    --to=c.ebner@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal