From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) by lore.proxmox.com (Postfix) with ESMTPS id 741581FF173 for ; Mon, 11 Nov 2024 16:44:50 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 9FA1510261; Mon, 11 Nov 2024 16:44:45 +0100 (CET) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Mon, 11 Nov 2024 16:43:40 +0100 Message-Id: <20241111154353.482734-19-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.5 In-Reply-To: <20241111154353.482734-1-c.ebner@proxmox.com> References: <20241111154353.482734-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.032 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH v7 proxmox-backup 18/31] api: sync: move sync job invocation to server sync module X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Moves and refactores the sync_job_do function into the common server sync module so that it can be reused for both sync directions, pull and push. Signed-off-by: Christian Ebner --- changes since version 6: - no changes src/api2/admin/sync.rs | 19 ++-- src/api2/pull.rs | 108 ----------------------- src/bin/proxmox-backup-proxy.rs | 15 +++- src/server/mod.rs | 1 + src/server/sync.rs | 150 +++++++++++++++++++++++++++++++- 5 files changed, 173 insertions(+), 120 deletions(-) diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs index 4e2ba0be8..be324564c 100644 --- a/src/api2/admin/sync.rs +++ b/src/api2/admin/sync.rs @@ -10,16 +10,16 @@ use proxmox_router::{ use proxmox_schema::api; use proxmox_sortable_macro::sortable; -use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA}; +use pbs_api_types::{ + Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA, +}; use pbs_config::sync; use pbs_config::CachedUserInfo; use crate::{ - api2::{ - config::sync::{check_sync_job_modify_access, check_sync_job_read_access}, - pull::do_sync_job, - }, + api2::config::sync::{check_sync_job_modify_access, check_sync_job_read_access}, server::jobstate::{compute_schedule_status, Job, JobState}, + server::sync::do_sync_job, }; #[api( @@ -116,7 +116,14 @@ pub fn run_sync_job( let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; + let upid_str = do_sync_job( + job, + sync_job, + &auth_id, + None, + SyncDirection::Pull, + to_stdout, + )?; Ok(upid_str) } diff --git a/src/api2/pull.rs b/src/api2/pull.rs index e733c9839..d039dab59 100644 --- a/src/api2/pull.rs +++ b/src/api2/pull.rs @@ -13,10 +13,8 @@ use pbs_api_types::{ TRANSFER_LAST_SCHEMA, }; use pbs_config::CachedUserInfo; -use proxmox_human_byte::HumanByte; use proxmox_rest_server::WorkerTask; -use crate::server::jobstate::Job; use crate::server::pull::{pull_store, PullParameters}; pub fn check_pull_privs( @@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters { } } -pub fn do_sync_job( - mut job: Job, - sync_job: SyncJobConfig, - auth_id: &Authid, - schedule: Option, - to_stdout: bool, -) -> Result { - let job_id = format!( - "{}:{}:{}:{}:{}", - sync_job.remote.as_deref().unwrap_or("-"), - sync_job.remote_store, - sync_job.store, - sync_job.ns.clone().unwrap_or_default(), - job.jobname() - ); - let worker_type = job.jobtype().to_string(); - - if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store { - bail!("can't sync to same datastore"); - } - - let upid_str = WorkerTask::spawn( - &worker_type, - Some(job_id.clone()), - auth_id.to_string(), - to_stdout, - move |worker| async move { - job.start(&worker.upid().to_string())?; - - let worker2 = worker.clone(); - let sync_job2 = sync_job.clone(); - - let worker_future = async move { - let pull_params = PullParameters::try_from(&sync_job)?; - - info!("Starting datastore sync job '{job_id}'"); - if let Some(event_str) = schedule { - info!("task triggered by schedule '{event_str}'"); - } - - info!( - "sync datastore '{}' from '{}{}'", - sync_job.store, - sync_job - .remote - .as_deref() - .map_or(String::new(), |remote| format!("{remote}/")), - sync_job.remote_store, - ); - - let pull_stats = pull_store(pull_params).await?; - - if pull_stats.bytes != 0 { - let amount = HumanByte::from(pull_stats.bytes); - let rate = HumanByte::new_binary( - pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(), - ); - info!( - "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)", - pull_stats.chunk_count, - ); - } else { - info!("Summary: sync job found no new data to pull"); - } - - if let Some(removed) = pull_stats.removed { - info!( - "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}", - removed.snapshots, removed.groups, removed.namespaces, - ); - } - - info!("sync job '{}' end", &job_id); - - Ok(()) - }; - - let mut abort_future = worker2 - .abort_future() - .map(|_| Err(format_err!("sync aborted"))); - - let result = select! { - worker = worker_future.fuse() => worker, - abort = abort_future => abort, - }; - - let status = worker2.create_state(&result); - - match job.finish(status) { - Ok(_) => {} - Err(err) => { - eprintln!("could not finish job state: {}", err); - } - } - - if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) { - eprintln!("send sync notification failed: {err}"); - } - - result - }, - )?; - - Ok(upid_str) -} - #[api( input: { properties: { diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 859f5b0f8..6f19a3fbd 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -40,17 +40,17 @@ use pbs_buildcfg::configdir; use proxmox_time::CalendarEvent; use pbs_api_types::{ - Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig, - VerificationJobConfig, + Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig, + TapeBackupJobConfig, VerificationJobConfig, }; use proxmox_backup::auth_helpers::*; use proxmox_backup::server::{self, metric_collection}; use proxmox_backup::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME; -use proxmox_backup::api2::pull::do_sync_job; use proxmox_backup::api2::tape::backup::do_tape_backup_job; use proxmox_backup::server::do_prune_job; +use proxmox_backup::server::do_sync_job; use proxmox_backup::server::do_verification_job; fn main() -> Result<(), Error> { @@ -611,7 +611,14 @@ async fn schedule_datastore_sync_jobs() { }; let auth_id = Authid::root_auth_id().clone(); - if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) { + if let Err(err) = do_sync_job( + job, + job_config, + &auth_id, + Some(event_str), + SyncDirection::Pull, + false, + ) { eprintln!("unable to start datastore sync job {job_id} - {err}"); } }; diff --git a/src/server/mod.rs b/src/server/mod.rs index 7c14ed4b8..b9398d21f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -38,6 +38,7 @@ pub mod metric_collection; pub(crate) mod pull; pub(crate) mod push; pub(crate) mod sync; +pub use sync::do_sync_job; pub(crate) async fn reload_proxy_certificate() -> Result<(), Error> { let proxy_pid = proxmox_rest_server::read_pid(pbs_buildcfg::PROXMOX_BACKUP_PROXY_PID_FN)?; diff --git a/src/server/sync.rs b/src/server/sync.rs index 19b244f5a..4ce0777bf 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -6,16 +6,19 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use anyhow::{bail, format_err, Error}; +use anyhow::{bail, format_err, Context, Error}; +use futures::{future::FutureExt, select}; use http::StatusCode; use serde_json::json; use tracing::{info, warn}; +use proxmox_human_byte::HumanByte; +use proxmox_rest_server::WorkerTask; use proxmox_router::HttpError; use pbs_api_types::{ Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem, - MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, + SyncDirection, SyncJobConfig, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, }; use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; use pbs_config::CachedUserInfo; @@ -25,6 +28,9 @@ use pbs_datastore::read_chunk::AsyncReadChunk; use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader}; use crate::backup::ListAccessibleBackupGroups; +use crate::server::jobstate::Job; +use crate::server::pull::{pull_store, PullParameters}; +use crate::server::push::{push_store, PushParameters}; #[derive(Default)] pub(crate) struct RemovedVanishedStats { @@ -593,3 +599,143 @@ pub(crate) fn check_namespace_depth_limit( } Ok(()) } + +/// Run a sync job in given direction +pub fn do_sync_job( + mut job: Job, + sync_job: SyncJobConfig, + auth_id: &Authid, + schedule: Option, + sync_direction: SyncDirection, + to_stdout: bool, +) -> Result { + let job_id = format!( + "{}:{}:{}:{}:{}", + sync_job.remote.as_deref().unwrap_or("-"), + sync_job.remote_store, + sync_job.store, + sync_job.ns.clone().unwrap_or_default(), + job.jobname(), + ); + let worker_type = job.jobtype().to_string(); + + if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store { + bail!("can't sync to same datastore"); + } + + let upid_str = WorkerTask::spawn( + &worker_type, + Some(job_id.clone()), + auth_id.to_string(), + to_stdout, + move |worker| async move { + job.start(&worker.upid().to_string())?; + + let worker2 = worker.clone(); + let sync_job2 = sync_job.clone(); + + let worker_future = async move { + info!("Starting datastore sync job '{job_id}'"); + if let Some(event_str) = schedule { + info!("task triggered by schedule '{event_str}'"); + } + let sync_stats = match sync_direction { + SyncDirection::Pull => { + info!( + "sync datastore '{}' from '{}{}'", + sync_job.store, + sync_job + .remote + .as_deref() + .map_or(String::new(), |remote| format!("{remote}/")), + sync_job.remote_store, + ); + let pull_params = PullParameters::try_from(&sync_job)?; + pull_store(pull_params).await? + } + SyncDirection::Push => { + info!( + "sync datastore '{}' to '{}{}'", + sync_job.store, + sync_job + .remote + .as_deref() + .map_or(String::new(), |remote| format!("{remote}/")), + sync_job.remote_store, + ); + let push_params = PushParameters::new( + &sync_job.store, + sync_job.ns.clone().unwrap_or_default(), + sync_job + .remote + .as_deref() + .context("missing required remote")?, + &sync_job.remote_store, + sync_job.remote_ns.clone().unwrap_or_default(), + sync_job + .owner + .as_ref() + .unwrap_or_else(|| Authid::root_auth_id()) + .clone(), + sync_job.remove_vanished, + sync_job.max_depth, + sync_job.group_filter.clone(), + sync_job.limit.clone(), + sync_job.transfer_last, + ) + .await?; + push_store(push_params).await? + } + }; + + if sync_stats.bytes != 0 { + let amount = HumanByte::from(sync_stats.bytes); + let rate = HumanByte::new_binary( + sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(), + ); + info!( + "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)", + sync_stats.chunk_count, + ); + } else { + info!("Summary: sync job found no new data to {sync_direction}"); + } + + if let Some(removed) = sync_stats.removed { + info!( + "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}", + removed.snapshots, removed.groups, removed.namespaces, + ); + } + + info!("sync job '{job_id}' end"); + + Ok(()) + }; + + let mut abort_future = worker2 + .abort_future() + .map(|_| Err(format_err!("sync aborted"))); + + let result = select! { + worker = worker_future.fuse() => worker, + abort = abort_future => abort, + }; + + let status = worker2.create_state(&result); + + match job.finish(status) { + Ok(_) => {} + Err(err) => eprintln!("could not finish job state: {err}"), + } + + if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) { + eprintln!("send sync notification failed: {err}"); + } + + result + }, + )?; + + Ok(upid_str) +} -- 2.39.5 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel