From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id DD5741FF165 for ; Thu, 17 Jul 2025 15:27:18 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 1C31D3A780; Thu, 17 Jul 2025 15:28:26 +0200 (CEST) Message-ID: <89919332-9ffa-4c28-99e2-18a0b7c84951@proxmox.com> Date: Thu, 17 Jul 2025 15:27:52 +0200 MIME-Version: 1.0 User-Agent: Mozilla Thunderbird To: Proxmox Backup Server development discussion , Hannes Laimer References: <20250716145216.175184-1-h.laimer@proxmox.com> <20250716145216.175184-4-h.laimer@proxmox.com> Content-Language: en-US, de-DE From: Christian Ebner In-Reply-To: <20250716145216.175184-4-h.laimer@proxmox.com> X-Bm-Milter-Handled: 55990f41-d878-4baa-be0a-ee34c49e34d2 X-Bm-Transport-Timestamp: 1752758871236 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.046 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[datastore.rs, datastore.name, job.store] Subject: Re: [pbs-devel] [PATCH proxmox-backup v4 2/7] api: admin: run configured sync jobs when a datastore is mounted X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Transfer-Encoding: 7bit Content-Type: text/plain; charset="us-ascii"; Format="flowed" Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" On 7/16/25 4:52 PM, Hannes Laimer wrote: > When a datastore is mounted, spawn a new task to run all sync jobs > marked with `run-on-mount`. These jobs run sequentially and include > any job for which the mounted datastore is: > > - The source or target in a local push/pull job Just noticed this now: there are no local push jobs, so this should state pull only. > - The source in a push job to a remote datastore > - The target in a pull job from a remote datastore > > Signed-off-by: Hannes Laimer > --- > new in v4: > - warp do_mount_device in .spawn_blocking(|| ...) 
> > src/api2/admin/datastore.rs | 115 ++++++++++++++++++++++++++++++++++-- > 1 file changed, 109 insertions(+), 6 deletions(-) > > diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs > index e24bc1c1..753772f9 100644 > --- a/src/api2/admin/datastore.rs > +++ b/src/api2/admin/datastore.rs > @@ -44,8 +44,8 @@ use pbs_api_types::{ > DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus, > GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode, > MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState, > - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, > - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, > + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, > + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, > IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, > PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, > PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, > @@ -68,7 +68,7 @@ use pbs_datastore::{ > DataStore, LocalChunkReader, StoreProgress, > }; > use pbs_tools::json::required_string_param; > -use proxmox_rest_server::{formatter, WorkerTask}; > +use proxmox_rest_server::{formatter, worker_is_active, WorkerTask}; > > use crate::api2::backup::optional_ns_param; > use crate::api2::node::rrd::create_value_from_rrd; > @@ -2495,6 +2495,63 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { > Ok(()) > } > > +async fn do_sync_jobs( > + jobs_to_run: Vec, > + worker: Arc, > +) -> Result<(), Error> { > + let count = jobs_to_run.len(); > + info!( > + "will run {count} sync jobs: {}", > + jobs_to_run > + .iter() > + .map(|job| job.id.as_str()) > + .collect::>() > + .join(", ") > + ); > + > + 
let client = crate::client_helpers::connect_to_localhost() > + .with_context(|| format!("Failed to connect to localhost for starting sync jobs"))?; nit: the message is a plain `&'static str`, so there is no need to build a String via `format!` inside `with_context`. This should be `.context("Failed to connect to localhost for starting sync jobs")?;` > + for (i, job_config) in jobs_to_run.into_iter().enumerate() { > + if worker.abort_requested() { > + bail!("aborted due to user request"); > + } > + let job_id = &job_config.id; > + let Ok(result) = client > + .post(format!("api2/json/admin/sync/{job_id}/run").as_str(), None) > + .await > + else { > + warn!("unable to start sync job {job_id}"); nit: this could include the actual error message, with context, for ease of debugging. > + continue; > + }; > + info!("[{}/{count}] starting '{job_id}'...", i + 1); nit: moving the above log output to right before the API call that starts the job makes more sense, I think. So all in all: ``` let job_id = &job_config.id; info!("[{}/{count}] starting '{job_id}'...", i + 1); let result = match client .post(format!("api2/json/admin/sync/{job_id}/run").as_str(), None) .await { Ok(result) => result, Err(err) => { warn!("unable to start sync job {job_id}: {err:#}"); continue; } }; let Some(upid_str) = result["data"].as_str() else { warn!( "could not receive UPID of started job (may be running, just can't track it here)" ); continue; }; ``` > + let Some(upid_str) = result["data"].as_str() else { > + warn!( > + "could not receive UPID of started job (may be running, just can't track it here)" > + ); > + continue; > + }; > + let upid: UPID = upid_str.parse()?; > + > + let sleep_duration = core::time::Duration::from_secs(1); > + let mut status_retries = 1; > + loop { > + if worker.abort_requested() { > + bail!("aborted due to user request, already started job will finish"); > + } > + match worker_is_active(&upid).await { > + Ok(true) => tokio::time::sleep(sleep_duration).await, > + Ok(false) => break, > + 
Err(_) if status_retries > 3 => break, > + Err(err) => { > + warn!("could not get job status: {err} ({status_retries}/3)"); > + status_retries += 1; > + } > + } > + } > + } > + Ok(()) > +} > + > #[api( > protected: true, > input: { > @@ -2526,12 +2583,58 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; > let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; > > - let upid = WorkerTask::new_thread( > + let upid = WorkerTask::spawn( > "mount-device", > - Some(store), > + Some(store.clone()), > auth_id.to_string(), > to_stdout, > - move |_worker| do_mount_device(datastore), > + move |_worker| async move { > + let name = datastore.name.clone(); > + let log_context = LogContext::current(); > + tokio::task::spawn_blocking(|| { > + if let Some(log_context) = log_context { > + log_context.sync_scope(|| do_mount_device(datastore)) > + } else { > + do_mount_device(datastore) > + } > + }) > + .await??; > + let Ok((sync_config, _digest)) = pbs_config::sync::config() else { > + warn!("unable to read sync job config, won't run any sync jobs"); > + return Ok(()); > + }; > + let Ok(list) = sync_config.convert_to_typed_array("sync") else { > + warn!("unable to parse sync job config, won't run any sync jobs"); > + return Ok(()); > + }; > + let mut jobs_to_run: Vec = list > + .into_iter() > + .filter(|job: &SyncJobConfig| { > + // add job iff (running on mount is enabled and) any of these apply > + // - the jobs is local and we are source or target > + // - we are the source of a push to a remote > + // - we are the target of a pull from a remote > + // > + // `job.store == datastore.name` iff we are the target for pull from remote or we > + // are the source for push to remote, therefore we don't have to check for the > + // direction of the job. 
> + job.run_on_mount.unwrap_or(false) > + && (job.remote.is_none() && job.remote_store == name || job.store == name) > + }) > + .collect(); > + jobs_to_run.sort_by(|j1, j2| j1.id.cmp(&j2.id)); > + if !jobs_to_run.is_empty() { > + info!("starting {} sync jobs", jobs_to_run.len()); > + let _ = WorkerTask::spawn( > + "mount-sync-jobs", > + Some(store), > + auth_id.to_string(), > + false, > + move |worker| async move { do_sync_jobs(jobs_to_run, worker).await }, > + ); > + } > + Ok(()) > + }, > )?; > > Ok(json!(upid)) _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel