From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v6 proxmox-backup 16/29] api: sync: move sync job invocation to server sync module
Date: Thu, 31 Oct 2024 13:15:06 +0100 [thread overview]
Message-ID: <20241031121519.434337-17-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241031121519.434337-1-c.ebner@proxmox.com>
Moves and refactors the do_sync_job function into the common server
sync module so that it can be reused for both sync directions, pull
and push.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- Adapt to PushParameters requiring api call to fetch remote version on
instantiation
src/api2/admin/sync.rs | 19 ++--
src/api2/pull.rs | 108 -----------------------
src/bin/proxmox-backup-proxy.rs | 15 +++-
src/server/mod.rs | 1 +
src/server/sync.rs | 150 +++++++++++++++++++++++++++++++-
5 files changed, 173 insertions(+), 120 deletions(-)
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 4e2ba0be8..be324564c 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -10,16 +10,16 @@ use proxmox_router::{
use proxmox_schema::api;
use proxmox_sortable_macro::sortable;
-use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA};
+use pbs_api_types::{
+ Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA,
+};
use pbs_config::sync;
use pbs_config::CachedUserInfo;
use crate::{
- api2::{
- config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
- pull::do_sync_job,
- },
+ api2::config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
server::jobstate::{compute_schedule_status, Job, JobState},
+ server::sync::do_sync_job,
};
#[api(
@@ -116,7 +116,14 @@ pub fn run_sync_job(
let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
- let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+ let upid_str = do_sync_job(
+ job,
+ sync_job,
+ &auth_id,
+ None,
+ SyncDirection::Pull,
+ to_stdout,
+ )?;
Ok(upid_str)
}
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..d039dab59 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,10 +13,8 @@ use pbs_api_types::{
TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
-use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
-use crate::server::jobstate::Job;
use crate::server::pull::{pull_store, PullParameters};
pub fn check_pull_privs(
@@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
}
}
-pub fn do_sync_job(
- mut job: Job,
- sync_job: SyncJobConfig,
- auth_id: &Authid,
- schedule: Option<String>,
- to_stdout: bool,
-) -> Result<String, Error> {
- let job_id = format!(
- "{}:{}:{}:{}:{}",
- sync_job.remote.as_deref().unwrap_or("-"),
- sync_job.remote_store,
- sync_job.store,
- sync_job.ns.clone().unwrap_or_default(),
- job.jobname()
- );
- let worker_type = job.jobtype().to_string();
-
- if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
- bail!("can't sync to same datastore");
- }
-
- let upid_str = WorkerTask::spawn(
- &worker_type,
- Some(job_id.clone()),
- auth_id.to_string(),
- to_stdout,
- move |worker| async move {
- job.start(&worker.upid().to_string())?;
-
- let worker2 = worker.clone();
- let sync_job2 = sync_job.clone();
-
- let worker_future = async move {
- let pull_params = PullParameters::try_from(&sync_job)?;
-
- info!("Starting datastore sync job '{job_id}'");
- if let Some(event_str) = schedule {
- info!("task triggered by schedule '{event_str}'");
- }
-
- info!(
- "sync datastore '{}' from '{}{}'",
- sync_job.store,
- sync_job
- .remote
- .as_deref()
- .map_or(String::new(), |remote| format!("{remote}/")),
- sync_job.remote_store,
- );
-
- let pull_stats = pull_store(pull_params).await?;
-
- if pull_stats.bytes != 0 {
- let amount = HumanByte::from(pull_stats.bytes);
- let rate = HumanByte::new_binary(
- pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
- );
- info!(
- "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
- pull_stats.chunk_count,
- );
- } else {
- info!("Summary: sync job found no new data to pull");
- }
-
- if let Some(removed) = pull_stats.removed {
- info!(
- "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
- removed.snapshots, removed.groups, removed.namespaces,
- );
- }
-
- info!("sync job '{}' end", &job_id);
-
- Ok(())
- };
-
- let mut abort_future = worker2
- .abort_future()
- .map(|_| Err(format_err!("sync aborted")));
-
- let result = select! {
- worker = worker_future.fuse() => worker,
- abort = abort_future => abort,
- };
-
- let status = worker2.create_state(&result);
-
- match job.finish(status) {
- Ok(_) => {}
- Err(err) => {
- eprintln!("could not finish job state: {}", err);
- }
- }
-
- if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
- eprintln!("send sync notification failed: {err}");
- }
-
- result
- },
- )?;
-
- Ok(upid_str)
-}
-
#[api(
input: {
properties: {
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 859f5b0f8..6f19a3fbd 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -40,17 +40,17 @@ use pbs_buildcfg::configdir;
use proxmox_time::CalendarEvent;
use pbs_api_types::{
- Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
- VerificationJobConfig,
+ Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig,
+ TapeBackupJobConfig, VerificationJobConfig,
};
use proxmox_backup::auth_helpers::*;
use proxmox_backup::server::{self, metric_collection};
use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
-use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
use proxmox_backup::server::do_prune_job;
+use proxmox_backup::server::do_sync_job;
use proxmox_backup::server::do_verification_job;
fn main() -> Result<(), Error> {
@@ -611,7 +611,14 @@ async fn schedule_datastore_sync_jobs() {
};
let auth_id = Authid::root_auth_id().clone();
- if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
+ if let Err(err) = do_sync_job(
+ job,
+ job_config,
+ &auth_id,
+ Some(event_str),
+ SyncDirection::Pull,
+ false,
+ ) {
eprintln!("unable to start datastore sync job {job_id} - {err}");
}
};
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7c14ed4b8..b9398d21f 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -38,6 +38,7 @@ pub mod metric_collection;
pub(crate) mod pull;
pub(crate) mod push;
pub(crate) mod sync;
+pub use sync::do_sync_job;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/sync.rs b/src/server/sync.rs
index bd68dda46..92a3c0933 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -6,16 +6,19 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
+use futures::{future::FutureExt, select};
use http::StatusCode;
use serde_json::json;
use tracing::{info, warn};
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use pbs_api_types::{
Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
- MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+ SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_datastore::data_blob::DataBlob;
@@ -24,6 +27,9 @@ use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
use crate::backup::ListAccessibleBackupGroups;
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+use crate::server::push::{push_store, PushParameters};
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
@@ -568,3 +574,143 @@ pub(crate) fn check_namespace_depth_limit(
}
Ok(())
}
+
+/// Run a sync job in given direction
+pub fn do_sync_job(
+ mut job: Job,
+ sync_job: SyncJobConfig,
+ auth_id: &Authid,
+ schedule: Option<String>,
+ sync_direction: SyncDirection,
+ to_stdout: bool,
+) -> Result<String, Error> {
+ let job_id = format!(
+ "{}:{}:{}:{}:{}",
+ sync_job.remote.as_deref().unwrap_or("-"),
+ sync_job.remote_store,
+ sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ job.jobname(),
+ );
+ let worker_type = job.jobtype().to_string();
+
+ if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+ bail!("can't sync to same datastore");
+ }
+
+ let upid_str = WorkerTask::spawn(
+ &worker_type,
+ Some(job_id.clone()),
+ auth_id.to_string(),
+ to_stdout,
+ move |worker| async move {
+ job.start(&worker.upid().to_string())?;
+
+ let worker2 = worker.clone();
+ let sync_job2 = sync_job.clone();
+
+ let worker_future = async move {
+ info!("Starting datastore sync job '{job_id}'");
+ if let Some(event_str) = schedule {
+ info!("task triggered by schedule '{event_str}'");
+ }
+ let sync_stats = match sync_direction {
+ SyncDirection::Pull => {
+ info!(
+ "sync datastore '{}' from '{}{}'",
+ sync_job.store,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
+ sync_job.remote_store,
+ );
+ let pull_params = PullParameters::try_from(&sync_job)?;
+ pull_store(pull_params).await?
+ }
+ SyncDirection::Push => {
+ info!(
+ "sync datastore '{}' to '{}{}'",
+ sync_job.store,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
+ sync_job.remote_store,
+ );
+ let push_params = PushParameters::new(
+ &sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ sync_job
+ .remote
+ .as_deref()
+ .context("missing required remote")?,
+ &sync_job.remote_store,
+ sync_job.remote_ns.clone().unwrap_or_default(),
+ sync_job
+ .owner
+ .as_ref()
+ .unwrap_or_else(|| Authid::root_auth_id())
+ .clone(),
+ sync_job.remove_vanished,
+ sync_job.max_depth,
+ sync_job.group_filter.clone(),
+ sync_job.limit.clone(),
+ sync_job.transfer_last,
+ )
+ .await?;
+ push_store(push_params).await?
+ }
+ };
+
+ if sync_stats.bytes != 0 {
+ let amount = HumanByte::from(sync_stats.bytes);
+ let rate = HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
+ );
+ info!(
+ "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)",
+ sync_stats.chunk_count,
+ );
+ } else {
+ info!("Summary: sync job found no new data to {sync_direction}");
+ }
+
+ if let Some(removed) = sync_stats.removed {
+ info!(
+ "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+ removed.snapshots, removed.groups, removed.namespaces,
+ );
+ }
+
+ info!("sync job '{job_id}' end");
+
+ Ok(())
+ };
+
+ let mut abort_future = worker2
+ .abort_future()
+ .map(|_| Err(format_err!("sync aborted")));
+
+ let result = select! {
+ worker = worker_future.fuse() => worker,
+ abort = abort_future => abort,
+ };
+
+ let status = worker2.create_state(&result);
+
+ match job.finish(status) {
+ Ok(_) => {}
+ Err(err) => eprintln!("could not finish job state: {err}"),
+ }
+
+ if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
+ eprintln!("send sync notification failed: {err}");
+ }
+
+ result
+ },
+ )?;
+
+ Ok(upid_str)
+}
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-10-31 12:17 UTC|newest]
Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-10-31 12:14 [pbs-devel] [PATCH v6 proxmox-backup 00/29] fix #3044: push datastore to remote target Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 01/29] client: backup writer: refactor backup and upload stats counters Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 02/29] client: backup writer: factor out merged chunk stream upload Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 03/29] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 04/29] config: acl: refactor acl path component check for datastore Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 05/29] config: acl: allow namespace components for remote datastores Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 06/29] api types: add remote acl path method for `BackupNamespace` Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 07/29] api types: implement remote acl path method for sync job Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 08/29] api types: define remote permissions and roles for push sync Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 09/29] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 10/29] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 11/29] datastore: increment deleted group counter when removing group Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 12/29] api/api-types: refactor api endpoint version, add api types Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 13/29] fix #3044: server: implement push support for sync operations Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 14/29] api types/config: add `sync-push` config type for push sync jobs Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 15/29] api: push: implement endpoint for sync in push direction Christian Ebner
2024-10-31 12:15 ` Christian Ebner [this message]
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 17/29] api: config: Require PRIV_DATASTORE_AUDIT to modify sync job Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 18/29] api: config: factor out sync job owner check Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 19/29] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 20/29] api: admin: avoid duplicate name for list sync jobs api method Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 21/29] bin: manager: add datastore push cli command Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 22/29] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 23/29] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 24/29] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 25/29] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 26/29] ui: sync view: set proxy on view instead of model Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 27/29] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 28/29] api: version: add 'prune-delete-stats' as supported feature Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 29/29] docs: add section for sync jobs in push direction Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241031121519.434337-17-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox