From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: <pbs-devel-bounces@lists.proxmox.com> Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 2AFA71FF15C for <inbox@lore.proxmox.com>; Wed, 7 Aug 2024 12:51:31 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 3B6E44B5C; Wed, 7 Aug 2024 12:51:41 +0200 (CEST) MIME-Version: 1.0 In-Reply-To: <20240801074403.36229-18-c.ebner@proxmox.com> References: <20240801074403.36229-1-c.ebner@proxmox.com> <20240801074403.36229-18-c.ebner@proxmox.com> From: Fabian =?utf-8?q?Gr=C3=BCnbichler?= <f.gruenbichler@proxmox.com> To: Christian Ebner <c.ebner@proxmox.com>, pbs-devel@lists.proxmox.com Date: Wed, 07 Aug 2024 12:51:01 +0200 Message-ID: <172302786180.107519.7339099634957657204@yuna.proxmox.com> User-Agent: alot/0.10 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.048 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment RCVD_IN_VALIDITY_CERTIFIED_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_RPBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. RCVD_IN_VALIDITY_SAFE_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to Validity was blocked. See https://knowledge.validity.com/hc/en-us/articles/20961730681243 for more information. SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. 
See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. [push.rs, proxmox.com, sync.rs, pull.rs, mod.rs, proxmox-backup-proxy.rs] Subject: Re: [pbs-devel] [PATCH v2 proxmox-backup 17/31] api: sync: move sync job invocation to common module X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion <pbs-devel.lists.proxmox.com> List-Unsubscribe: <https://lists.proxmox.com/cgi-bin/mailman/options/pbs-devel>, <mailto:pbs-devel-request@lists.proxmox.com?subject=unsubscribe> List-Archive: <http://lists.proxmox.com/pipermail/pbs-devel/> List-Post: <mailto:pbs-devel@lists.proxmox.com> List-Help: <mailto:pbs-devel-request@lists.proxmox.com?subject=help> List-Subscribe: <https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel>, <mailto:pbs-devel-request@lists.proxmox.com?subject=subscribe> Reply-To: Proxmox Backup Server development discussion <pbs-devel@lists.proxmox.com> Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" <pbs-devel-bounces@lists.proxmox.com> Quoting Christian Ebner (2024-08-01 09:43:49) > Moves and refactored the sync_job_do function into a common sync > module so that it can be reused for both sync directions, pull and > push. > > Signed-off-by: Christian Ebner <c.ebner@proxmox.com> > --- > changes since version 1: > - adapt to new sync direction config type > - refactor log outputs to reduce code and to fix meaning for push > direction > > src/api2/admin/sync.rs | 15 +++- > src/api2/mod.rs | 1 + > src/api2/pull.rs | 108 -------------------------- > src/api2/sync.rs | 131 ++++++++++++++++++++++++++++++++ the do_sync_job is not really API code now, and could move to src/server/sync.rs? 
or at least the part that is executed as a worker task could, with the task spawning left in src/api2/pull.rs and push.rs. That's how the other jobs are handled as well, except for the tape backup jobs ;) > src/bin/proxmox-backup-proxy.rs | 15 +++- > 5 files changed, 155 insertions(+), 115 deletions(-) > create mode 100644 src/api2/sync.rs > > diff --git a/src/api2/admin/sync.rs b/src/api2/admin/sync.rs > index 4e2ba0be8..5bab1e396 100644 > --- a/src/api2/admin/sync.rs > +++ b/src/api2/admin/sync.rs > @@ -10,14 +10,16 @@ use proxmox_router::{ > use proxmox_schema::api; > use proxmox_sortable_macro::sortable; > > -use pbs_api_types::{Authid, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA}; > +use pbs_api_types::{ > + Authid, SyncDirection, SyncJobConfig, SyncJobStatus, DATASTORE_SCHEMA, JOB_ID_SCHEMA, > +}; > use pbs_config::sync; > use pbs_config::CachedUserInfo; > > use crate::{ > api2::{ > config::sync::{check_sync_job_modify_access, check_sync_job_read_access}, > - pull::do_sync_job, > + sync::do_sync_job, > }, > server::jobstate::{compute_schedule_status, Job, JobState}, > }; > @@ -116,7 +118,14 @@ pub fn run_sync_job( > > let to_stdout = rpcenv.env_type() == RpcEnvironmentType::CLI; > > - let upid_str = do_sync_job(job, sync_job, &auth_id, None, to_stdout)?; > + let upid_str = do_sync_job( > + job, > + sync_job, > + &auth_id, > + None, > + SyncDirection::Pull, > + to_stdout, > + )?; > > Ok(upid_str) > } > diff --git a/src/api2/mod.rs b/src/api2/mod.rs > index 03596326b..44e3776a4 100644 > --- a/src/api2/mod.rs > +++ b/src/api2/mod.rs > @@ -15,6 +15,7 @@ pub mod pull; > pub mod push; > pub mod reader; > pub mod status; > +pub mod sync; > pub mod tape; > pub mod types; > pub mod version; > diff --git a/src/api2/pull.rs b/src/api2/pull.rs > index e733c9839..d039dab59 100644 > --- a/src/api2/pull.rs > +++ b/src/api2/pull.rs > @@ -13,10 +13,8 @@ use pbs_api_types::{ > TRANSFER_LAST_SCHEMA, > }; > use pbs_config::CachedUserInfo; > -use 
proxmox_human_byte::HumanByte; > use proxmox_rest_server::WorkerTask; > > -use crate::server::jobstate::Job; > use crate::server::pull::{pull_store, PullParameters}; > > pub fn check_pull_privs( > @@ -93,112 +91,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters { > } > } > > -pub fn do_sync_job( > - mut job: Job, > - sync_job: SyncJobConfig, > - auth_id: &Authid, > - schedule: Option<String>, > - to_stdout: bool, > -) -> Result<String, Error> { > - let job_id = format!( > - "{}:{}:{}:{}:{}", > - sync_job.remote.as_deref().unwrap_or("-"), > - sync_job.remote_store, > - sync_job.store, > - sync_job.ns.clone().unwrap_or_default(), > - job.jobname() > - ); > - let worker_type = job.jobtype().to_string(); > - > - if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store { > - bail!("can't sync to same datastore"); > - } > - > - let upid_str = WorkerTask::spawn( > - &worker_type, > - Some(job_id.clone()), > - auth_id.to_string(), > - to_stdout, > - move |worker| async move { > - job.start(&worker.upid().to_string())?; > - > - let worker2 = worker.clone(); > - let sync_job2 = sync_job.clone(); > - > - let worker_future = async move { > - let pull_params = PullParameters::try_from(&sync_job)?; > - > - info!("Starting datastore sync job '{job_id}'"); > - if let Some(event_str) = schedule { > - info!("task triggered by schedule '{event_str}'"); > - } > - > - info!( > - "sync datastore '{}' from '{}{}'", > - sync_job.store, > - sync_job > - .remote > - .as_deref() > - .map_or(String::new(), |remote| format!("{remote}/")), > - sync_job.remote_store, > - ); > - > - let pull_stats = pull_store(pull_params).await?; > - > - if pull_stats.bytes != 0 { > - let amount = HumanByte::from(pull_stats.bytes); > - let rate = HumanByte::new_binary( > - pull_stats.bytes as f64 / pull_stats.elapsed.as_secs_f64(), > - ); > - info!( > - "Summary: sync job pulled {amount} in {} chunks (average rate: {rate}/s)", > - pull_stats.chunk_count, > - ); > - } else { > - info!("Summary: 
sync job found no new data to pull"); > - } > - > - if let Some(removed) = pull_stats.removed { > - info!( > - "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}", > - removed.snapshots, removed.groups, removed.namespaces, > - ); > - } > - > - info!("sync job '{}' end", &job_id); > - > - Ok(()) > - }; > - > - let mut abort_future = worker2 > - .abort_future() > - .map(|_| Err(format_err!("sync aborted"))); > - > - let result = select! { > - worker = worker_future.fuse() => worker, > - abort = abort_future => abort, > - }; > - > - let status = worker2.create_state(&result); > - > - match job.finish(status) { > - Ok(_) => {} > - Err(err) => { > - eprintln!("could not finish job state: {}", err); > - } > - } > - > - if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) { > - eprintln!("send sync notification failed: {err}"); > - } > - > - result > - }, > - )?; > - > - Ok(upid_str) > -} > - > #[api( > input: { > properties: { > diff --git a/src/api2/sync.rs b/src/api2/sync.rs > new file mode 100644 > index 000000000..8de413556 > --- /dev/null > +++ b/src/api2/sync.rs > @@ -0,0 +1,131 @@ > +//! 
Sync datastore from remote server > +use anyhow::{bail, format_err, Error}; > +use futures::{future::FutureExt, select}; > +use tracing::info; > + > +use pbs_api_types::{Authid, SyncDirection, SyncJobConfig}; > +use proxmox_human_byte::HumanByte; > +use proxmox_rest_server::WorkerTask; > + > +use crate::server::jobstate::Job; > +use crate::server::pull::{pull_store, PullParameters}; > +use crate::server::push::{push_store, PushParameters}; > + > +pub fn do_sync_job( > + mut job: Job, > + sync_job: SyncJobConfig, > + auth_id: &Authid, > + schedule: Option<String>, > + sync_direction: SyncDirection, > + to_stdout: bool, > +) -> Result<String, Error> { > + let job_id = format!( > + "{}:{}:{}:{}:{}", > + sync_job.remote.as_deref().unwrap_or("-"), > + sync_job.remote_store, > + sync_job.store, > + sync_job.ns.clone().unwrap_or_default(), > + job.jobname(), > + ); > + let worker_type = job.jobtype().to_string(); > + > + if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store { > + bail!("can't sync to same datastore"); > + } > + > + let upid_str = WorkerTask::spawn( > + &worker_type, > + Some(job_id.clone()), > + auth_id.to_string(), > + to_stdout, > + move |worker| async move { > + job.start(&worker.upid().to_string())?; > + > + let worker2 = worker.clone(); > + let sync_job2 = sync_job.clone(); > + > + let worker_future = async move { > + info!("Starting datastore sync job '{job_id}'"); > + if let Some(event_str) = schedule { > + info!("task triggered by schedule '{event_str}'"); > + } > + let sync_stats = match sync_direction { > + SyncDirection::Pull => { > + info!( > + "sync datastore '{}' from '{}{}'", > + sync_job.store, > + sync_job > + .remote > + .as_deref() > + .map_or(String::new(), |remote| format!("{remote}/")), > + sync_job.remote_store, > + ); > + let pull_params = PullParameters::try_from(&sync_job)?; > + pull_store(pull_params).await? 
> + } > + SyncDirection::Push => { > + info!( > + "sync datastore '{}' to '{}{}'", > + sync_job.store, > + sync_job > + .remote > + .as_deref() > + .map_or(String::new(), |remote| format!("{remote}/")), > + sync_job.remote_store, > + ); > + let push_params = PushParameters::try_from(&sync_job)?; > + push_store(push_params).await? > + } > + }; > + > + if sync_stats.bytes != 0 { > + let amount = HumanByte::from(sync_stats.bytes); > + let rate = HumanByte::new_binary( > + sync_stats.bytes as f64 / sync_stats.elapsed.as_secs_f64(), > + ); > + info!( > + "Summary: sync job {sync_direction}ed {amount} in {} chunks (average rate: {rate}/s)", > + sync_stats.chunk_count, > + ); > + } else { > + info!("Summary: sync job found no new data to {sync_direction}"); > + } > + > + if let Some(removed) = sync_stats.removed { > + info!( > + "Summary: removed vanished: snapshots: {}, groups: {}, namespaces: {}", > + removed.snapshots, removed.groups, removed.namespaces, > + ); > + } > + > + info!("sync job '{job_id}' end"); > + > + Ok(()) > + }; > + > + let mut abort_future = worker2 > + .abort_future() > + .map(|_| Err(format_err!("sync aborted"))); > + > + let result = select! 
{ > + worker = worker_future.fuse() => worker, > + abort = abort_future => abort, > + }; > + > + let status = worker2.create_state(&result); > + > + match job.finish(status) { > + Ok(_) => {} > + Err(err) => eprintln!("could not finish job state: {err}"), > + } > + > + if let Err(err) = crate::server::send_sync_status(&sync_job2, &result) { > + eprintln!("send sync notification failed: {err}"); > + } > + > + result > + }, > + )?; > + > + Ok(upid_str) > +} > diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs > index 041f3aff9..278adfa43 100644 > --- a/src/bin/proxmox-backup-proxy.rs > +++ b/src/bin/proxmox-backup-proxy.rs > @@ -46,8 +46,8 @@ use pbs_buildcfg::configdir; > use proxmox_time::CalendarEvent; > > use pbs_api_types::{ > - Authid, DataStoreConfig, Operation, PruneJobConfig, SyncJobConfig, TapeBackupJobConfig, > - VerificationJobConfig, > + Authid, DataStoreConfig, Operation, PruneJobConfig, SyncDirection, SyncJobConfig, > + TapeBackupJobConfig, VerificationJobConfig, > }; > > use proxmox_backup::auth_helpers::*; > @@ -57,7 +57,7 @@ use proxmox_backup::tools::{ > PROXMOX_BACKUP_TCP_KEEPALIVE_TIME, > }; > > -use proxmox_backup::api2::pull::do_sync_job; > +use proxmox_backup::api2::sync::do_sync_job; > use proxmox_backup::api2::tape::backup::do_tape_backup_job; > use proxmox_backup::server::do_prune_job; > use proxmox_backup::server::do_verification_job; > @@ -630,7 +630,14 @@ async fn schedule_datastore_sync_jobs() { > }; > > let auth_id = Authid::root_auth_id().clone(); > - if let Err(err) = do_sync_job(job, job_config, &auth_id, Some(event_str), false) { > + if let Err(err) = do_sync_job( > + job, > + job_config, > + &auth_id, > + Some(event_str), > + SyncDirection::Pull, > + false, > + ) { > eprintln!("unable to start datastore sync job {job_id} - {err}"); > } > }; > -- > 2.39.2 > > > > _______________________________________________ > pbs-devel mailing list > pbs-devel@lists.proxmox.com > 
https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel > > _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel