From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id C417F1FF15C for ; Wed, 11 Dec 2024 11:41:30 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id B25B476EB; Wed, 11 Dec 2024 11:41:34 +0100 (CET) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Wed, 11 Dec 2024 11:40:46 +0100 Message-Id: <20241211104050.69441-4-h.laimer@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20241211104050.69441-1-h.laimer@proxmox.com> References: <20241211104050.69441-1-h.laimer@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.028 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" When a datastore is mounted, spawn a new task to run all sync jobs marked with `run-on-mount`. These jobs run sequentially and include any job for which the mounted datastore is: - The source or target in a local push/pull job - The source in a push job to a remote datastore - The target in a pull job from a remote datastore Signed-off-by: Hannes Laimer --- src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++--- src/api2/admin/sync.rs | 2 +- src/server/sync.rs | 7 +-- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 11d2641b9..5e0e99f0b 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -41,8 +41,8 @@ use pbs_api_types::{ DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus, GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode, MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState, - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { Ok(()) } +async fn do_sync_jobs( + jobs_to_run: Vec, + worker: Arc, +) -> Result<(), Error> { + let count = jobs_to_run.len(); + info!( + "will run {} sync jobs: {}", + count, + jobs_to_run + .iter() + .map(|j| j.id.clone()) + .collect::>() + .join(", ") + ); + + for (i, job_config) in jobs_to_run.into_iter().enumerate() { + if worker.abort_requested() { + bail!("aborted due to user request"); + } + let job_id = job_config.id.clone(); + let Ok(job) = Job::new("syncjob", &job_id) else { + continue; + }; + let auth_id = Authid::root_auth_id().clone(); + info!("[{}/{count}] starting '{job_id}'...", i + 1); + match crate::server::do_sync_job( + job, + job_config, + &auth_id, + Some("mount".to_string()), + false, + ) { + Ok((_upid, handle)) => { + tokio::select! { + sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); }, + _abort = worker.abort_future() => bail!("aborted due to user request"), + }; + } + Err(err) => warn!("unable to start sync job {job_id} - {err}"), + } + } + + Ok(()) +} + #[api( protected: true, input: { @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result = list + .into_iter() + .filter(|job: &SyncJobConfig| { + // add job iff any of these apply + // - the jobs is local and we are source or target + // - we are the source of a push to a remote + // - we are the target of a pull from a remote + // + // `job.store == datastore.name` iff we are the target for pull from remote or we + // are the source for push to remote, therefore we don't have to check for the + // direction of the job. + job.remote.is_none() && job.remote_store == datastore.name + || job.store == datastore.name + }) + .collect(); + if !jobs_to_run.is_empty() { + let _ = WorkerTask::spawn( + "mount-sync-jobs", + Some(store), + auth_id.to_string(), + false, + move |worker| async move { do_sync_jobs(jobs_to_run, worker).await }, + ); + } + Ok(()) + }, )?; Ok(json!(upid)) diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs index 6722ebea0..01dea5126 100644 --- a/src/api2/admin/sync.rs +++ b/src/api2/admin/sync.rs @@ -161,7 +161,7 @@ pub fn run_sync_job( let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; Ok(upid_str) } diff --git a/src/server/sync.rs b/src/server/sync.rs index 0bd7a7a85..2a290a8ee 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error}; use futures::{future::FutureExt, select}; use http::StatusCode; use serde_json::json; +use tokio::task::JoinHandle; use tracing::{info, warn}; use proxmox_human_byte::HumanByte; @@ -598,7 +599,7 @@ pub fn do_sync_job( auth_id: &Authid, schedule: Option, to_stdout: bool, -) -> Result { +) -> Result<(String, JoinHandle<()>), Error> { let job_id = format!( "{}:{}:{}:{}:{}", sync_job.remote.as_deref().unwrap_or("-"), @@ -614,7 +615,7 @@ pub fn do_sync_job( bail!("can't sync to same datastore"); } - let upid_str = WorkerTask::spawn( + let (upid_str, handle) = WorkerTask::spawn_with_handle( &worker_type, Some(job_id.clone()), auth_id.to_string(), @@ -728,5 +729,5 @@ pub fn do_sync_job( }, )?; - Ok(upid_str) + Ok((upid_str, handle)) } -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel