From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Subject: [pbs-devel] [PATCH v6 proxmox-backup 16/29] api: sync: move sync job invocation to server sync module
Date: Thu, 31 Oct 2024 13:15:06 +0100 [thread overview]
Message-ID: <20241031121519.434337-17-c.ebner@proxmox.com> (raw)
In-Reply-To: <20241031121519.434337-1-c.ebner@proxmox.com>
Moves and refactores the sync_job_do function into the common server
sync module so that it can be reused for both sync directions, pull
and push.
Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 5:
- Adapt to PushParameters requiring api call to fetch remote version on
instantiation
src/api2/admin/sync.rs | 19 ++--
src/api2/pull.rs | 108 -----------------------
src/bin/proxmox-backup-proxy.rs | 15 +++-
src/server/mod.rs | 1 +
src/server/sync.rs | 150 +++++++++++++++++++++++++++++++-
5 files changed, 173 insertions(+), 120 deletions(-)
diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 4e2ba0be8..be324564c 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -10,16 +10,16 @@ use proxmox_router::{
use proxmox_schema::api;
use proxmox_sortable_macro::sortable;
-use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA};
+use pbs_api_types::{
+ Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA,
+};
use pbs_config::sync;
use pbs_config::CachedUserInfo;
use crate::{
- api2::{
- config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
- pull::do_sync_job,
- },
+ api2::config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
server::jobstate::{compute_schedule_status, Job, JobState},
+ server::sync::do_sync_job,
};
#[api(
@@ -116,7 +116,14 @@ pub fn run_sync_job(
let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
- let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+ let upid_str = do_sync_job(
+ job,
+ sync_job,
+ &auth_id,
+ None,
+ SyncDirection::Pull,
+ to_stdout,
+ )?;
Ok(upid_str)
}
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..d039dab59 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,10 +13,8 @@ use pbs_api_types::{
TRANSFER_LAST_SCHEMA,
};
use pbs_config::CachedUserInfo;
-use proxmox_human_byte::HumanByte;
use proxmox_rest_server::WorkerTask;
-use crate::server::jobstate::Job;
use crate::server::pull::{pull_store, PullParameters};
pub fn check_pull_privs(
@@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
}
}
-pub fn do_sync_job(
- mut job: Job,
- sync_job: SyncJobConfig,
- auth_id: &Authid,
- schedule: Option<String>,
- to_stdout: bool,
-) -> Result<String, Error> {
- let job_id = format!(
- "{}:{}:{}:{}:{}",
- sync_job.remote.as_deref().unwrap_or("-"),
- sync_job.remote_store,
- sync_job.store,
- sync_job.ns.clone().unwrap_or_default(),
- job.jobname()
- );
- let worker_type = job.jobtype().to_string();
-
- if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
- bail!("can't sync to same datastore");
- }
-
- let upid_str = WorkerTask::spawn(
- &worker_type,
- Some(job_id.clone()),
- auth_id.to_string(),
- to_stdout,
- move |worker| async move {
- job.start(&worker.upid().to_string())?;
-
- let worker2 = worker.clone();
- let sync_job2 = sync_job.clone();
-
- let worker_future = async move {
- let pull_params = PullParameters::try_from(&sync_job)?;
-
- info!("Starting datastore sync job '{job_id}'");
- if let Some(event_str) = schedule {
- info!("task triggered by schedule '{event_str}'");
- }
-
- info!(
- "sync datastore '{}' from '{}{}'",
- sync_job.store,
- sync_job
- .remote
- .as_deref()
- .map_or(String::new(), |remote| format!("{remote}/")),
- sync_job.remote_store,
- );
-
- let pull_stats = pull_store(pull_params).await?;
-
- if pull_stats.bytes != 0 {
- let amount = HumanByte::from(pull_stats.bytes);
- let rate = HumanByte::new_binary(
- pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
- );
- info!(
- "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
- pull_stats.chunk_count,
- );
- } else {
- info!("Summary: sync job found no new data to pull");
- }
-
- if let Some(removed) = pull_stats.removed {
- info!(
- "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
- removed.snapshots, removed.groups, removed.namespaces,
- );
- }
-
- info!("sync job '{}' end", &job_id);
-
- Ok(())
- };
-
- let mut abort_future = worker2
- .abort_future()
- .map(|_| Err(format_err!("sync aborted")));
-
- let result = select! {
- worker = worker_future.fuse() => worker,
- abort = abort_future => abort,
- };
-
- let status = worker2.create_state(&result);
-
- match job.finish(status) {
- Ok(_) => {}
- Err(err) => {
- eprintln!("could not finish job state: {}", err);
- }
- }
-
- if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
- eprintln!("send sync notification failed: {err}");
- }
-
- result
- },
- )?;
-
- Ok(upid_str)
-}
-
#[api(
input: {
properties: {
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 859f5b0f8..6f19a3fbd 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -40,17 +40,17 @@ use pbs_buildcfg::configdir;
use proxmox_time::CalendarEvent;
use pbs_api_types::{
- Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
- VerificationJobConfig,
+ Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig,
+ TapeBackupJobConfig, VerificationJobConfig,
};
use proxmox_backup::auth_helpers::*;
use proxmox_backup::server::{self, metric_collection};
use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME;
-use proxmox_backup::api2::pull::do_sync_job;
use proxmox_backup::api2::tape::backup::do_tape_backup_job;
use proxmox_backup::server::do_prune_job;
+use proxmox_backup::server::do_sync_job;
use proxmox_backup::server::do_verification_job;
fn main() -> Result<(), Error> {
@@ -611,7 +611,14 @@ async fn schedule_datastore_sync_jobs() {
};
let auth_id = Authid::root_auth_id().clone();
- if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
+ if let Err(err) = do_sync_job(
+ job,
+ job_config,
+ &auth_id,
+ Some(event_str),
+ SyncDirection::Pull,
+ false,
+ ) {
eprintln!("unable to start datastore sync job {job_id} - {err}");
}
};
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 7c14ed4b8..b9398d21f 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -38,6 +38,7 @@ pub mod metric_collection;
pub(crate) mod pull;
pub(crate) mod push;
pub(crate) mod sync;
+pub use sync::do_sync_job;
pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/sync.rs b/src/server/sync.rs
index bd68dda46..92a3c0933 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -6,16 +6,19 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
-use anyhow::{bail, format_err, Error};
+use anyhow::{bail, format_err, Context, Error};
+use futures::{future::FutureExt, select};
use http::StatusCode;
use serde_json::json;
use tracing::{info, warn};
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
use proxmox_router::HttpError;
use pbs_api_types::{
Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
- MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+ SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
};
use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
use pbs_datastore::data_blob::DataBlob;
@@ -24,6 +27,9 @@ use pbs_datastore::read_chunk::AsyncReadChunk;
use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
use crate::backup::ListAccessibleBackupGroups;
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+use crate::server::push::{push_store, PushParameters};
#[derive(Default)]
pub(crate) struct RemovedVanishedStats {
@@ -568,3 +574,143 @@ pub(crate) fn check_namespace_depth_limit(
}
Ok(())
}
+
+/// Run a sync job in given direction
+pub fn do_sync_job(
+ mut job: Job,
+ sync_job: SyncJobConfig,
+ auth_id: &Authid,
+ schedule: Option<String>,
+ sync_direction: SyncDirection,
+ to_stdout: bool,
+) -> Result<String, Error> {
+ let job_id = format!(
+ "{}:{}:{}:{}:{}",
+ sync_job.remote.as_deref().unwrap_or("-"),
+ sync_job.remote_store,
+ sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ job.jobname(),
+ );
+ let worker_type = job.jobtype().to_string();
+
+ if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+ bail!("can't sync to same datastore");
+ }
+
+ let upid_str = WorkerTask::spawn(
+ &worker_type,
+ Some(job_id.clone()),
+ auth_id.to_string(),
+ to_stdout,
+ move |worker| async move {
+ job.start(&worker.upid().to_string())?;
+
+ let worker2 = worker.clone();
+ let sync_job2 = sync_job.clone();
+
+ let worker_future = async move {
+ info!("Starting datastore sync job '{job_id}'");
+ if let Some(event_str) = schedule {
+ info!("task triggered by schedule '{event_str}'");
+ }
+ let sync_stats = match sync_direction {
+ SyncDirection::Pull => {
+ info!(
+ "sync datastore '{}' from '{}{}'",
+ sync_job.store,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
+ sync_job.remote_store,
+ );
+ let pull_params = PullParameters::try_from(&sync_job)?;
+ pull_store(pull_params).await?
+ }
+ SyncDirection::Push => {
+ info!(
+ "sync datastore '{}' to '{}{}'",
+ sync_job.store,
+ sync_job
+ .remote
+ .as_deref()
+ .map_or(String::new(), |remote| format!("{remote}/")),
+ sync_job.remote_store,
+ );
+ let push_params = PushParameters::new(
+ &sync_job.store,
+ sync_job.ns.clone().unwrap_or_default(),
+ sync_job
+ .remote
+ .as_deref()
+ .context("missing required remote")?,
+ &sync_job.remote_store,
+ sync_job.remote_ns.clone().unwrap_or_default(),
+ sync_job
+ .owner
+ .as_ref()
+ .unwrap_or_else(|| Authid::root_auth_id())
+ .clone(),
+ sync_job.remove_vanished,
+ sync_job.max_depth,
+ sync_job.group_filter.clone(),
+ sync_job.limit.clone(),
+ sync_job.transfer_last,
+ )
+ .await?;
+ push_store(push_params).await?
+ }
+ };
+
+ if sync_stats.bytes != 0 {
+ let amount = HumanByte::from(sync_stats.bytes);
+ let rate = HumanByte::new_binary(
+ sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
+ );
+ info!(
+ "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)",
+ sync_stats.chunk_count,
+ );
+ } else {
+ info!("Summary: sync job found no new data to {sync_direction}");
+ }
+
+ if let Some(removed) = sync_stats.removed {
+ info!(
+ "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+ removed.snapshots, removed.groups, removed.namespaces,
+ );
+ }
+
+ info!("sync job '{job_id}' end");
+
+ Ok(())
+ };
+
+ let mut abort_future = worker2
+ .abort_future()
+ .map(|_| Err(format_err!("sync aborted")));
+
+ let result = select! {
+ worker = worker_future.fuse() => worker,
+ abort = abort_future => abort,
+ };
+
+ let status = worker2.create_state(&result);
+
+ match job.finish(status) {
+ Ok(_) => {}
+ Err(err) => eprintln!("could not finish job state: {err}"),
+ }
+
+ if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
+ eprintln!("send sync notification failed: {err}");
+ }
+
+ result
+ },
+ )?;
+
+ Ok(upid_str)
+}
--
2.39.5
_______________________________________________
pbs-devel mailing list
pbs-devel@lists.proxmox.com
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
next prev parent reply other threads:[~2024-10-31 12:17 UTC|newest]
Thread overview: 51+ messages / expand[flat|nested] mbox.gz Atom feed top
2024-10-31 12:14 [pbs-devel] [PATCH v6 proxmox-backup 00/29] fix #3044: push datastore to remote target Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 01/29] client: backup writer: refactor backup and upload stats counters Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 02/29] client: backup writer: factor out merged chunk stream upload Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 03/29] client: backup writer: allow push uploading index and chunks Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 04/29] config: acl: refactor acl path component check for datastore Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 05/29] config: acl: allow namespace components for remote datastores Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 06/29] api types: add remote acl path method for `BackupNamespace` Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 07/29] api types: implement remote acl path method for sync job Christian Ebner
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 08/29] api types: define remote permissions and roles for push sync Christian Ebner
2024-11-06 11:58 ` Fabian Grünbichler
2024-10-31 12:14 ` [pbs-devel] [PATCH v6 proxmox-backup 09/29] datastore: move `BackupGroupDeleteStats` to api types Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 10/29] api types: implement api type for `BackupGroupDeleteStats` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 11/29] datastore: increment deleted group counter when removing group Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 12/29] api/api-types: refactor api endpoint version, add api types Christian Ebner
2024-11-06 11:57 ` Fabian Grünbichler
2024-11-20 16:27 ` Thomas Lamprecht
2024-11-20 17:34 ` Christian Ebner
2024-11-21 9:23 ` Thomas Lamprecht
2024-11-21 9:38 ` Fabian Grünbichler
2024-11-21 9:58 ` Christian Ebner
2024-11-21 16:01 ` Thomas Lamprecht
2024-11-21 16:15 ` Christian Ebner
2024-11-22 12:42 ` Thomas Lamprecht
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 13/29] fix #3044: server: implement push support for sync operations Christian Ebner
2024-11-06 11:57 ` Fabian Grünbichler
2024-11-07 9:27 ` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 14/29] api types/config: add `sync-push` config type for push sync jobs Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 15/29] api: push: implement endpoint for sync in push direction Christian Ebner
2024-11-06 15:10 ` Fabian Grünbichler
2024-11-07 9:18 ` Christian Ebner
2024-10-31 12:15 ` Christian Ebner [this message]
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 17/29] api: config: Require PRIV_DATASTORE_AUDIT to modify sync job Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 18/29] api: config: factor out sync job owner check Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 19/29] api: sync jobs: expose optional `sync-direction` parameter Christian Ebner
2024-11-06 15:20 ` Fabian Grünbichler
2024-11-07 9:10 ` Christian Ebner
2024-11-07 9:40 ` Fabian Grünbichler
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 20/29] api: admin: avoid duplicate name for list sync jobs api method Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 21/29] bin: manager: add datastore push cli command Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 22/29] ui: group filter: allow to set namespace for local datastore Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 23/29] ui: sync edit: source group filters based on sync direction Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 24/29] ui: add view with separate grids for pull and push sync jobs Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 25/29] ui: sync job: adapt edit window to be used for pull and push Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 26/29] ui: sync view: set proxy on view instead of model Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 27/29] api: datastore/namespace: return backup groups delete stats on remove Christian Ebner
2024-11-21 9:27 ` Thomas Lamprecht
2024-11-21 10:00 ` Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 28/29] api: version: add 'prune-delete-stats' as supported feature Christian Ebner
2024-10-31 12:15 ` [pbs-devel] [PATCH v6 proxmox-backup 29/29] docs: add section for sync jobs in push direction Christian Ebner
2024-11-21 16:05 ` Maximiliano Sandoval
2024-11-11 15:46 ` [pbs-devel] [PATCH v6 proxmox-backup 00/29] fix #3044: push datastore to remote target Christian Ebner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20241031121519.434337-17-c.ebner@proxmox.com \
--to=c.ebner@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.