From: Christian Ebner <c.ebner@proxmox.com>
To: pbs-devel@lists.proxmox.com
Date: Thu, 12 Sep 2024 16:33:07 +0200
Message-Id: <20240912143322.548839-19-c.ebner@proxmox.com>
In-Reply-To: <20240912143322.548839-1-c.ebner@proxmox.com>
References: <20240912143322.548839-1-c.ebner@proxmox.com>
Subject: [pbs-devel] [PATCH v3 proxmox-backup 18/33] api: sync: move sync job invocation to server sync module

Moves and refactors the do_sync_job function into the common server sync
module so that it can be reused for both sync directions, pull and push.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
---
changes since version 2:
- move common code to `server` submodule instead of `api2::sync` as this is
  only indirectly api code.
- Also pass along the job user for additional permission checks via the
  `PushParameters`
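Note (illustrative only, not part of the patch): a minimal sketch of how a
caller now invokes the relocated helper and selects the sync direction,
based on the signature added to src/server/sync.rs below. The variables
`job`, `sync_job_config`, `auth_id` and `event_str` are placeholders assumed
to be set up by the caller, as in the proxy scheduler hunk further down.

    use pbs_api_types::SyncDirection;
    use proxmox_backup::server::do_sync_job;

    let upid_str = do_sync_job(
        job,                 // server::jobstate::Job acquired by the caller
        sync_job_config,     // SyncJobConfig loaded from the sync job config
        &auth_id,
        Some(event_str),     // schedule that triggered the run, if any
        SyncDirection::Pull, // or SyncDirection::Push
        false,               // to_stdout
    )?;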

 src/api2/admin/sync.rs          |  19 +++--
 src/api2/pull.rs                | 108 --------------------------
 src/bin/proxmox-backup-proxy.rs |  15 +++-
 src/server/mod.rs               |   1 +
 src/server/sync.rs              | 132 +++++++++++++++++++++++++++++++-
 5 files changed, 156 insertions(+), 119 deletions(-)

diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs
index 4e2ba0be8..be324564c 100644
--- a/src/api2/admin/sync.rs
+++ b/src/api2/admin/sync.rs
@@ -10,16 +10,16 @@ use proxmox_router::{
 use proxmox_schema::api;
 use proxmox_sortable_macro::sortable;
 
-use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA};
+use pbs_api_types::{
+    Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA,
+};
 use pbs_config::sync;
 use pbs_config::CachedUserInfo;
 
 use crate::{
-    api2::{
-        config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
-        pull::do_sync_job,
-    },
+    api2::config::sync::{check_sync_job_modify_access, check_sync_job_read_access},
     server::jobstate::{compute_schedule_status, Job, JobState},
+    server::sync::do_sync_job,
 };
 
 #[api(
@@ -116,7 +116,14 @@ pub fn run_sync_job(
 
     let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI;
 
-    let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?;
+    let upid_str = do_sync_job(
+        job,
+        sync_job,
+        &auth_id,
+        None,
+        SyncDirection::Pull,
+        to_stdout,
+    )?;
 
     Ok(upid_str)
 }
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index e733c9839..d039dab59 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -13,10 +13,8 @@ use pbs_api_types::{
     TRANSFER_LAST_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
-use proxmox_human_byte::HumanByte;
 use proxmox_rest_server::WorkerTask;
 
-use crate::server::jobstate::Job;
 use crate::server::pull::{pull_store, PullParameters};
 
 pub fn check_pull_privs(
@@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
     }
 }
 
-pub fn do_sync_job(
-    mut job: Job,
-    sync_job: SyncJobConfig,
-    auth_id: &Authid,
-    schedule: Option<String>,
-    to_stdout: bool,
-) -> Result<String, Error> {
-    let job_id = format!(
-        "{}:{}:{}:{}:{}",
-        sync_job.remote.as_deref().unwrap_or("-"),
-        sync_job.remote_store,
-        sync_job.store,
-        sync_job.ns.clone().unwrap_or_default(),
-        job.jobname()
-    );
-    let worker_type = job.jobtype().to_string();
-
-    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
-        bail!("can't sync to same datastore");
-    }
-
-    let upid_str = WorkerTask::spawn(
-        &worker_type,
-        Some(job_id.clone()),
-        auth_id.to_string(),
-        to_stdout,
-        move |worker| async move {
-            job.start(&worker.upid().to_string())?;
-
-            let worker2 = worker.clone();
-            let sync_job2 = sync_job.clone();
-
-            let worker_future = async move {
-                let pull_params = PullParameters::try_from(&sync_job)?;
-
-                info!("Starting datastore sync job '{job_id}'");
-                if let Some(event_str) = schedule {
-                    info!("task triggered by schedule '{event_str}'");
-                }
-
-                info!(
-                    "sync datastore '{}' from '{}{}'",
-                    sync_job.store,
-                    sync_job
-                        .remote
-                        .as_deref()
-                        .map_or(String::new(), |remote| format!("{remote}/")),
-                    sync_job.remote_store,
-                );
-
-                let pull_stats = pull_store(pull_params).await?;
-
-                if pull_stats.bytes != 0 {
-                    let amount = HumanByte::from(pull_stats.bytes);
-                    let rate = HumanByte::new_binary(
-                        pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(),
-                    );
-                    info!(
-                        "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)",
-                        pull_stats.chunk_count,
-                    );
-                } else {
-                    info!("Summary: sync job found no new data to pull");
-                }
-
-                if let Some(removed) = pull_stats.removed {
-                    info!(
-                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
-                        removed.snapshots, removed.groups, removed.namespaces,
-                    );
-                }
-
-                info!("sync job '{}' end", &job_id);
-
-                Ok(())
-            };
-
-            let mut abort_future = worker2
-                .abort_future()
-                .map(|_| Err(format_err!("sync aborted")));
-
-            let result = select! {
-                worker = worker_future.fuse() => worker,
-                abort = abort_future => abort,
-            };
-
-            let status = worker2.create_state(&result);
-
-            match job.finish(status) {
-                Ok(_) => {}
-                Err(err) => {
-                    eprintln!("could not finish job state: {}", err);
-                }
-            }
-
-            if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
-                eprintln!("send sync notification failed: {err}");
-            }
-
-            result
-        },
-    )?;
-
-    Ok(upid_str)
-}
-
 #[api(
     input: {
         properties: {
diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs
index 041f3aff9..4409234b2 100644
--- a/src/bin/proxmox-backup-proxy.rs
+++ b/src/bin/proxmox-backup-proxy.rs
@@ -46,8 +46,8 @@ use pbs_buildcfg::configdir;
 use proxmox_time::CalendarEvent;
 
 use pbs_api_types::{
-    Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig,
-    VerificationJobConfig,
+    Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig,
+    TapeBackupJobConfig, VerificationJobConfig,
 };
 
 use proxmox_backup::auth_helpers::*;
@@ -57,9 +57,9 @@ use proxmox_backup::tools::{
     PROXMOX_BACKUP_TCP_KEEPALIVE_TIME,
 };
 
-use proxmox_backup::api2::pull::do_sync_job;
 use proxmox_backup::api2::tape::backup::do_tape_backup_job;
 use proxmox_backup::server::do_prune_job;
+use proxmox_backup::server::do_sync_job;
 use proxmox_backup::server::do_verification_job;
 
 fn main() -> Result<(), Error> {
@@ -630,7 +630,14 @@ async fn schedule_datastore_sync_jobs() {
         };
 
         let auth_id = Authid::root_auth_id().clone();
-        if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) {
+        if let Err(err) = do_sync_job(
+            job,
+            job_config,
+            &auth_id,
+            Some(event_str),
+            SyncDirection::Pull,
+            false,
+        ) {
             eprintln!("unable to start datastore sync job {job_id} - {err}");
         }
     };
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 882c5cc10..2fd95327c 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -36,6 +36,7 @@ pub mod auth;
 pub(crate) mod pull;
 pub(crate) mod push;
 pub(crate) mod sync;
+pub use sync::do_sync_job;
 
 pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> {
     let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?;
diff --git a/src/server/sync.rs b/src/server/sync.rs
index bd68dda46..e1f1db8e0 100644
--- a/src/server/sync.rs
+++ b/src/server/sync.rs
@@ -7,15 +7,18 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use anyhow::{bail, format_err, Error};
+use futures::{future::FutureExt, select};
 use http::StatusCode;
 use serde_json::json;
 use tracing::{info, warn};
 
+use proxmox_human_byte::HumanByte;
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
 
 use pbs_api_types::{
     Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem,
-    MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
+    SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ,
 };
 use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
 use pbs_datastore::data_blob::DataBlob;
@@ -24,6 +27,9 @@ use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader};
 
 use crate::backup::ListAccessibleBackupGroups;
+use crate::server::jobstate::Job;
+use crate::server::pull::{pull_store, PullParameters};
+use crate::server::push::{push_store, PushParameters};
 
 #[derive(Default)]
 pub(crate) struct RemovedVanishedStats {
@@ -568,3 +574,127 @@ pub(crate) fn check_namespace_depth_limit(
     }
     Ok(())
 }
+
+/// Run a sync job in given direction
+pub fn do_sync_job(
+    mut job: Job,
+    sync_job: SyncJobConfig,
+    auth_id: &Authid,
+    schedule: Option<String>,
+    sync_direction: SyncDirection,
+    to_stdout: bool,
+) -> Result<String, Error> {
+    let job_id = format!(
+        "{}:{}:{}:{}:{}",
+        sync_job.remote.as_deref().unwrap_or("-"),
+        sync_job.remote_store,
+        sync_job.store,
+        sync_job.ns.clone().unwrap_or_default(),
+        job.jobname(),
+    );
+    let worker_type = job.jobtype().to_string();
+    let job_user = auth_id.clone();
+
+    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+        bail!("can't sync to same datastore");
+    }
+
+    let upid_str = WorkerTask::spawn(
+        &worker_type,
+        Some(job_id.clone()),
+        auth_id.to_string(),
+        to_stdout,
+        move |worker| async move {
+            job.start(&worker.upid().to_string())?;
+
+            let worker2 = worker.clone();
+            let sync_job2 = sync_job.clone();
+
+            let worker_future = async move {
+                info!("Starting datastore sync job '{job_id}'");
+                if let Some(event_str) = schedule {
+                    info!("task triggered by schedule '{event_str}'");
+                }
+                let sync_stats = match sync_direction {
+                    SyncDirection::Pull => {
+                        info!(
+                            "sync datastore '{}' from '{}{}'",
+                            sync_job.store,
+                            sync_job
+                                .remote
+                                .as_deref()
+                                .map_or(String::new(), |remote| format!("{remote}/")),
+                            sync_job.remote_store,
+                        );
+                        let pull_params = PullParameters::try_from(&sync_job)?;
+                        pull_store(pull_params).await?
+                    }
+                    SyncDirection::Push => {
+                        info!(
+                            "sync datastore '{}' to '{}{}'",
+                            sync_job.store,
+                            sync_job
+                                .remote
+                                .as_deref()
+                                .map_or(String::new(), |remote| format!("{remote}/")),
+                            sync_job.remote_store,
+                        );
+                        let mut push_params = PushParameters::try_from(&sync_job)?;
+                        // Perform permission checks for remote operations via the user the job is
+                        // executed as. Without setting the job user, permission checks will fail.
+                        push_params.job_user = Some(job_user);
+                        push_store(push_params).await?
+                    }
+                };
+
+                if sync_stats.bytes != 0 {
+                    let amount = HumanByte::from(sync_stats.bytes);
+                    let rate = HumanByte::new_binary(
+                        sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(),
+                    );
+                    info!(
+                        "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)",
+                        sync_stats.chunk_count,
+                    );
+                } else {
+                    info!("Summary: sync job found no new data to {sync_direction}");
+                }
+
+                if let Some(removed) = sync_stats.removed {
+                    info!(
+                        "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}",
+                        removed.snapshots, removed.groups, removed.namespaces,
+                    );
+                }
+
+                info!("sync job '{job_id}' end");
+
+                Ok(())
+            };
+
+            let mut abort_future = worker2
+                .abort_future()
+                .map(|_| Err(format_err!("sync aborted")));
+
+            let result = select! {
+                worker = worker_future.fuse() => worker,
+                abort = abort_future => abort,
+            };
+
+            let status = worker2.create_state(&result);
+
+            match job.finish(status) {
+                Ok(_) => {}
+                Err(err) => eprintln!("could not finish job state: {err}"),
+            }
+
+            if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) {
+                eprintln!("send sync notification failed: {err}");
+            }
+
+            result
+        },
+    )?;
+
+    Ok(upid_str)
+}
-- 
2.39.2