From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper
Date: Thu, 25 Jul 2024 12:19:20 +0200
Message-ID: <20240725101922.231053-3-c.ebner@proxmox.com>
In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com>
Make the error handling and accounting logic for each group pull task
reusable by moving it into its own helper function, which returns the
task as a pinned, boxed future.

The store progress is placed behind a reference-counted mutex to allow
concurrent status updates from multiple group pull tasks.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
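The helper returns the pull task as a pinned, boxed future instead of
awaiting it directly, so a follow-up patch can collect several tasks and
poll them concurrently (bounded by the group-sync-tasks property from
patch 1/4). A minimal, self-contained sketch of that aggregation
pattern, with a toy task standing in for pull_group_task (all names
below are illustrative, not the final implementation):

    use anyhow::Error;
    use futures::stream::{FuturesUnordered, StreamExt};

    // Toy stand-in for the boxed group pull task; the real one
    // resolves to (StoreProgress, PullStats, bool).
    async fn fake_group_task(id: u64) -> Result<(u64, bool), Error> {
        Ok((id, false))
    }

    #[tokio::main]
    async fn main() -> Result<(), Error> {
        let mut tasks = FuturesUnordered::new();
        for id in 0..4 {
            tasks.push(fake_group_task(id));
        }
        let (mut pull_stats, mut errors) = (0u64, false);
        // Merge per-group results as the tasks finish, in any order.
        while let Some(result) = tasks.next().await {
            let (stats, has_errors) = result?;
            pull_stats += stats;
            errors |= has_errors;
        }
        println!("stats: {pull_stats}, errors: {errors}");
        Ok(())
    }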
pbs-datastore/src/store_progress.rs | 2 +-
src/server/pull.rs | 102 +++++++++++++++++-----------
2 files changed, 65 insertions(+), 39 deletions(-)
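StoreProgress derives Clone below so each task can hand back a
decoupled snapshot of the shared counters once it finishes, while live
updates go through the mutex. The sharing pattern, as a trimmed-down
sketch (the struct here is a stand-in for the real one in
pbs-datastore/src/store_progress.rs):

    use std::sync::{Arc, Mutex};

    #[derive(Clone, Debug, Default)]
    struct StoreProgress {
        group_snapshots: u64,
        done_snapshots: u64,
    }

    fn main() {
        let progress = Arc::new(Mutex::new(StoreProgress::default()));
        {
            // Keep the critical section short: update, then drop the guard.
            let mut guard = progress.lock().unwrap();
            guard.group_snapshots = 3;
            guard.done_snapshots += 1;
        }
        // Cloning under the lock yields a consistent snapshot for reporting.
        let snapshot = progress.lock().unwrap().clone();
        println!("{snapshot:?}");
    }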
diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs
index a32bb9a9d..8afa60ace 100644
--- a/pbs-datastore/src/store_progress.rs
+++ b/pbs-datastore/src/store_progress.rs
@@ -1,4 +1,4 @@
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
/// Tracker for progress of operations iterating over `Datastore` contents.
pub struct StoreProgress {
/// Completed groups
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 80443132e..e2d155c78 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,10 @@
//! Sync datastore by pulling contents from remote server
use std::collections::{HashMap, HashSet};
+use std::future::Future;
use std::io::{Seek, Write};
use std::path::{Path, PathBuf};
+use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
@@ -1023,7 +1025,7 @@ async fn pull_group(
params: &PullParameters,
source_namespace: &BackupNamespace,
group: &BackupGroup,
- progress: &mut StoreProgress,
+ progress: Arc<Mutex<StoreProgress>>,
) -> Result<PullStats, Error> {
let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
@@ -1079,7 +1081,10 @@ async fn pull_group(
// start with 65536 chunks (up to 256 GiB)
let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
- progress.group_snapshots = list.len() as u64;
+ {
+ let mut progress = progress.lock().unwrap();
+ progress.group_snapshots = list.len() as u64;
+ }
let mut pull_stats = PullStats::default();
@@ -1095,8 +1100,11 @@ async fn pull_group(
.await?;
let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await;
- progress.done_snapshots = pos as u64 + 1;
- info!("percentage done: {progress}");
+ {
+ let mut progress = progress.lock().unwrap();
+ progress.done_snapshots = pos as u64 + 1;
+ info!("percentage done: {progress}");
+ }
let stats = result?; // stop on error
pull_stats.add(stats);
@@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result<PullStats,
Ok(pull_stats)
}
+fn pull_group_task<'future>(
+ params: &'future PullParameters,
+ group: &'future BackupGroup,
+ namespace: &'future BackupNamespace,
+ target_namespace: &'future BackupNamespace,
+ progress: StoreProgress,
+) -> Pin<Box<dyn Future<Output = Result<(StoreProgress, PullStats, bool), Error>> + Send + 'future>>
+{
+ Box::pin(async move {
+ let progress = Arc::new(Mutex::new(progress));
+ let mut pull_stats = PullStats::default();
+ let mut errors = false;
+
+ let (owner, _lock_guard) = match params.target.store.create_locked_backup_group(
+ target_namespace,
+ group,
+ ¶ms.owner,
+ ) {
+ Ok(result) => result,
+ Err(err) => {
+ info!("sync group {group} failed - group lock failed: {err}");
+ errors = true;
+ // do not stop here, instead continue
+ info!("create_locked_backup_group failed");
+ return Ok((progress.lock().unwrap().clone(), pull_stats, errors));
+ }
+ };
+
+ // permission check
+ if params.owner != owner {
+ // only the owner is allowed to create additional snapshots
+ info!(
+ "sync group {group} failed - owner check failed ({} != {owner})",
+ params.owner,
+ );
+ errors = true; // do not stop here, instead continue
+ } else {
+ match pull_group(params, namespace, group, progress.clone()).await {
+ Ok(stats) => pull_stats.add(stats),
+ Err(err) => {
+ info!("sync group {group} failed - {err}");
+ errors = true; // do not bail here, instead continue
+ }
+ }
+ }
+
+ let progress = progress.lock().unwrap().clone();
+ Ok((progress, pull_stats, errors))
+ })
+}
+
/// Pulls a namespace according to `params`.
///
/// Pulling a namespace consists of the following steps:
@@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns(
progress.done_groups = done as u64;
progress.done_snapshots = 0;
progress.group_snapshots = 0;
-
- let (owner, _lock_guard) =
- match params
- .target
- .store
- .create_locked_backup_group(&target_ns, &group, ¶ms.owner)
- {
- Ok(result) => result,
- Err(err) => {
- info!("sync group {} failed - group lock failed: {err}", &group);
- errors = true;
- // do not stop here, instead continue
- info!("create_locked_backup_group failed");
- continue;
- }
- };
-
- // permission check
- if params.owner != owner {
- // only the owner is allowed to create additional snapshots
- info!(
- "sync group {} failed - owner check failed ({} != {owner})",
- &group, params.owner
- );
- errors = true; // do not stop here, instead continue
- } else {
- match pull_group(params, namespace, &group, &mut progress).await {
- Ok(stats) => pull_stats.add(stats),
- Err(err) => {
- info!("sync group {} failed - {err}", &group);
- errors = true; // do not stop here, instead continue
- }
- }
- }
+        let (sync_progress, stats, has_errors) =
+            pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?;
+        progress = sync_progress;
+        pull_stats.add(stats);
+        errors |= has_errors;
}
if params.remove_vanished {
--
2.39.2