public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
From: Hannes Laimer <h.laimer@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH proxmox-backup 3/7] api: admin: run configured sync jobs when a datastore is mounted
Date: Wed, 11 Dec 2024 11:40:46 +0100	[thread overview]
Message-ID: <20241211104050.69441-4-h.laimer@proxmox.com> (raw)
In-Reply-To: <20241211104050.69441-1-h.laimer@proxmox.com>

When a datastore is mounted, spawn a new task to run all sync jobs
marked with `run-on-mount`. These jobs run sequentially and include
any job for which the mounted datastore is:

- The source or target in a local push/pull job
- The source in a push job to a remote datastore
- The target in a pull job from a remote datastore

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/api2/admin/datastore.rs | 90 ++++++++++++++++++++++++++++++++++---
 src/api2/admin/sync.rs      |  2 +-
 src/server/sync.rs          |  7 +--
 3 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs
index 11d2641b9..5e0e99f0b 100644
--- a/src/api2/admin/datastore.rs
+++ b/src/api2/admin/datastore.rs
@@ -41,8 +41,8 @@ use pbs_api_types::{
     DataStoreConfig, DataStoreListItem, DataStoreMountStatus, DataStoreStatus,
     GarbageCollectionJobStatus, GroupListItem, JobScheduleStatus, KeepOptions, MaintenanceMode,
     MaintenanceType, Operation, PruneJobOptions, SnapshotListItem, SnapshotVerifyState,
-    BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA, BACKUP_TIME_SCHEMA,
-    BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
+    SyncJobConfig, BACKUP_ARCHIVE_NAME_SCHEMA, BACKUP_ID_SCHEMA, BACKUP_NAMESPACE_SCHEMA,
+    BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA, CATALOG_NAME, CLIENT_LOG_BLOB_NAME, DATASTORE_SCHEMA,
     IGNORE_VERIFIED_BACKUPS_SCHEMA, MANIFEST_BLOB_NAME, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_SCHEMA,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE,
     PRIV_DATASTORE_READ, PRIV_DATASTORE_VERIFY, PRIV_SYS_MODIFY, UPID, UPID_SCHEMA,
@@ -2500,6 +2500,51 @@ pub fn do_mount_device(datastore: DataStoreConfig) -> Result<(), Error> {
     Ok(())
 }
 
+async fn do_sync_jobs(
+    jobs_to_run: Vec<SyncJobConfig>,
+    worker: Arc<WorkerTask>,
+) -> Result<(), Error> {
+    let count = jobs_to_run.len();
+    info!(
+        "will run {} sync jobs: {}",
+        count,
+        jobs_to_run
+            .iter()
+            .map(|j| j.id.clone())
+            .collect::<Vec<String>>()
+            .join(", ")
+    );
+
+    for (i, job_config) in jobs_to_run.into_iter().enumerate() {
+        if worker.abort_requested() {
+            bail!("aborted due to user request");
+        }
+        let job_id = job_config.id.clone();
+        let Ok(job) = Job::new("syncjob", &job_id) else {
+            continue;
+        };
+        let auth_id = Authid::root_auth_id().clone();
+        info!("[{}/{count}] starting '{job_id}'...", i + 1);
+        match crate::server::do_sync_job(
+            job,
+            job_config,
+            &auth_id,
+            Some("mount".to_string()),
+            false,
+        ) {
+            Ok((_upid, handle)) => {
+                tokio::select! {
+                    sync_done = handle.fuse() => if let Err(err) = sync_done { warn!("could not wait for job to finish: {err}"); },
+                    _abort = worker.abort_future() => bail!("aborted due to user request"),
+                };
+            }
+            Err(err) => warn!("unable to start sync job {job_id} - {err}"),
+        }
+    }
+
+    Ok(())
+}
+
 #[api(
     protected: true,
     input: {
@@ -2531,12 +2576,47 @@ pub fn mount(store: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Value, Er
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid = WorkerTask::new_thread(
+    let upid = WorkerTask::spawn(
         "mount-device",
-        Some(store),
+        Some(store.clone()),
         auth_id.to_string(),
         to_stdout,
-        move |_worker| do_mount_device(datastore),
+        move |_worker| async move {
+            do_mount_device(datastore.clone())?;
+            let Ok((sync_config, _digest)) = pbs_config::sync::config() else {
+                warn!("unable to read sync job config, won't run any sync jobs");
+                return Ok(());
+            };
+            let Ok(list) = sync_config.convert_to_typed_array("sync") else {
+                warn!("unable to parse sync job config, won't run any sync jobs");
+                return Ok(());
+            };
+            let jobs_to_run: Vec<SyncJobConfig> = list
+                .into_iter()
+                .filter(|job: &SyncJobConfig| {
+                    // add job iff any of these apply
+                    //   - the job is local and we are source or target
+                    //   - we are the source of a push to a remote
+                    //   - we are the target of a pull from a remote
+                    //
+                    // `job.store == datastore.name` iff we are the target for pull from remote or we
+                    // are the source for push to remote, therefore we don't have to check for the
+                    // direction of the job.
+                    job.remote.is_none() && job.remote_store == datastore.name
+                        || job.store == datastore.name
+                })
+                .collect();
+            if !jobs_to_run.is_empty() {
+                let _ = WorkerTask::spawn(
+                    "mount-sync-jobs",
+                    Some(store),
+                    auth_id.to_string(),
+                    false,
+                    move |worker| async move { do_sync_jobs(jobs_to_run, worker).await },
+                );
+            }
+            Ok(())
+        },
     )?;
 
     Ok(json!(upid))
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 6722ebea0..01dea5126 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -161,7 +161,7 @@ pub fn run_sync_job(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+    let (upid_str, _) = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
 
     Ok(upid_str)
 }
diff --git a/src/server/sync.rs b/src/server/sync.rs
index 0bd7a7a85..2a290a8ee 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -11,6 +11,7 @@ use anyhow::{bail, format_err, Context, Error};
 use futures::{future::FutureExt, select};
 use http::StatusCode;
 use serde_json::json;
+use tokio::task::JoinHandle;
 use tracing::{info, warn};
 
 use proxmox_human_byte::HumanByte;
@@ -598,7 +599,7 @@ pub fn do_sync_job(
     auth_id: &Authid,
     schedule: Option<String>,
     to_stdout: bool,
-) -> Result<String, Error> {
+) -> Result<(String, JoinHandle<()>), Error> {
     let job_id = format!(
         "{}:{}:{}:{}:{}",
         sync_job.remote.as_deref().unwrap_or("-"),
@@ -614,7 +615,7 @@ pub fn do_sync_job(
         bail!("can't sync to same datastore");
     }
 
-    let upid_str = WorkerTask::spawn(
+    let (upid_str, handle) = WorkerTask::spawn_with_handle(
         &worker_type,
         Some(job_id.clone()),
         auth_id.to_string(),
@@ -728,5 +729,5 @@ pub fn do_sync_job(
         },
     )?;
 
-    Ok(upid_str)
+    Ok((upid_str, handle))
 }
-- 
2.39.5



_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel


  parent reply	other threads:[~2024-12-11 10:41 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-12-11 10:40 [pbs-devel] [PATCH proxmox-backup 0/7] trigger sync jobs on mount Hannes Laimer
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox 1/7] rest-server: add function that returns a join handle for spawn Hannes Laimer
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 2/7] api types: add run-on-mount flag to SyncJobConfig Hannes Laimer
2024-12-11 10:40 ` Hannes Laimer [this message]
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 4/7] api: admin: trigger sync jobs only on datastore mount Hannes Laimer
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 5/7] bin: manager: run uuid_mount/mount tasks on the proxy Hannes Laimer
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 6/7] ui: add run-on-mount checkbox to SyncJob form Hannes Laimer
2024-12-11 10:40 ` [pbs-devel] [PATCH proxmox-backup 7/7] ui: add task title for triggering sync jobs Hannes Laimer

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20241211104050.69441-4-h.laimer@proxmox.com \
    --to=h.laimer@proxmox.com \
    --cc=pbs-devel@lists.proxmox.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal