From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [IPv6:2a01:7e0:0:424::9]) by lore.proxmox.com (Postfix) with ESMTPS id 689581FF2A0 for ; Mon, 15 Jul 2024 12:16:56 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 4D94E269; Mon, 15 Jul 2024 12:17:24 +0200 (CEST) From: Christian Ebner To: pbs-devel@lists.proxmox.com Date: Mon, 15 Jul 2024 12:15:46 +0200 Message-Id: <20240715101602.274244-9-c.ebner@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20240715101602.274244-1-c.ebner@proxmox.com> References: <20240715101602.274244-1-c.ebner@proxmox.com> MIME-Version: 1.0 X-SPAM-LEVEL: Spam detection results: 0 AWL 0.021 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [RFC proxmox-backup 08/24] server: sync: move source to common sync module X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: Proxmox Backup Server development discussion Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: pbs-devel-bounces@lists.proxmox.com Sender: "pbs-devel" Rename the `PullSource` trait to `SyncSource` and move the trait and types implementing it to the common sync module, making them reusable for both sync directions, push and pull. Signed-off-by: Christian Ebner --- src/server/pull.rs | 281 ++------------------------------------------- src/server/sync.rs | 276 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 280 insertions(+), 277 deletions(-) diff --git a/src/server/pull.rs b/src/server/pull.rs index 5efe2d5f7..c6932dcc5 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -7,18 +7,14 @@ use std::sync::{Arc, Mutex}; use std::time::SystemTime; use anyhow::{bail, format_err, Error}; -use http::StatusCode; use proxmox_human_byte::HumanByte; -use proxmox_router::HttpError; -use serde_json::json; -use tracing::{info, warn}; +use tracing::info; use pbs_api_types::{ - print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter, - GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, - PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, GroupFilter, Operation, + RateLimitConfig, Remote, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, }; -use pbs_client::{BackupReader, BackupRepository, HttpClient}; +use pbs_client::BackupRepository; use pbs_config::CachedUserInfo; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; @@ -28,13 +24,13 @@ use pbs_datastore::manifest::{ ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, }; use pbs_datastore::read_chunk::AsyncReadChunk; -use pbs_datastore::{check_backup_owner, DataStore, ListNamespacesRecursive, StoreProgress}; +use pbs_datastore::{check_backup_owner, DataStore, StoreProgress}; use pbs_tools::sha::sha256; use super::sync::{ - LocalSourceReader, RemoteSourceReader, RemovedVanishedStats, SyncSourceReader, SyncStats, + LocalSource, RemoteSource, RemovedVanishedStats, SyncSource, SyncSourceReader, SyncStats, }; -use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups}; +use crate::backup::{check_ns_modification_privs, check_ns_privs}; use crate::tools::parallel_handler::ParallelHandler; pub(crate) struct PullTarget { @@ -42,269 +38,10 @@ pub(crate) struct PullTarget { ns: BackupNamespace, } -pub(crate) struct RemoteSource { - repo: BackupRepository, - ns: BackupNamespace, - client: HttpClient, -} - -pub(crate) struct LocalSource { - store: Arc, - ns: BackupNamespace, -} - -#[async_trait::async_trait] -/// `PullSource` is a trait that provides an interface for pulling data/information from a source. -/// The trait includes methods for listing namespaces, groups, and backup directories, -/// as well as retrieving a reader for reading data from the source -trait PullSource: Send + Sync { - /// Lists namespaces from the source. - async fn list_namespaces( - &self, - max_depth: &mut Option, - ) -> Result, Error>; - - /// Lists groups within a specific namespace from the source. - async fn list_groups( - &self, - namespace: &BackupNamespace, - owner: &Authid, - ) -> Result, Error>; - - /// Lists backup directories for a specific group within a specific namespace from the source. - async fn list_backup_dirs( - &self, - namespace: &BackupNamespace, - group: &BackupGroup, - ) -> Result, Error>; - fn get_ns(&self) -> BackupNamespace; - fn get_store(&self) -> &str; - - /// Returns a reader for reading data from a specific backup directory. - async fn reader( - &self, - ns: &BackupNamespace, - dir: &BackupDir, - ) -> Result, Error>; -} - -#[async_trait::async_trait] -impl PullSource for RemoteSource { - async fn list_namespaces( - &self, - max_depth: &mut Option, - ) -> Result, Error> { - if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) { - return Ok(vec![self.ns.clone()]); - } - - let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store()); - let mut data = json!({}); - if let Some(max_depth) = max_depth { - data["max-depth"] = json!(max_depth); - } - - if !self.ns.is_root() { - data["parent"] = json!(self.ns); - } - self.client.login().await?; - - let mut result = match self.client.get(&path, Some(data)).await { - Ok(res) => res, - Err(err) => match err.downcast_ref::() { - Some(HttpError { code, message }) => match code { - &StatusCode::NOT_FOUND => { - if self.ns.is_root() && max_depth.is_none() { - warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); - warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); - max_depth.replace(0); - } else { - bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") - } - - return Ok(vec![self.ns.clone()]); - } - _ => { - bail!("Querying namespaces failed - HTTP error {code} - {message}"); - } - }, - None => { - bail!("Querying namespaces failed - {err}"); - } - }, - }; - - let list: Vec = - serde_json::from_value::>(result["data"].take())? - .into_iter() - .map(|list_item| list_item.ns) - .collect(); - - Ok(list) - } - - async fn list_groups( - &self, - namespace: &BackupNamespace, - _owner: &Authid, - ) -> Result, Error> { - let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store()); - - let args = if !namespace.is_root() { - Some(json!({ "ns": namespace.clone() })) - } else { - None - }; - - self.client.login().await?; - let mut result = - self.client.get(&path, args).await.map_err(|err| { - format_err!("Failed to retrieve backup groups from remote - {}", err) - })?; - - Ok( - serde_json::from_value::>(result["data"].take()) - .map_err(Error::from)? - .into_iter() - .map(|item| item.backup) - .collect::>(), - ) - } - - async fn list_backup_dirs( - &self, - namespace: &BackupNamespace, - group: &BackupGroup, - ) -> Result, Error> { - let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store()); - - let mut args = json!({ - "backup-type": group.ty, - "backup-id": group.id, - }); - - if !namespace.is_root() { - args["ns"] = serde_json::to_value(namespace)?; - } - - self.client.login().await?; - - let mut result = self.client.get(&path, Some(args)).await?; - let snapshot_list: Vec = serde_json::from_value(result["data"].take())?; - Ok(snapshot_list - .into_iter() - .filter_map(|item: SnapshotListItem| { - let snapshot = item.backup; - // in-progress backups can't be synced - if item.size.is_none() { - info!("skipping snapshot {snapshot} - in-progress backup"); - return None; - } - - Some(snapshot) - }) - .collect::>()) - } - - fn get_ns(&self) -> BackupNamespace { - self.ns.clone() - } - - fn get_store(&self) -> &str { - self.repo.store() - } - - async fn reader( - &self, - ns: &BackupNamespace, - dir: &BackupDir, - ) -> Result, Error> { - let backup_reader = - BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?; - Ok(Arc::new(RemoteSourceReader { - backup_reader, - dir: dir.clone(), - })) - } -} - -#[async_trait::async_trait] -impl PullSource for LocalSource { - async fn list_namespaces( - &self, - max_depth: &mut Option, - ) -> Result, Error> { - ListNamespacesRecursive::new_max_depth( - self.store.clone(), - self.ns.clone(), - max_depth.unwrap_or(MAX_NAMESPACE_DEPTH), - )? - .collect() - } - - async fn list_groups( - &self, - namespace: &BackupNamespace, - owner: &Authid, - ) -> Result, Error> { - Ok(ListAccessibleBackupGroups::new_with_privs( - &self.store, - namespace.clone(), - 0, - Some(PRIV_DATASTORE_READ), - Some(PRIV_DATASTORE_BACKUP), - Some(owner), - )? - .filter_map(Result::ok) - .map(|backup_group| backup_group.group().clone()) - .collect::>()) - } - - async fn list_backup_dirs( - &self, - namespace: &BackupNamespace, - group: &BackupGroup, - ) -> Result, Error> { - Ok(self - .store - .backup_group(namespace.clone(), group.clone()) - .iter_snapshots()? - .filter_map(Result::ok) - .map(|snapshot| snapshot.dir().to_owned()) - .collect::>()) - } - - fn get_ns(&self) -> BackupNamespace { - self.ns.clone() - } - - fn get_store(&self) -> &str { - self.store.name() - } - - async fn reader( - &self, - ns: &BackupNamespace, - dir: &BackupDir, - ) -> Result, Error> { - let dir = self.store.backup_dir(ns.clone(), dir.clone())?; - let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared( - &dir.full_path(), - "snapshot", - "locked by another operation", - )?; - Ok(Arc::new(LocalSourceReader { - _dir_lock: Arc::new(Mutex::new(dir_lock)), - path: dir.full_path(), - datastore: dir.datastore().clone(), - })) - } -} - /// Parameters for a pull operation. pub(crate) struct PullParameters { /// Where data is pulled from - source: Arc, + source: Arc, /// Where data should be pulled into target: PullTarget, /// Owner of synced groups (needs to match local owner of pre-existing groups) @@ -341,7 +78,7 @@ impl PullParameters { }; let remove_vanished = remove_vanished.unwrap_or(false); - let source: Arc = if let Some(remote) = remote { + let source: Arc = if let Some(remote) = remote { let (remote_config, _digest) = pbs_config::remote::config()?; let remote: Remote = remote_config.lookup("remote", remote)?; diff --git a/src/server/sync.rs b/src/server/sync.rs index 323bc1a27..f8a1e133d 100644 --- a/src/server/sync.rs +++ b/src/server/sync.rs @@ -6,18 +6,24 @@ use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use anyhow::{bail, Error}; +use anyhow::{bail, format_err, Error}; use http::StatusCode; -use tracing::info; +use serde_json::json; +use tracing::{info, warn}; use proxmox_router::HttpError; -use pbs_api_types::{BackupDir, CryptMode}; -use pbs_client::{BackupReader, RemoteChunkReader}; +use pbs_api_types::{ + Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupListItem, SnapshotListItem, + MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_READ, +}; +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::manifest::CLIENT_LOG_BLOB_NAME; use pbs_datastore::read_chunk::AsyncReadChunk; -use pbs_datastore::{DataStore, LocalChunkReader}; +use pbs_datastore::{DataStore, ListNamespacesRecursive, LocalChunkReader}; + +use crate::backup::ListAccessibleBackupGroups; #[derive(Default)] pub(crate) struct RemovedVanishedStats { @@ -201,3 +207,263 @@ impl SyncSourceReader for LocalSourceReader { self.datastore.name() == target_store_name } } + +#[async_trait::async_trait] +/// `SyncSource` is a trait that provides an interface for synchronizing data/information from a +/// source. +/// The trait includes methods for listing namespaces, groups, and backup directories, +/// as well as retrieving a reader for reading data from the source. +pub(crate) trait SyncSource: Send + Sync { + /// Lists namespaces from the source. + async fn list_namespaces( + &self, + max_depth: &mut Option, + ) -> Result, Error>; + + /// Lists groups within a specific namespace from the source. + async fn list_groups( + &self, + namespace: &BackupNamespace, + owner: &Authid, + ) -> Result, Error>; + + /// Lists backup directories for a specific group within a specific namespace from the source. + async fn list_backup_dirs( + &self, + namespace: &BackupNamespace, + group: &BackupGroup, + ) -> Result, Error>; + fn get_ns(&self) -> BackupNamespace; + fn get_store(&self) -> &str; + + /// Returns a reader for reading data from a specific backup directory. + async fn reader( + &self, + ns: &BackupNamespace, + dir: &BackupDir, + ) -> Result, Error>; +} + +pub(crate) struct RemoteSource { + pub(crate) repo: BackupRepository, + pub(crate) ns: BackupNamespace, + pub(crate) client: HttpClient, +} + +pub(crate) struct LocalSource { + pub(crate) store: Arc, + pub(crate) ns: BackupNamespace, +} + +#[async_trait::async_trait] +impl SyncSource for RemoteSource { + async fn list_namespaces( + &self, + max_depth: &mut Option, + ) -> Result, Error> { + if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) { + return Ok(vec![self.ns.clone()]); + } + + let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store()); + let mut data = json!({}); + if let Some(max_depth) = max_depth { + data["max-depth"] = json!(max_depth); + } + + if !self.ns.is_root() { + data["parent"] = json!(self.ns); + } + self.client.login().await?; + + let mut result = match self.client.get(&path, Some(data)).await { + Ok(res) => res, + Err(err) => match err.downcast_ref::() { + Some(HttpError { code, message }) => match code { + &StatusCode::NOT_FOUND => { + if self.ns.is_root() && max_depth.is_none() { + warn!("Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); + warn!("Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); + max_depth.replace(0); + } else { + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") + } + + return Ok(vec![self.ns.clone()]); + } + _ => { + bail!("Querying namespaces failed - HTTP error {code} - {message}"); + } + }, + None => { + bail!("Querying namespaces failed - {err}"); + } + }, + }; + + let list: Vec = + serde_json::from_value::>(result["data"].take())? + .into_iter() + .map(|list_item| list_item.ns) + .collect(); + + Ok(list) + } + + async fn list_groups( + &self, + namespace: &BackupNamespace, + _owner: &Authid, + ) -> Result, Error> { + let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store()); + + let args = if !namespace.is_root() { + Some(json!({ "ns": namespace.clone() })) + } else { + None + }; + + self.client.login().await?; + let mut result = + self.client.get(&path, args).await.map_err(|err| { + format_err!("Failed to retrieve backup groups from remote - {}", err) + })?; + + Ok( + serde_json::from_value::>(result["data"].take()) + .map_err(Error::from)? + .into_iter() + .map(|item| item.backup) + .collect::>(), + ) + } + + async fn list_backup_dirs( + &self, + namespace: &BackupNamespace, + group: &BackupGroup, + ) -> Result, Error> { + let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store()); + + let mut args = json!({ + "backup-type": group.ty, + "backup-id": group.id, + }); + + if !namespace.is_root() { + args["ns"] = serde_json::to_value(namespace)?; + } + + self.client.login().await?; + + let mut result = self.client.get(&path, Some(args)).await?; + let snapshot_list: Vec = serde_json::from_value(result["data"].take())?; + Ok(snapshot_list + .into_iter() + .filter_map(|item: SnapshotListItem| { + let snapshot = item.backup; + // in-progress backups can't be synced + if item.size.is_none() { + info!("skipping snapshot {snapshot} - in-progress backup"); + return None; + } + + Some(snapshot) + }) + .collect::>()) + } + + fn get_ns(&self) -> BackupNamespace { + self.ns.clone() + } + + fn get_store(&self) -> &str { + self.repo.store() + } + + async fn reader( + &self, + ns: &BackupNamespace, + dir: &BackupDir, + ) -> Result, Error> { + let backup_reader = + BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?; + Ok(Arc::new(RemoteSourceReader { + backup_reader, + dir: dir.clone(), + })) + } +} + +#[async_trait::async_trait] +impl SyncSource for LocalSource { + async fn list_namespaces( + &self, + max_depth: &mut Option, + ) -> Result, Error> { + ListNamespacesRecursive::new_max_depth( + self.store.clone(), + self.ns.clone(), + max_depth.unwrap_or(MAX_NAMESPACE_DEPTH), + )? + .collect() + } + + async fn list_groups( + &self, + namespace: &BackupNamespace, + owner: &Authid, + ) -> Result, Error> { + Ok(ListAccessibleBackupGroups::new_with_privs( + &self.store, + namespace.clone(), + 0, + Some(PRIV_DATASTORE_READ), + Some(PRIV_DATASTORE_BACKUP), + Some(owner), + )? + .filter_map(Result::ok) + .map(|backup_group| backup_group.group().clone()) + .collect::>()) + } + + async fn list_backup_dirs( + &self, + namespace: &BackupNamespace, + group: &BackupGroup, + ) -> Result, Error> { + Ok(self + .store + .backup_group(namespace.clone(), group.clone()) + .iter_snapshots()? + .filter_map(Result::ok) + .map(|snapshot| snapshot.dir().to_owned()) + .collect::>()) + } + + fn get_ns(&self) -> BackupNamespace { + self.ns.clone() + } + + fn get_store(&self) -> &str { + self.store.name() + } + + async fn reader( + &self, + ns: &BackupNamespace, + dir: &BackupDir, + ) -> Result, Error> { + let dir = self.store.backup_dir(ns.clone(), dir.clone())?; + let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared( + &dir.full_path(), + "snapshot", + "locked by another operation", + )?; + Ok(Arc::new(LocalSourceReader { + _dir_lock: Arc::new(Mutex::new(dir_lock)), + path: dir.full_path(), + datastore: dir.datastore().clone(), + })) + } +} -- 2.39.2 _______________________________________________ pbs-devel mailing list pbs-devel@lists.proxmox.com https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel