From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 7F6791FF15D for ; Thu, 25 Jul 2024 12:20:20 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id E696A352D; Thu, 25 Jul 2024 12:20:20 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Thu, 25 Jul 2024 12:19:20 +0200 Message-Id: <20240725101922.231053-3-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240725101922.231053-1-c.ebner@proxmox.com> References: <20240725101922.231053-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.021 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 2/4] server: pull: factor out group pull task into helper X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Make the error handling and accounting logic for each group pull task reusable by moving it into its own helper function, returning the future. The store progress is placed behind a reference counted mutex to allow for concurrent access of status updates. Signed-off-by: Christian Ebner --- pbs-datastore/src/store_progress.rs | 2 +- src/server/pull.rs | 102 +++++++++++++++++----------- 2 files changed, 65 insertions(+), 39 deletions(-) diff --git a/pbs-datastore/src/store_progress.rs b/pbs-datastore/src/store_progress.rs index a32bb9a9d..8afa60ace 100644 --- a/pbs-datastore/src/store_progress.rs +++ b/pbs-datastore/src/store_progress.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] /// Tracker for progress of operations iterating over `Datastore` contents. pub struct StoreProgress { /// Completed groups diff --git a/src/server/pull.rs b/src/server/pull.rs index 80443132e..e2d155c78 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -1,8 +1,10 @@ //! Sync datastore by pulling contents from remote server use std::collections::{HashMap, HashSet}; +use std::future::Future; use std::io::{Seek, Write}; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -1023,7 +1025,7 @@ async fn pull_group( params: &PullParameters, source_namespace: &BackupNamespace, group: &BackupGroup, - progress: &mut StoreProgress, + progress: Arc>, ) -> Result { let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced); let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast); @@ -1079,7 +1081,10 @@ async fn pull_group( // start with 65536 chunks (up to 256 GiB) let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); - progress.group_snapshots = list.len() as u64; + { + let mut progress = progress.lock().unwrap(); + progress.group_snapshots = list.len() as u64; + } let mut pull_stats = PullStats::default(); @@ -1095,8 +1100,11 @@ async fn pull_group( .await?; let result = pull_snapshot_from(reader, &to_snapshot, downloaded_chunks.clone()).await; - progress.done_snapshots = pos as u64 + 1; - info!("percentage done: {progress}"); + { + let mut progress = progress.lock().unwrap(); + progress.done_snapshots = pos as u64 + 1; + info!("percentage done: {progress}"); + } let stats = result?; // stop on error pull_stats.add(stats); @@ -1349,6 +1357,57 @@ pub(crate) async fn pull_store(mut params: PullParameters) -> Result( + params: &'future PullParameters, + group: &'future BackupGroup, + namespace: &'future BackupNamespace, + target_namespace: &'future BackupNamespace, + progress: StoreProgress, +) -> Pin> + Send + 'future>> +{ + Box::pin(async move { + let progress = Arc::new(Mutex::new(progress)); + let mut pull_stats = PullStats::default(); + let mut errors = false; + + let (owner, _lock_guard) = match params.target.store.create_locked_backup_group( + target_namespace, + group, + ¶ms.owner, + ) { + Ok(result) => result, + Err(err) => { + info!("sync group {group} failed - group lock failed: {err}"); + errors = true; + // do not stop here, instead continue + info!("create_locked_backup_group failed"); + return Ok((progress.lock().unwrap().clone(), pull_stats, errors)); + } + }; + + // permission check + if params.owner != owner { + // only the owner is allowed to create additional snapshots + info!( + "sync group {group} failed - owner check failed ({} != {owner})", + params.owner, + ); + errors = true; // do not stop here, instead continue + } else { + match pull_group(params, namespace, group, progress.clone()).await { + Ok(stats) => pull_stats.add(stats), + Err(err) => { + info!("sync group {group} failed - {err}"); + errors = true; // do not bail here, instead continue + } + } + } + + let progress = progress.lock().unwrap().clone(); + Ok((progress, pull_stats, errors)) + }) +} + /// Pulls a namespace according to `params`. /// /// Pulling a namespace consists of the following steps: @@ -1402,40 +1461,7 @@ pub(crate) async fn pull_ns( progress.done_groups = done as u64; progress.done_snapshots = 0; progress.group_snapshots = 0; - - let (owner, _lock_guard) = - match params - .target - .store - .create_locked_backup_group(&target_ns, &group, ¶ms.owner) - { - Ok(result) => result, - Err(err) => { - info!("sync group {} failed - group lock failed: {err}", &group); - errors = true; - // do not stop here, instead continue - info!("create_locked_backup_group failed"); - continue; - } - }; - - // permission check - if params.owner != owner { - // only the owner is allowed to create additional snapshots - info!( - "sync group {} failed - owner check failed ({} != {owner})", - &group, params.owner - ); - errors = true; // do not stop here, instead continue - } else { - match pull_group(params, namespace, &group, &mut progress).await { - Ok(stats) => pull_stats.add(stats), - Err(err) => { - info!("sync group {} failed - {err}", &group); - errors = true; // do not stop here, instead continue - } - } - } + pull_group_task(params, &group, namespace, &target_ns, progress.clone()).await?; } if params.remove_vanished { -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel