* [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount
@ 2024-12-11 10:40 Hannes Laimer
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox 1/7] rest-server: add function that returns a join handle for spawn Hannes Laimer
` (7 more replies)
0 siblings, 8 replies; 13+ messages in thread
From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw)
To: pbs-devel
Sync jobs now have a run-on-mount flag, that, if set, runs the job whenever
a relevant removable datastore is mounted. More details on when a job is
triggered in the commit message of patch #3
proxmox-rest-server has to be bumped since patch #3
relies on the new function added in patch #1
* proxmox
Hannes Laimer (1):
rest-server: add function that returns a join handle for spawn
proxmox-rest-server/src/worker_task.rs | 21 +++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
* proxmox-backup
Hannes Laimer (6):
api types: add run-on-mount flag to SyncJobConfig
api: admin: run configured sync jobs when a datastore is mounted
api: admin: trigger sync jobs only on datastore mount
bin: manager: run uuid_mount/mount tasks on the proxy
ui: add run-on-mount checkbox to SyncJob form
ui: add task title for triggering sync jobs
pbs-api-types/src/jobs.rs | 3 +
src/api2/admin/datastore.rs | 96 +++++++++++++++++++--
src/api2/admin/sync.rs | 2 +-
src/bin/proxmox_backup_manager/datastore.rs | 42 +++++----
src/server/sync.rs | 7 +-
www/Utils.js | 1 +
www/window/SyncJobEdit.js | 23 ++++-
7 files changed, 147 insertions(+), 27 deletions(-)
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
^ permalink raw reply [flat|nested] 13+ messages in thread* [pbs-devel] [PATCH proxmox 1/7] rest-server: add function that returns a join handle for spawn 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 2/7] api types: add run-on-mount flag to SyncJobConfig Hannes Laimer ` (6 subsequent siblings) 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel We need this handle when we want to start multiple jobs sequentially from a 'manage' task. With the handle we can avoid polling for the task status with its upid. Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- proxmox-rest-server/src/worker_task.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/proxmox-rest-server/src/worker_task.rs b/proxmox-rest-server/src/worker_task.rs index beec691e..020bdb95 100644 --- a/proxmox-rest-server/src/worker_task.rs +++ b/proxmox-rest-server/src/worker_task.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::signal::unix::SignalKind; use tokio::sync::{oneshot, watch}; +use tokio::task::JoinHandle; use tracing::{info, warn}; use proxmox_daemon::command_socket::CommandSocket; @@ -942,6 +943,22 @@ impl WorkerTask { to_stdout: bool, f: F, ) -> Result<String, Error> + where + F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, + T: Send + 'static + Future<Output = Result<(), Error>>, + { + let (upid_str, _) = Self::spawn_with_handle(worker_type, worker_id, auth_id, to_stdout, f)?; + Ok(upid_str) + } + + /// Needed when a 'management' task starts multiple tasks sequentially + pub fn spawn_with_handle<F, T>( + worker_type: &str, + worker_id: Option<String>, + auth_id: String, + to_stdout: bool, + f: F, + ) -> Result<(String, JoinHandle<()>), Error> where F: Send + 'static + FnOnce(Arc<WorkerTask>) -> T, T: Send + 'static + Future<Output = Result<(), Error>>, @@ -950,12 +967,12 @@ impl WorkerTask { let upid_str = worker.upid.to_string(); let f = f(worker.clone()); - tokio::spawn(LogContext::new(logger).scope(async move { + let handle = tokio::spawn(LogContext::new(logger).scope(async move { let result = f.await; worker.log_result(&result); })); - Ok(upid_str) + Ok((upid_str, handle)) } /// Create a new worker thread. -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 2/7] api types: add run-on-mount flag to SyncJobConfig 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox 1/7] rest-server: add function that returns a join handle for spawn Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted Hannes Laimer ` (5 subsequent siblings) 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- pbs-api-types/src/jobs.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs index 04631d920..da2565e07 100644 --- a/pbs-api-types/src/jobs.rs +++ b/pbs-api-types/src/jobs.rs @@ -622,6 +622,9 @@ pub struct SyncJobConfig { pub resync_corrupt: Option<bool>, #[serde(skip_serializing_if = "Option::is_none")] pub sync_direction: Option<SyncDirection>, + /// Run job when datastore is mounted + #[serde(default)] + pub run_on_mount: bool, } impl SyncJobConfig { -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox 1/7] rest-server: add function that returns a join handle for spawn Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 2/7] api types: add run-on-mount flag to SyncJobConfig Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 4/7] api: admin: trigger sync jobs only on datastore mount Hannes Laimer ` (4 subsequent siblings) 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel When a datastore is mounted, spawn a new task to run all sync jobs marked with `run-on-mount`. These jobs run sequentially and include any job for which the mounted datastore is: - The source or target in a local push/pull job - The source in a push job to a remote datastore - The target in a pull job from a remote datastore Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++--- src/api2/admin/sync.rs | 2 +- src/server/sync.rs | 7 +-- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 11d2641b9..5e0e99f0b 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -41,8 +41,8 @@ use pbs_api_types::{ DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus, GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode, MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState, - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { Ok(()) } +async fn do_sync_jobs( + jobs_to_run: Vec<SyncJobConfig>, + worker: Arc<WorkerTask>, +) -> Result<(), Error> { + let count = jobs_to_run.len(); + info!( + "will run {} sync jobs: {}", + count, + jobs_to_run + .iter() + .map(|j| j.id.clone()) + .collect::<Vec<String>>() + .join(", ") + ); + + for (i, job_config) in jobs_to_run.into_iter().enumerate() { + if worker.abort_requested() { + bail!("aborted due to user request"); + } + let job_id = job_config.id.clone(); + let Ok(job) = Job::new("syncjob", &job_id) else { + continue; + }; + let auth_id = Authid::root_auth_id().clone(); + info!("[{}/{count}] starting '{job_id}'...", i + 1); + match crate::server::do_sync_job( + job, + job_config, + &auth_id, + Some("mount".to_string()), + false, + ) { + Ok((_upid, handle)) => { + tokio::select! { + sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); }, + _abort = worker.abort_future() => bail!("aborted due to user request"), + }; + } + Err(err) => warn!("unable to start sync job {job_id} - {err}"), + } + } + + Ok(()) +} + #[api( protected: true, input: { @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; - let upid = WorkerTask::new_thread( + let upid = WorkerTask::spawn( "mount-device", - Some(store), + Some(store.clone()), auth_id.to_string(), to_stdout, - move |_worker| do_mount_device(datastore), + move |_worker| async move { + do_mount_device(datastore.clone())?; + let Ok((sync_config, _digest)) = pbs_config::sync::config() else { + warn!("unable to read sync job config, won't run any sync jobs"); + return Ok(()); + }; + let Ok(list) = sync_config.convert_to_typed_array("sync") else { + warn!("unable to parse sync job config, won't run any sync jobs"); + return Ok(()); + }; + let jobs_to_run: Vec<SyncJobConfig> = list + .into_iter() + .filter(|job: &SyncJobConfig| { + // add job iff any of these apply + // - the jobs is local and we are source or target + // - we are the source of a push to a remote + // - we are the target of a pull from a remote + // + // `job.store == datastore.name` iff we are the target for pull from remote or we + // are the source for push to remote, therefore we don't have to check for the + // direction of the job. + job.remote.is_none() && job.remote_store == datastore.name + || job.store == datastore.name + }) + .collect(); + if !jobs_to_run.is_empty() { + let _ = WorkerTask::spawn( + "mount-sync-jobs", + Some(store), + auth_id.to_string(), + false, + move |worker| async move { do_sync_jobs(jobs_to_run, worker).await }, + ); + } + Ok(()) + }, )?; Ok(json!(upid)) diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs index 6722ebea0..01dea5126 100644 --- a/src/api2/admin/sync.rs +++ b/src/api2/admin/sync.rs @@ -161,7 +161,7 @@ pub fn run_sync_job( let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; Ok(upid_str) } diff --git a/src/server/sync.rs b/src/server/sync.rs index 0bd7a7a85..2a290a8ee 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error}; use futures::{future::FutureExt, select}; use http::StatusCode; use serde_json::json; +use tokio::task::JoinHandle; use tracing::{info, warn}; use proxmox_human_byte::HumanByte; @@ -598,7 +599,7 @@ pub fn do_sync_job( auth_id: &Authid, schedule: Option<String>, to_stdout: bool, -) -> Result<String, Error> { +) -> Result<(String, JoinHandle<()>), Error> { let job_id = format!( "{}:{}:{}:{}:{}", sync_job.remote.as_deref().unwrap_or("-"), @@ -614,7 +615,7 @@ pub fn do_sync_job( bail!("can't sync to same datastore"); } - let upid_str = WorkerTask::spawn( + let (upid_str, handle) = WorkerTask::spawn_with_handle( &worker_type, Some(job_id.clone()), auth_id.to_string(), @@ -728,5 +729,5 @@ pub fn do_sync_job( }, )?; - Ok(upid_str) + Ok((upid_str, handle)) } -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 4/7] api: admin: trigger sync jobs only on datastore mount 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer ` (2 preceding siblings ...) 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 5/7] bin: manager: run uuid_mount/mount tasks on the proxy Hannes Laimer ` (3 subsequent siblings) 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel Ensure sync jobs are triggered only when the datastore is actually mounted. If the datastore is already mounted, we don't fail, but sync jobs should not be re-triggered unnecessarily. This change prevents redundant sync job execution. Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- src/api2/admin/datastore.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 5e0e99f0b..7b318975d 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -2445,14 +2445,14 @@ fn setup_mounted_device(datastore: &DataStoreConfig, tmp_mount_path: &str) -> Re /// The reason for the randomized device mounting paths is to avoid two tasks trying to mount to /// the same path, this is *very* unlikely since the device is only mounted really shortly, but /// technically possible. -pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { +pub fn do_mount_device(datastore: DataStoreConfig) -> Result<bool, Error> { if let Some(uuid) = datastore.backing_device.as_ref() { if pbs_datastore::get_datastore_mount_status(&datastore) == Some(true) { info!( "device is already mounted at '{}'", datastore.absolute_path() ); - return Ok(()); + return Ok(false); } let tmp_mount_path = format!( "{}/{:x}", @@ -2497,7 +2497,7 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { datastore.name ) } - Ok(()) + Ok(true) } async fn do_sync_jobs( @@ -2582,7 +2582,9 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er auth_id.to_string(), to_stdout, move |_worker| async move { - do_mount_device(datastore.clone())?; + if !do_mount_device(datastore.clone())? { + return Ok(()); + } let Ok((sync_config, _digest)) = pbs_config::sync::config() else { warn!("unable to read sync job config, won't run any sync jobs"); return Ok(()); -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 5/7] bin: manager: run uuid_mount/mount tasks on the proxy 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer ` (3 preceding siblings ...) 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 4/7] api: admin: trigger sync jobs only on datastore mount Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 6/7] ui: add run-on-mount checkbox to SyncJob form Hannes Laimer ` (2 subsequent siblings) 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel Use the API instead of running uuid_mount/mount directly in the CLI binary. This ensures that all triggered tasks are handled by the proxy process. Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- src/bin/proxmox_backup_manager/datastore.rs | 42 +++++++++++++-------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/src/bin/proxmox_backup_manager/datastore.rs b/src/bin/proxmox_backup_manager/datastore.rs index 0918af3a4..e9fc1b93e 100644 --- a/src/bin/proxmox_backup_manager/datastore.rs +++ b/src/bin/proxmox_backup_manager/datastore.rs @@ -49,24 +49,31 @@ fn list_datastores(param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<Valu store: { schema: DATASTORE_SCHEMA, }, - digest: { + "output-format": { + schema: OUTPUT_FORMAT, optional: true, - schema: PROXMOX_CONFIG_DIGEST_SCHEMA, }, }, }, )] /// Mount a removable datastore. -async fn mount_datastore(mut param: Value, rpcenv: &mut dyn RpcEnvironment) -> Result<(), Error> { - param["node"] = "localhost".into(); +async fn mount_datastore( + store: String, + mut param: Value, + _rpcenv: &mut dyn RpcEnvironment, +) -> Result<(), Error> { + let output_format = extract_output_format(&mut param); - let info = &api2::admin::datastore::API_METHOD_MOUNT; - let result = match info.handler { - ApiHandler::Sync(handler) => (handler)(param, info, rpcenv)?, - _ => unreachable!(), - }; + let client = connect_to_localhost()?; + let result = client + .post( + format!("api2/json/admin/datastore/{}/mount", store).as_str(), + None, + ) + .await?; + + view_task_result(&client, result, &output_format).await?; - crate::wait_for_local_worker(result.as_str().unwrap()).await?; Ok(()) } @@ -252,10 +259,6 @@ async fn update_datastore(name: String, mut param: Value) -> Result<(), Error> { type: String, description: "The UUID of the device that should be mounted", }, - "output-format": { - schema: OUTPUT_FORMAT, - optional: true, - }, }, }, )] @@ -282,7 +285,16 @@ async fn uuid_mount(param: Value, _rpcenv: &mut dyn RpcEnvironment) -> Result<Va } if let Some(store) = matching_stores.first() { - api2::admin::datastore::do_mount_device(store.clone())?; + let client = connect_to_localhost()?; + let result = client + .post( + format!("api2/json/admin/datastore/{}/mount", store.name).as_str(), + None, + ) + .await?; + + view_task_result(&client, result, "json").await?; + return Ok(Value::Null); } // we don't want to fail for UUIDs that are not associated with datastores, as that produces -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 6/7] ui: add run-on-mount checkbox to SyncJob form 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer ` (4 preceding siblings ...) 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 5/7] bin: manager: run uuid_mount/mount tasks on the proxy Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 7/7] ui: add task title for triggering sync jobs Hannes Laimer 2025-04-08 11:33 ` [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Thomas Lamprecht 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- www/window/SyncJobEdit.js | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js index bcd2f2fb2..2b3660ed9 100644 --- a/www/window/SyncJobEdit.js +++ b/www/window/SyncJobEdit.js @@ -177,7 +177,7 @@ Ext.define('PBS.window.SyncJobEdit', { fieldLabel: gettext('Sync Schedule'), xtype: 'pbsCalendarEvent', name: 'schedule', - emptyText: gettext('none (disabled)'), + emptyText: gettext('none'), cbind: { deleteEmpty: '{!isCreate}', value: '{scheduleValue}', @@ -191,6 +191,27 @@ Ext.define('PBS.window.SyncJobEdit', { submitAutoScaledSizeUnit: true, // NOTE: handle deleteEmpty in onGetValues due to bandwidth field having a cbind too }, + { + fieldLabel: gettext('Run when mounted'), + xtype: 'proxmoxcheckbox', + name: 'run-on-mount', + autoEl: { + tag: 'div', + 'data-qtip': gettext('Run this job when a relevant removable datastore is mounted'), + }, + uncheckedValue: false, + value: false, + listeners: { + change: function(checkbox, runOnMount) { + let me = this; + let form = me.up('pbsSyncJobEdit'); + let scheduleField = form.down('field[name=schedule]'); + if (runOnMount) { + scheduleField.clearValue(); + } + }, + }, + }, ], column2: [ -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 7/7] ui: add task title for triggering sync jobs 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer ` (5 preceding siblings ...) 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 6/7] ui: add run-on-mount checkbox to SyncJob form Hannes Laimer @ 2024-12-11 10:40 ` Hannes Laimer 2025-04-08 11:33 ` [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Thomas Lamprecht 7 siblings, 0 replies; 13+ messages in thread From: Hannes Laimer @ 2024-12-11 10:40 UTC (permalink / raw) To: pbs-devel Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- www/Utils.js | 1 + 1 file changed, 1 insertion(+) diff --git a/www/Utils.js b/www/Utils.js index 2746ef0b5..e8f82e6da 100644 --- a/www/Utils.js +++ b/www/Utils.js @@ -422,6 +422,7 @@ Ext.define('PBS.Utils', { prunejob: (type, id) => PBS.Utils.render_prune_job_worker_id(id, gettext('Prune Job')), reader: (type, id) => PBS.Utils.render_datastore_worker_id(id, gettext('Read Objects')), 'rewind-media': [gettext('Drive'), gettext('Rewind Media')], + 'mount-sync-jobs': [gettext('Datastore'), gettext('trigger sync jobs')], sync: ['Datastore', gettext('Remote Sync')], syncjob: [gettext('Sync Job'), gettext('Remote Sync')], 'tape-backup': (type, id) => PBS.Utils.render_tape_backup_id(id, gettext('Tape Backup')), -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer ` (6 preceding siblings ...) 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 7/7] ui: add task title for triggering sync jobs Hannes Laimer @ 2025-04-08 11:33 ` Thomas Lamprecht 7 siblings, 0 replies; 13+ messages in thread From: Thomas Lamprecht @ 2025-04-08 11:33 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Hannes Laimer Just for the record, this was superseded by: https://lore.proxmox.com/pbs-devel/20250116064543.74619-1-h.laimer@proxmox.com/ _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox/proxmox-backup 0/7] trigger sync jobs on mount @ 2025-01-16 6:45 Hannes Laimer 2025-01-16 6:45 ` [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted Hannes Laimer 0 siblings, 1 reply; 13+ messages in thread From: Hannes Laimer @ 2025-01-16 6:45 UTC (permalink / raw) To: pbs-devel Sync jobs now have a run-on-mount flag, that, if set, runs the job whenever a relevant removable datastore is mounted. * proxmox: Hannes Laimer (1): rest-server: add function that returns a join handle for spawn proxmox-rest-server/src/worker_task.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) * proxmox-backup: Hannes Laimer (6): api types: add run-on-mount flag to SyncJobConfig api: admin: run configured sync jobs when a datastore is mounted api: admin: trigger sync jobs only on datastore mount bin: manager: run uuid_mount/mount tasks on the proxy ui: add run-on-mount checkbox to SyncJob form ui: add task title for triggering sync jobs pbs-api-types/src/jobs.rs | 3 + src/api2/admin/datastore.rs | 96 +++++++++++++++++++-- src/api2/admin/sync.rs | 2 +- src/bin/proxmox_backup_manager/datastore.rs | 42 +++++---- src/server/sync.rs | 7 +- www/Utils.js | 1 + www/window/SyncJobEdit.js | 23 ++++- 7 files changed, 147 insertions(+), 27 deletions(-) -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted 2025-01-16 6:45 [pbs-devel] [PATCH proxmox/proxmox-backup " Hannes Laimer @ 2025-01-16 6:45 ` Hannes Laimer 2025-02-04 14:33 ` Christian Ebner 0 siblings, 1 reply; 13+ messages in thread From: Hannes Laimer @ 2025-01-16 6:45 UTC (permalink / raw) To: pbs-devel When a datastore is mounted, spawn a new task to run all sync jobs marked with `run-on-mount`. These jobs run sequentially and include any job for which the mounted datastore is: - The source or target in a local push/pull job - The source in a push job to a remote datastore - The target in a pull job from a remote datastore Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> --- src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++--- src/api2/admin/sync.rs | 2 +- src/server/sync.rs | 7 +-- 3 files changed, 90 insertions(+), 9 deletions(-) diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index f5d80d610..21b58391d 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -41,8 +41,8 @@ use pbs_api_types::{ DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus, GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode, MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState, - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { Ok(()) } +async fn do_sync_jobs( + jobs_to_run: Vec<SyncJobConfig>, + worker: Arc<WorkerTask>, +) -> Result<(), Error> { + let count = jobs_to_run.len(); + info!( + "will run {} sync jobs: {}", + count, + jobs_to_run + .iter() + .map(|j| j.id.clone()) + .collect::<Vec<String>>() + .join(", ") + ); + + for (i, job_config) in jobs_to_run.into_iter().enumerate() { + if worker.abort_requested() { + bail!("aborted due to user request"); + } + let job_id = job_config.id.clone(); + let Ok(job) = Job::new("syncjob", &job_id) else { + continue; + }; + let auth_id = Authid::root_auth_id().clone(); + info!("[{}/{count}] starting '{job_id}'...", i + 1); + match crate::server::do_sync_job( + job, + job_config, + &auth_id, + Some("mount".to_string()), + false, + ) { + Ok((_upid, handle)) => { + tokio::select! { + sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); }, + _abort = worker.abort_future() => bail!("aborted due to user request"), + }; + } + Err(err) => warn!("unable to start sync job {job_id} - {err}"), + } + } + + Ok(()) +} + #[api( protected: true, input: { @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; - let upid = WorkerTask::new_thread( + let upid = WorkerTask::spawn( "mount-device", - Some(store), + Some(store.clone()), auth_id.to_string(), to_stdout, - move |_worker| do_mount_device(datastore), + move |_worker| async move { + do_mount_device(datastore.clone())?; + let Ok((sync_config, _digest)) = pbs_config::sync::config() else { + warn!("unable to read sync job config, won't run any sync jobs"); + return Ok(()); + }; + let Ok(list) = sync_config.convert_to_typed_array("sync") else { + warn!("unable to parse sync job config, won't run any sync jobs"); + return Ok(()); + }; + let jobs_to_run: Vec<SyncJobConfig> = list + .into_iter() + .filter(|job: &SyncJobConfig| { + // add job iff any of these apply + // - the jobs is local and we are source or target + // - we are the source of a push to a remote + // - we are the target of a pull from a remote + // + // `job.store == datastore.name` iff we are the target for pull from remote or we + // are the source for push to remote, therefore we don't have to check for the + // direction of the job. + job.remote.is_none() && job.remote_store == datastore.name + || job.store == datastore.name + }) + .collect(); + if !jobs_to_run.is_empty() { + let _ = WorkerTask::spawn( + "mount-sync-jobs", + Some(store), + auth_id.to_string(), + false, + move |worker| async move { do_sync_jobs(jobs_to_run, worker).await }, + ); + } + Ok(()) + }, )?; Ok(json!(upid)) diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs index 6722ebea0..01dea5126 100644 --- a/src/api2/admin/sync.rs +++ b/src/api2/admin/sync.rs @@ -161,7 +161,7 @@ pub fn run_sync_job( let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; Ok(upid_str) } diff --git a/src/server/sync.rs b/src/server/sync.rs index 0bd7a7a85..2a290a8ee 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error}; use futures::{future::FutureExt, select}; use http::StatusCode; use serde_json::json; +use tokio::task::JoinHandle; use tracing::{info, warn}; use proxmox_human_byte::HumanByte; @@ -598,7 +599,7 @@ pub fn do_sync_job( auth_id: &Authid, schedule: Option<String>, to_stdout: bool, -) -> Result<String, Error> { +) -> Result<(String, JoinHandle<()>), Error> { let job_id = format!( "{}:{}:{}:{}:{}", sync_job.remote.as_deref().unwrap_or("-"), @@ -614,7 +615,7 @@ pub fn do_sync_job( bail!("can't sync to same datastore"); } - let upid_str = WorkerTask::spawn( + let (upid_str, handle) = WorkerTask::spawn_with_handle( &worker_type, Some(job_id.clone()), auth_id.to_string(), @@ -728,5 +729,5 @@ pub fn do_sync_job( }, )?; - Ok(upid_str) + Ok((upid_str, handle)) } -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted 2025-01-16 6:45 ` [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted Hannes Laimer @ 2025-02-04 14:33 ` Christian Ebner 2025-02-28 8:18 ` Hannes Laimer 0 siblings, 1 reply; 13+ messages in thread From: Christian Ebner @ 2025-02-04 14:33 UTC (permalink / raw) To: Proxmox Backup Server development discussion, Hannes Laimer This patch needs to be rebased on current master. High level comment: Mixing the mount api handler with the execution logic for the sync jobs is not the best approach here I think. This mixes unrelated code and makes it harder to further extend this to e.g. also run verify, prune or garbage collection in the future. Just an idea: Maybe it would be possible to let the mount worker task signal the mount event via e.g. a std::sync::mpsc::channel() or the command socket (already used to request datastore cache updates on unmount) to schedule a managing task in the proxmox-backup-proxy? further comments inline. On 1/16/25 07:45, Hannes Laimer wrote: > When a datastore is mounted, spawn a new task to run all sync jobs > marked with `run-on-mount`. These jobs run sequentially and include > any job for which the mounted datastore is: > > - The source or target in a local push/pull job > - The source in a push job to a remote datastore > - The target in a pull job from a remote datastore > > Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> > --- > src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++--- > src/api2/admin/sync.rs | 2 +- > src/server/sync.rs | 7 +-- > 3 files changed, 90 insertions(+), 9 deletions(-) > > diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs > index f5d80d610..21b58391d 100644 > --- a/src/api2/admin/datastore.rs > +++ b/src/api2/admin/datastore.rs > @@ -41,8 +41,8 @@ use pbs_api_types::{ > DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus, > GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode, > MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState, > - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, > - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, > + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, > + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, > IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, > PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, > PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA, > @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> { > Ok(()) > } > > +async fn do_sync_jobs( nit: this helper function makes more sense to be placed into `src/api2/admin/sync.rs` I would say, as it purely handles execution of sync jobs. > + jobs_to_run: Vec<SyncJobConfig>, > + worker: Arc<WorkerTask>, > +) -> Result<(), Error> { > + let count = jobs_to_run.len(); > + info!( > + "will run {} sync jobs: {}", > + count, nit: this variable can be in-lined into the format string. > + jobs_to_run > + .iter() > + .map(|j| j.id.clone()) > + .collect::<Vec<String>>() > + .join(", ") nit: not sure if it is required to list all sync jobs a-priori, especially since that line might get rather long if many sync jobs are to be executed? Also, that would avoid the double iteration and id cloning. > + ); > + > + for (i, job_config) in jobs_to_run.into_iter().enumerate() { > + if worker.abort_requested() { > + bail!("aborted due to user request"); > + } > + let job_id = job_config.id.clone(); > + let Ok(job) = Job::new("syncjob", &job_id) else { > + continue; > + }; > + let auth_id = Authid::root_auth_id().clone(); > + info!("[{}/{count}] starting '{job_id}'...", i + 1); > + match crate::server::do_sync_job( > + job, > + job_config, > + &auth_id, > + Some("mount".to_string()), > + false, > + ) { > + Ok((_upid, handle)) => { > + tokio::select! { > + sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); }, > + _abort = worker.abort_future() => bail!("aborted due to user request"), > + }; > + } > + Err(err) => warn!("unable to start sync job {job_id} - {err}"), > + } > + } > + > + Ok(()) > +} > + > #[api( > protected: true, > input: { > @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er > let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; > let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; > > - let upid = WorkerTask::new_thread( > + let upid = WorkerTask::spawn( > "mount-device", > - Some(store), > + Some(store.clone()), > auth_id.to_string(), > to_stdout, > - move |_worker| do_mount_device(datastore), > + move |_worker| async move { > + do_mount_device(datastore.clone())?; > + let Ok((sync_config, _digest)) = pbs_config::sync::config() else { > + warn!("unable to read sync job config, won't run any sync jobs"); > + return Ok(()); > + }; > + let Ok(list) = sync_config.convert_to_typed_array("sync") else { > + warn!("unable to parse sync job config, won't run any sync jobs"); > + return Ok(()); > + }; > + let jobs_to_run: Vec<SyncJobConfig> = list > + .into_iter() > + .filter(|job: &SyncJobConfig| { > + // add job iff any of these apply > + // - the jobs is local and we are source or target > + // - we are the source of a push to a remote > + // - we are the target of a pull from a remote > + // > + // `job.store == datastore.name` iff we are the target for pull from remote or we > + // are the source for push to remote, therefore we don't have to check for the > + // direction of the job. This misses a check for the `run_on_mount` flag here I think, as it is never read in the rest of the patch series? > + job.remote.is_none() && job.remote_store == datastore.name > + || job.store == datastore.name > + }) > + .collect(); > + if !jobs_to_run.is_empty() { > + let _ = WorkerTask::spawn( > + "mount-sync-jobs", > + Some(store), > + auth_id.to_string(), > + false, > + move |worker| async move { do_sync_jobs(jobs_to_run, worker).await }, > + ); > + } > + Ok(()) > + }, > )?; > > Ok(json!(upid)) > diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs > index 6722ebea0..01dea5126 100644 > --- a/src/api2/admin/sync.rs > +++ b/src/api2/admin/sync.rs > @@ -161,7 +161,7 @@ pub fn run_sync_job( > > let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; > > - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; > + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; > > Ok(upid_str) > } > diff --git a/src/server/sync.rs b/src/server/sync.rs > index 0bd7a7a85..2a290a8ee 100644 > --- a/src/server/sync.rs > +++ b/src/server/sync.rs > @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error}; > use futures::{future::FutureExt, select}; > use http::StatusCode; > use serde_json::json; > +use tokio::task::JoinHandle; > use tracing::{info, warn}; > > use proxmox_human_byte::HumanByte; > @@ -598,7 +599,7 @@ pub fn do_sync_job( > auth_id: &Authid, > schedule: Option<String>, > to_stdout: bool, > -) -> Result<String, Error> { > +) -> Result<(String, JoinHandle<()>), Error> { > let job_id = format!( > "{}:{}:{}:{}:{}", > sync_job.remote.as_deref().unwrap_or("-"), > @@ -614,7 +615,7 @@ pub fn do_sync_job( > bail!("can't sync to same datastore"); > } > > - let upid_str = WorkerTask::spawn( > + let (upid_str, handle) = WorkerTask::spawn_with_handle( > &worker_type, > Some(job_id.clone()), > auth_id.to_string(), > @@ -728,5 +729,5 @@ pub fn do_sync_job( > }, > )?; > > - Ok(upid_str) > + Ok((upid_str, handle)) > } _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted 2025-02-04 14:33 ` Christian Ebner @ 2025-02-28 8:18 ` Hannes Laimer 2025-02-28 12:13 ` Christian Ebner 0 siblings, 1 reply; 13+ messages in thread From: Hannes Laimer @ 2025-02-28 8:18 UTC (permalink / raw) To: Christian Ebner, Proxmox Backup Server development discussion On 2/4/25 15:33, Christian Ebner wrote: > This patch needs to be rebased on current master. > > High level comment: > Mixing the mount api handler with the execution logic for the sync jobs > is not the best approach here I think. This mixes unrelated code and > makes it harder to further extend this to e.g. also run verify, prune > or garbage collection in the future. > The endpoint starts the task that does the mounting, and it starts the tasks that manages possible sync jobs. I don't think this really does anything to the logic of the sync job itself. Sure, this "run these jobs in sequence" would make sense, but currently we can't do that, we don't have an interface for jobs. Not even job-ids are unique across different job types. I am not sure weather there would be a lot of value in trying to have a more abstract job things. The only thing I could think of is jobs triggering each other. But I feel like that might be out of scope for this here. > Just an idea: Maybe it would be possible to let the mount worker task > signal the mount event via e.g. a std::sync::mpsc::channel() or the > command socket (already used to request datastore cache updates on > unmount) to schedule a managing task in the proxmox-backup-proxy? > This is a privileged endpoint, it runs on the api process, so we can't really use channels here. And it don't think the command socket is the right tool for this, one reason being it is more of a hit or miss (technically) which is not really what we want here. Also having the api process handle this is IMHO not what we want. > further comments inline. > > On 1/16/25 07:45, Hannes Laimer wrote: >> When a datastore is mounted, spawn a new task to run all sync jobs >> marked with `run-on-mount`. These jobs run sequentially and include >> any job for which the mounted datastore is: >> >> - The source or target in a local push/pull job >> - The source in a push job to a remote datastore >> - The target in a pull job from a remote datastore >> >> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> >> --- >> src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++--- >> src/api2/admin/sync.rs | 2 +- >> src/server/sync.rs | 7 +-- >> 3 files changed, 90 insertions(+), 9 deletions(-) >> >> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs >> index f5d80d610..21b58391d 100644 >> --- a/src/api2/admin/datastore.rs >> +++ b/src/api2/admin/datastore.rs >> @@ -41,8 +41,8 @@ use pbs_api_types::{ >> DataStoreConfig, DataStoreListItem, DataStoreMountStatus, >> DataStoreStatus, >> GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, >> KeepOptions, MaintenanceMode, >> MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, >> SnapshotVerifyState, >> - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, >> BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, >> - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, >> DATASTORE_SCHEMA, >> + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, >> BACKUP_NAMESPACE_SCHEMA, >> + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, >> CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, >> IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, >> MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, >> PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, >> PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, >> PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, >> UPID, UPID_SCHEMA, >> @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: >> DataStoreConfig) -> Result<(), Error> { >> Ok(()) >> } >> +async fn do_sync_jobs( > > nit: this helper function makes more sense to be placed into `src/api2/ > admin/sync.rs` I would say, as it purely handles execution of sync jobs. > >> + jobs_to_run: Vec<SyncJobConfig>, >> + worker: Arc<WorkerTask>, >> +) -> Result<(), Error> { >> + let count = jobs_to_run.len(); >> + info!( >> + "will run {} sync jobs: {}", >> + count, > > nit: this variable can be in-lined into the format string. > >> + jobs_to_run >> + .iter() >> + .map(|j| j.id.clone()) >> + .collect::<Vec<String>>() >> + .join(", ") > > nit: not sure if it is required to list all sync jobs a-priori, > especially since that line might get rather long if many sync jobs are > to be executed? Also, that would avoid the double iteration and id cloning. > >> + ); >> + >> + for (i, job_config) in jobs_to_run.into_iter().enumerate() { >> + if worker.abort_requested() { >> + bail!("aborted due to user request"); >> + } >> + let job_id = job_config.id.clone(); >> + let Ok(job) = Job::new("syncjob", &job_id) else { >> + continue; >> + }; >> + let auth_id = Authid::root_auth_id().clone(); >> + info!("[{}/{count}] starting '{job_id}'...", i + 1); >> + match crate::server::do_sync_job( >> + job, >> + job_config, >> + &auth_id, >> + Some("mount".to_string()), >> + false, >> + ) { >> + Ok((_upid, handle)) => { >> + tokio::select! { >> + sync_done = handle.fuse() => if let Err(err) = >> sync_done { warn!("could not wait for job to finish: {err}"); }, >> + _abort = worker.abort_future() => bail!("aborted >> due to user request"), >> + }; >> + } >> + Err(err) => warn!("unable to start sync job {job_id} - >> {err}"), >> + } >> + } >> + >> + Ok(()) >> +} >> + >> #[api( >> protected: true, >> input: { >> @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn >> RpcEnvironment) -> Result<Value, Er >> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; >> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; >> - let upid = WorkerTask::new_thread( >> + let upid = WorkerTask::spawn( >> "mount-device", >> - Some(store), >> + Some(store.clone()), >> auth_id.to_string(), >> to_stdout, >> - move |_worker| do_mount_device(datastore), >> + move |_worker| async move { >> + do_mount_device(datastore.clone())?; >> + let Ok((sync_config, _digest)) = >> pbs_config::sync::config() else { >> + warn!("unable to read sync job config, won't run any >> sync jobs"); >> + return Ok(()); >> + }; >> + let Ok(list) = sync_config.convert_to_typed_array("sync") >> else { >> + warn!("unable to parse sync job config, won't run any >> sync jobs"); >> + return Ok(()); >> + }; >> + let jobs_to_run: Vec<SyncJobConfig> = list >> + .into_iter() >> + .filter(|job: &SyncJobConfig| { >> + // add job iff any of these apply >> + // - the jobs is local and we are source or target >> + // - we are the source of a push to a remote >> + // - we are the target of a pull from a remote >> + // >> + // `job.store == datastore.name` iff we are the >> target for pull from remote or we >> + // are the source for push to remote, therefore >> we don't have to check for the >> + // direction of the job. > > This misses a check for the `run_on_mount` flag here I think, as it is > never read in the rest of the patch series? > I think you are right, I actually missed checking that... >> + job.remote.is_none() && job.remote_store == >> datastore.name >> + || job.store == datastore.name >> + }) >> + .collect(); >> + if !jobs_to_run.is_empty() { >> + let _ = WorkerTask::spawn( >> + "mount-sync-jobs", >> + Some(store), >> + auth_id.to_string(), >> + false, >> + move |worker| async move >> { do_sync_jobs(jobs_to_run, worker).await }, >> + ); >> + } >> + Ok(()) >> + }, >> )?; >> Ok(json!(upid)) >> diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs >> index 6722ebea0..01dea5126 100644 >> --- a/src/api2/admin/sync.rs >> +++ b/src/api2/admin/sync.rs >> @@ -161,7 +161,7 @@ pub fn run_sync_job( >> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; >> - let upid_str = do_sync_job(job, sync_job, &auth_id, None, >> to_stdout)?; >> + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, >> to_stdout)?; >> Ok(upid_str) >> } >> diff --git a/src/server/sync.rs b/src/server/sync.rs >> index 0bd7a7a85..2a290a8ee 100644 >> --- a/src/server/sync.rs >> +++ b/src/server/sync.rs >> @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error}; >> use futures::{future::FutureExt, select}; >> use http::StatusCode; >> use serde_json::json; >> +use tokio::task::JoinHandle; >> use tracing::{info, warn}; >> use proxmox_human_byte::HumanByte; >> @@ -598,7 +599,7 @@ pub fn do_sync_job( >> auth_id: &Authid, >> schedule: Option<String>, >> to_stdout: bool, >> -) -> Result<String, Error> { >> +) -> Result<(String, JoinHandle<()>), Error> { >> let job_id = format!( >> "{}:{}:{}:{}:{}", >> sync_job.remote.as_deref().unwrap_or("-"), >> @@ -614,7 +615,7 @@ pub fn do_sync_job( >> bail!("can't sync to same datastore"); >> } >> - let upid_str = WorkerTask::spawn( >> + let (upid_str, handle) = WorkerTask::spawn_with_handle( >> &worker_type, >> Some(job_id.clone()), >> auth_id.to_string(), >> @@ -728,5 +729,5 @@ pub fn do_sync_job( >> }, >> )?; >> - Ok(upid_str) >> + Ok((upid_str, handle)) >> } > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted 2025-02-28 8:18 ` Hannes Laimer @ 2025-02-28 12:13 ` Christian Ebner 0 siblings, 0 replies; 13+ messages in thread From: Christian Ebner @ 2025-02-28 12:13 UTC (permalink / raw) To: Hannes Laimer, Proxmox Backup Server development discussion On 2/28/25 09:18, Hannes Laimer wrote: > > > On 2/4/25 15:33, Christian Ebner wrote: >> This patch needs to be rebased on current master. >> >> High level comment: >> Mixing the mount api handler with the execution logic for the sync >> jobs is not the best approach here I think. This mixes unrelated code >> and makes it harder to further extend this to e.g. also run verify, >> prune or garbage collection in the future. >> > > The endpoint starts the task that does the mounting, and it starts the > tasks that manages possible sync jobs. I don't think this really does > anything to the logic of the sync job itself. Sure, this > "run these jobs in sequence" would make sense, but currently we can't do > that, we don't have an interface for jobs. Not even job-ids are unique > across different job types. I am not sure weather there would be a lot > of value in trying to have a more abstract job things. The only thing > I could think of is jobs triggering each other. But I feel like that > might be out of scope for this here. Right, therefore having the job scheduler queuing and executing the jobs seemed better to me. But you are right, there is currently no implementation that would achieve this. > >> Just an idea: Maybe it would be possible to let the mount worker task >> signal the mount event via e.g. a std::sync::mpsc::channel() or the >> command socket (already used to request datastore cache updates on >> unmount) to schedule a managing task in the proxmox-backup-proxy? >> > > This is a privileged endpoint, it runs on the api process, so we > can't really use channels here. And it don't think the command socket > is the right tool for this, one reason being it is more of a hit or miss > (technically) which is not really what we want here. Also having the api > process handle this is IMHO not what we want. Right, fair point, was just throwing ideas at you ;). In lack of better approaches one might stick with the current implementation for executing the jobs. But I think this is rather unflexible and will be harder to extend if needed, getting messy with a job executing jobs, which themself might execute yet another job? Opinions or suggestions of other devs on this? > >> further comments inline. >> >> On 1/16/25 07:45, Hannes Laimer wrote: >>> When a datastore is mounted, spawn a new task to run all sync jobs >>> marked with `run-on-mount`. These jobs run sequentially and include >>> any job for which the mounted datastore is: >>> >>> - The source or target in a local push/pull job >>> - The source in a push job to a remote datastore >>> - The target in a pull job from a remote datastore >>> >>> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com> >>> --- >>> src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++--- >>> src/api2/admin/sync.rs | 2 +- >>> src/server/sync.rs | 7 +-- >>> 3 files changed, 90 insertions(+), 9 deletions(-) >>> >>> diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs >>> index f5d80d610..21b58391d 100644 >>> --- a/src/api2/admin/datastore.rs >>> +++ b/src/api2/admin/datastore.rs >>> @@ -41,8 +41,8 @@ use pbs_api_types::{ >>> DataStoreConfig, DataStoreListItem, DataStoreMountStatus, >>> DataStoreStatus, >>> GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, >>> KeepOptions, MaintenanceMode, >>> MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, >>> SnapshotVerifyState, >>> - BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, >>> BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA, >>> - BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, >>> DATASTORE_SCHEMA, >>> + SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, >>> BACKUP_NAMESPACE_SCHEMA, >>> + BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, >>> CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA, >>> IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, >>> MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA, >>> PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, >>> PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, >>> PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, >>> UPID, UPID_SCHEMA, >>> @@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: >>> DataStoreConfig) -> Result<(), Error> { >>> Ok(()) >>> } >>> +async fn do_sync_jobs( >> >> nit: this helper function makes more sense to be placed into `src/ >> api2/ admin/sync.rs` I would say, as it purely handles execution of >> sync jobs. >> >>> + jobs_to_run: Vec<SyncJobConfig>, >>> + worker: Arc<WorkerTask>, >>> +) -> Result<(), Error> { >>> + let count = jobs_to_run.len(); >>> + info!( >>> + "will run {} sync jobs: {}", >>> + count, >> >> nit: this variable can be in-lined into the format string. >> >>> + jobs_to_run >>> + .iter() >>> + .map(|j| j.id.clone()) >>> + .collect::<Vec<String>>() >>> + .join(", ") >> >> nit: not sure if it is required to list all sync jobs a-priori, >> especially since that line might get rather long if many sync jobs are >> to be executed? Also, that would avoid the double iteration and id >> cloning. >> >>> + ); >>> + >>> + for (i, job_config) in jobs_to_run.into_iter().enumerate() { >>> + if worker.abort_requested() { >>> + bail!("aborted due to user request"); >>> + } >>> + let job_id = job_config.id.clone(); >>> + let Ok(job) = Job::new("syncjob", &job_id) else { >>> + continue; >>> + }; >>> + let auth_id = Authid::root_auth_id().clone(); >>> + info!("[{}/{count}] starting '{job_id}'...", i + 1); >>> + match crate::server::do_sync_job( >>> + job, >>> + job_config, >>> + &auth_id, >>> + Some("mount".to_string()), >>> + false, >>> + ) { >>> + Ok((_upid, handle)) => { >>> + tokio::select! { >>> + sync_done = handle.fuse() => if let Err(err) = >>> sync_done { warn!("could not wait for job to finish: {err}"); }, >>> + _abort = worker.abort_future() => bail!("aborted >>> due to user request"), >>> + }; >>> + } >>> + Err(err) => warn!("unable to start sync job {job_id} - >>> {err}"), >>> + } >>> + } >>> + >>> + Ok(()) >>> +} >>> + >>> #[api( >>> protected: true, >>> input: { >>> @@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn >>> RpcEnvironment) -> Result<Value, Er >>> let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; >>> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; >>> - let upid = WorkerTask::new_thread( >>> + let upid = WorkerTask::spawn( >>> "mount-device", >>> - Some(store), >>> + Some(store.clone()), >>> auth_id.to_string(), >>> to_stdout, >>> - move |_worker| do_mount_device(datastore), >>> + move |_worker| async move { >>> + do_mount_device(datastore.clone())?; >>> + let Ok((sync_config, _digest)) = >>> pbs_config::sync::config() else { >>> + warn!("unable to read sync job config, won't run any >>> sync jobs"); >>> + return Ok(()); >>> + }; >>> + let Ok(list) = >>> sync_config.convert_to_typed_array("sync") else { >>> + warn!("unable to parse sync job config, won't run >>> any sync jobs"); >>> + return Ok(()); >>> + }; >>> + let jobs_to_run: Vec<SyncJobConfig> = list >>> + .into_iter() >>> + .filter(|job: &SyncJobConfig| { >>> + // add job iff any of these apply >>> + // - the jobs is local and we are source or >>> target >>> + // - we are the source of a push to a remote >>> + // - we are the target of a pull from a remote >>> + // >>> + // `job.store == datastore.name` iff we are the >>> target for pull from remote or we >>> + // are the source for push to remote, therefore >>> we don't have to check for the >>> + // direction of the job. >> >> This misses a check for the `run_on_mount` flag here I think, as it is >> never read in the rest of the patch series? >> > > I think you are right, I actually missed checking that... > >>> + job.remote.is_none() && job.remote_store == >>> datastore.name >>> + || job.store == datastore.name >>> + }) >>> + .collect(); >>> + if !jobs_to_run.is_empty() { >>> + let _ = WorkerTask::spawn( >>> + "mount-sync-jobs", >>> + Some(store), >>> + auth_id.to_string(), >>> + false, >>> + move |worker| async move >>> { do_sync_jobs(jobs_to_run, worker).await }, >>> + ); >>> + } >>> + Ok(()) >>> + }, >>> )?; >>> Ok(json!(upid)) >>> diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs >>> index 6722ebea0..01dea5126 100644 >>> --- a/src/api2/admin/sync.rs >>> +++ b/src/api2/admin/sync.rs >>> @@ -161,7 +161,7 @@ pub fn run_sync_job( >>> let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; >>> - let upid_str = do_sync_job(job, sync_job, &auth_id, None, >>> to_stdout)?; >>> + let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, >>> to_stdout)?; >>> Ok(upid_str) >>> } >>> diff --git a/src/server/sync.rs b/src/server/sync.rs >>> index 0bd7a7a85..2a290a8ee 100644 >>> --- a/src/server/sync.rs >>> +++ b/src/server/sync.rs >>> @@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error}; >>> use futures::{future::FutureExt, select}; >>> use http::StatusCode; >>> use serde_json::json; >>> +use tokio::task::JoinHandle; >>> use tracing::{info, warn}; >>> use proxmox_human_byte::HumanByte; >>> @@ -598,7 +599,7 @@ pub fn do_sync_job( >>> auth_id: &Authid, >>> schedule: Option<String>, >>> to_stdout: bool, >>> -) -> Result<String, Error> { >>> +) -> Result<(String, JoinHandle<()>), Error> { >>> let job_id = format!( >>> "{}:{}:{}:{}:{}", >>> sync_job.remote.as_deref().unwrap_or("-"), >>> @@ -614,7 +615,7 @@ pub fn do_sync_job( >>> bail!("can't sync to same datastore"); >>> } >>> - let upid_str = WorkerTask::spawn( >>> + let (upid_str, handle) = WorkerTask::spawn_with_handle( >>> &worker_type, >>> Some(job_id.clone()), >>> auth_id.to_string(), >>> @@ -728,5 +729,5 @@ pub fn do_sync_job( >>> }, >>> )?; >>> - Ok(upid_str) >>> + Ok((upid_str, handle)) >>> } >> > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel ^ permalink raw reply [flat|nested] 13+ messages in thread
end of thread, other threads:[~2025-04-08 11:34 UTC | newest] Thread overview: 13+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox 1/7] rest-server: add function that returns a join handle for spawn Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 2/7] api types: add run-on-mount flag to SyncJobConfig Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 4/7] api: admin: trigger sync jobs only on datastore mount Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 5/7] bin: manager: run uuid_mount/mount tasks on the proxy Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 6/7] ui: add run-on-mount checkbox to SyncJob form Hannes Laimer 2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 7/7] ui: add task title for triggering sync jobs Hannes Laimer 2025-04-08 11:33 ` [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Thomas Lamprecht 2025-01-16 6:45 [pbs-devel] [PATCH proxmox/proxmox-backup " Hannes Laimer 2025-01-16 6:45 ` [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted Hannes Laimer 2025-02-04 14:33 ` Christian Ebner 2025-02-28 8:18 ` Hannes Laimer 2025-02-28 12:13 ` Christian Ebner
This is an external index of several public inboxes, see mirroring instructions on how to clone and mirror all data and code used by this external index.