public inbox for pbs-devel@lists.proxmox.com
From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v4 proxmox 11/31] api: sync: move sync job invocation to server sync module
Date: Thu, 17 Oct 2024 15:26:56 +0200	[thread overview]
Message-ID: <20241017132716.385234-12-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241017132716.385234-1-c.ebner@proxmox.com>

Moves the do_sync_job function from the api2 pull module into the
common server sync module and refactors it so that it can be reused
for both sync directions, pull and push.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 3:
- Drop the no longer needed `job_user`; privilege checks are handled via `local_user`

 src/api2/admin/sync.rs          |  19 +++--
 src/api2/pull.rs                | 108 ---------------------------
 src/bin/proxmox-backup-proxy.rs |  15 +++-
 src/server/mod.rs               |   1 +
 src/server/sync.rs              | 128 +++++++++++++++++++++++++++++++-
 5 files changed, 152 insertions(+), 119 deletions(-)
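
For reviewers, a minimal sketch of the call shape after this move,
taken from the hunks below (surrounding setup such as `job`,
`sync_job`, `auth_id` and `to_stdout` is elided); the sync direction
is now passed explicitly at each call site:

    // example call site, as in src/api2/admin/sync.rs after this patch
    let upid_str = do_sync_job(
        job,
        sync_job,
        &auth_id,
        None,                // optional schedule event string
        SyncDirection::Pull, // or SyncDirection::Push for push jobs
        to_stdout,
    )?;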

diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 4e2ba0be8..be324564c 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -10,16 +10,16 @@ use proxmox_router::{
 use proxmox_schema::api;
 use proxmox_sortable_macro::sortable;
 
-use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA};
+use pbs_api_types::{
+    Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA,
+};
 use pbs_config::sync;
 use pbs_config::CachedUserInfo;
 
 use crate::{
-    api2::{
-        config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
-        pull::do_sync_job,
-    },
+    api2::config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
     server::jobstate::{compute_schedule_status, Job, JobState},
+    server::sync::do_sync_job,
 };
 
 #[api(
@@ -116,7 +116,14 @@ pub fn run_sync_job(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+    let upid_str = do_sync_job(
+        job,
+        sync_job,
+        &auth_id,
+        None,
+        SyncDirection::Pull,
+        to_stdout,
+    )?;
 
     Ok(upid_str)
 }
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..d039dab59 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,10 +13,8 @@ use pbs_api_types::{
     TRANSFER_LAST_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
-use proxmox_human_byte::HumanByte;
 use proxmox_rest_server::WorkerTask;
 
-use crate::server::jobstate::Job;
 use crate::server::pull::{pull_store, PullParameters};
 
 pub fn check_pull_privs(
@@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
     }
 }
 
-pub fn do_sync_job(
-    mut job: Job,
-    sync_job: SyncJobConfig,
-    auth_id: &Authid,
-    schedule: Option<String>,
-    to_stdout: bool,
-) -> Result<String, Error> {
-    let job_id = format!(
-        "{}:{}:{}:{}:{}",
-        sync_job.remote.as_deref().unwrap_or("-"),
-        sync_job.remote_store,
-        sync_job.store,
-        sync_job.ns.clone().unwrap_or_default(),
-        job.jobname()
-    );
-    let worker_type = job.jobtype().to_string();
-
-    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
-        bail!("can't sync to same datastore");
-    }
-
-    let upid_str = WorkerTask::spawn(
-        &worker_type,
-        Some(job_id.clone()),
-        auth_id.to_string(),
-        to_stdout,
-        move |worker| async move {
-            job.start(&worker.upid().to_string())?;
-
-            let worker2 = worker.clone();
-            let sync_job2 = sync_job.clone();
-
-            let worker_future = async move {
-                let pull_params = PullParameters::try_from(&sync_job)?;
-
-                info!("Starting datastore sync job '{job_id}'");
-                if let Some(event_str) = schedule {
-                    info!("task triggered by schedule '{event_str}'");
-                }
-
-                info!(
-                    "sync datastore '{}' from '{}{}'",
-                    sync_job.store,
-                    sync_job
-                        .remote
-                        .as_deref()
-                        .map_or(String::new(), |remote| format!("{remote}/")),
-                    sync_job.remote_store,
-                );
-
-                let pull_stats = pull_store(pull_params).await?;
-
-                if pull_stats.bytes != 0 {
-                    let amount = HumanByte::from(pull_stats.bytes);
-                    let rate = HumanByte::new_binary(
-                        pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
-                    );
-                    info!(
-                        "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
-                        pull_stats.chunk_count,
-                    );
-                } else {
-                    info!("Summary: sync job found no new data to pull");
-                }
-
-                if let Some(removed) = pull_stats.removed {
-                    info!(
-                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
-                        removed.snapshots, removed.groups, removed.namespaces,
-                    );
-                }
-
-                info!("sync job '{}' end", &job_id);
-
-                Ok(())
-            };
-
-            let mut abort_future = worker2
-                .abort_future()
-                .map(|_| Err(format_err!("sync aborted")));
-
-            let result = select! {
-                worker = worker_future.fuse() => worker,
-                abort = abort_future => abort,
-            };
-
-            let status = worker2.create_state(&result);
-
-            match job.finish(status) {
-                Ok(_) => {}
-                Err(err) => {
-                    eprintln!("could not finish job state: {}", err);
-                }
-            }
-
-            if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
-                eprintln!("send sync notification failed: {err}");
-            }
-
-            result
-        },
-    )?;
-
-    Ok(upid_str)
-}
-
 #[api(
     input: {
         properties: {
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 859f5b0f8..6f19a3fbd 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -40,17 +40,17 @@ use pbs_buildcfg::configdir;
 use proxmox_time::CalendarEvent;
 
 use pbs_api_types::{
-    Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
-    VerificationJobConfig,
+    Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig,
+    TapeBackupJobConfig, VerificationJobConfig,
 };
 
 use proxmox_backup::auth_helpers::*;
 use proxmox_backup::server::{self, metric_collection};
 use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
 
-use proxmox_backup::api2::pull::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
 use proxmox_backup::server::do_prune_job;
+use proxmox_backup::server::do_sync_job;
 use proxmox_backup::server::do_verification_job;
 
 fn main() -> Result<(), Error> {
@@ -611,7 +611,14 @@ async fn schedule_datastore_sync_jobs() {
             };
 
             let auth_id = Authid::root_auth_id().clone();
-            if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
+            if let Err(err) = do_sync_job(
+                job,
+                job_config,
+                &auth_id,
+                Some(event_str),
+                SyncDirection::Pull,
+                false,
+            ) {
                 eprintln!("unable to start datastore sync job {job_id} - {err}");
             }
         };
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7c14ed4b8..b9398d21f 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -38,6 +38,7 @@ pub mod metric_collection;
 pub(crate) mod pull;
 pub(crate) mod push;
 pub(crate) mod sync;
+pub use sync::do_sync_job;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
     let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/sync.rs b/src/server/sync.rs
index bd68dda46..d0df7df6e 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -7,15 +7,18 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
+use futures::{future::FutureExt, select};
 use http::StatusCode;
 use serde_json::json;
 use tracing::{info, warn};
 
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
 
 use pbs_api_types::{
     Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
-    MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+    SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
 use pbs_datastore::data_blob::DataBlob;
@@ -24,6 +27,9 @@ use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
 
 use crate::backup::ListAccessibleBackupGroups;
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+use crate::server::push::{push_store, PushParameters};
 
 #[derive(Default)]
 pub(crate) struct RemovedVanishedStats {
@@ -568,3 +574,123 @@ pub(crate) fn check_namespace_depth_limit(
     }
     Ok(())
 }
+
+/// Run a sync job in the given direction
+pub fn do_sync_job(
+    mut job: Job,
+    sync_job: SyncJobConfig,
+    auth_id: &Authid,
+    schedule: Option<String>,
+    sync_direction: SyncDirection,
+    to_stdout: bool,
+) -> Result<String, Error> {
+    let job_id = format!(
+        "{}:{}:{}:{}:{}",
+        sync_job.remote.as_deref().unwrap_or("-"),
+        sync_job.remote_store,
+        sync_job.store,
+        sync_job.ns.clone().unwrap_or_default(),
+        job.jobname(),
+    );
+    let worker_type = job.jobtype().to_string();
+
+    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+        bail!("can't sync to same datastore");
+    }
+
+    let upid_str = WorkerTask::spawn(
+        &worker_type,
+        Some(job_id.clone()),
+        auth_id.to_string(),
+        to_stdout,
+        move |worker| async move {
+            job.start(&worker.upid().to_string())?;
+
+            let worker2 = worker.clone();
+            let sync_job2 = sync_job.clone();
+
+            let worker_future = async move {
+                info!("Starting datastore sync job '{job_id}'");
+                if let Some(event_str) = schedule {
+                    info!("task triggered by schedule '{event_str}'");
+                }
+                let sync_stats = match sync_direction {
+                    SyncDirection::Pull => {
+                        info!(
+                            "sync datastore '{}' from '{}{}'",
+                            sync_job.store,
+                            sync_job
+                                .remote
+                                .as_deref()
+                                .map_or(String::new(), |remote| format!("{remote}/")),
+                            sync_job.remote_store,
+                        );
+                        let pull_params = PullParameters::try_from(&sync_job)?;
+                        pull_store(pull_params).await?
+                    }
+                    SyncDirection::Push => {
+                        info!(
+                            "sync datastore '{}' to '{}{}'",
+                            sync_job.store,
+                            sync_job
+                                .remote
+                                .as_deref()
+                                .map_or(String::new(), |remote| format!("{remote}/")),
+                            sync_job.remote_store,
+                        );
+                        let push_params = PushParameters::try_from(&sync_job)?;
+                        push_store(push_params).await?
+                    }
+                };
+
+                if sync_stats.bytes != 0 {
+                    let amount = HumanByte::from(sync_stats.bytes);
+                    let rate = HumanByte::new_binary(
+                        sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
+                    );
+                    info!(
+                        "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)",
+                        sync_stats.chunk_count,
+                    );
+                } else {
+                    info!("Summary: sync job found no new data to {sync_direction}");
+                }
+
+                if let Some(removed) = sync_stats.removed {
+                    info!(
+                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+                        removed.snapshots, removed.groups, removed.namespaces,
+                    );
+                }
+
+                info!("sync job '{job_id}' end");
+
+                Ok(())
+            };
+
+            let mut abort_future = worker2
+                .abort_future()
+                .map(|_| Err(format_err!("sync aborted")));
+
+            let result = select! {
+                worker = worker_future.fuse() => worker,
+                abort = abort_future => abort,
+            };
+
+            let status = worker2.create_state(&result);
+
+            match job.finish(status) {
+                Ok(_) => {}
+                Err(err) => eprintln!("could not finish job state: {err}"),
+            }
+
+            if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
+                eprintln!("send sync notification failed: {err}");
+            }
+
+            result
+        },
+    )?;
+
+    Ok(upid_str)
+}
-- 
2.39.5




Thread overview: 34+ messages
2024-10-17 13:26 [pbs-devel] [PATCH v4 proxmox 00/31] fix #3044: push datastore to remote target Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 01/31] client: backup writer: refactor backup and upload stats counters Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 02/31] client: backup writer: factor out merged chunk stream upload Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 03/31] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 04/31] config: acl: refactor acl path component check for datastore Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 05/31] config: acl: allow namespace components for remote datastores Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 06/31] api types: implement remote acl path method for sync job Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 07/31] api types: define remote permissions and roles for push sync Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 08/31] fix #3044: server: implement push support for sync operations Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 09/31] api types/config: add `sync-push` config type for push sync jobs Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 10/31] api: push: implement endpoint for sync in push direction Christian Ebner
2024-10-17 13:26 ` Christian Ebner [this message]
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 12/31] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 13/31] api: admin: avoid duplicate name for list sync jobs api method Christian Ebner
2024-10-17 13:26 ` [pbs-devel] [PATCH v4 proxmox 14/31] api: config: Require PRIV_DATASTORE_AUDIT to modify sync job Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 15/31] api: config: factor out sync job owner check Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 16/31] api: config: extend read access check by sync direction Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 17/31] api: config: extend modify " Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 18/31] bin: manager: add datastore push cli command Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 19/31] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 20/31] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 21/31] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 22/31] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 23/31] ui: sync: pass sync-direction to allow removing push jobs Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 24/31] ui: sync view: do not use data model proxy for store Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 25/31] ui: sync view: set sync direction when invoking run task via api Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 26/31] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 27/31] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 28/31] api/api-types: refactor api endpoint version, add api types Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 29/31] datastore: increment deleted group counter when removing group Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 30/31] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-10-17 13:27 ` [pbs-devel] [PATCH v4 proxmox 31/31] server: sync job: use delete stats provided by the api Christian Ebner
2024-10-18  6:55 ` [pbs-devel] [PATCH v4 proxmox 00/31] fix #3044: push datastore to remote target Christian Ebner
2024-10-18  8:44 ` Christian Ebner
