From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 690B2AF8A for ; Tue, 8 Aug 2023 14:13:59 +0200 (CEST) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 5167497CF for ; Tue, 8 Aug 2023 14:13:59 +0200 (CEST) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Tue, 8 Aug 2023 14:13:56 +0200 (CEST) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 0036543784 for ; Tue, 8 Aug 2023 14:13:56 +0200 (CEST) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Tue, 8 Aug 2023 14:13:43 +0200 Message-Id: <20230808121344.199500-6-h.laimer@proxmox.com> X-Mailer: git-send-email 2.39.2 In-Reply-To: <20230808121344.199500-1-h.laimer@proxmox.com> References: <20230808121344.199500-1-h.laimer@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.067 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% DMARC_MISSING 0.1 Missing DMARC policy KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Tue, 08 Aug 2023 12:13:59 -0000 ... making the pull logic independent from the actual source using two traits. Signed-off-by: Hannes Laimer --- Cargo.toml | 2 + pbs-datastore/src/read_chunk.rs | 2 +- src/api2/config/remote.rs | 14 +- src/api2/pull.rs | 31 +- src/server/pull.rs | 943 +++++++++++++++++++------------- 5 files changed, 570 insertions(+), 422 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4d34f8a1..74cb68e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" } # regular crates anyhow = "1.0" +async-trait = "0.1.56" apt-pkg-native = "0.3.2" base64 = "0.13" bitflags = "1.2.1" @@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] } [dependencies] anyhow.workspace = true +async-trait.workspace = true apt-pkg-native.workspace = true base64.workspace = true bitflags.workspace = true diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs index c04a7431..29ee2d4c 100644 --- a/pbs-datastore/src/read_chunk.rs +++ b/pbs-datastore/src/read_chunk.rs @@ -14,7 +14,7 @@ pub trait ReadChunk { fn read_chunk(&self, digest: &[u8; 32]) -> Result, Error>; } -pub trait AsyncReadChunk: Send { +pub trait AsyncReadChunk: Send + Sync { /// Returns the encoded chunk data fn read_raw_chunk<'a>( &'a self, diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs index 307cf3cd..2511c5d5 100644 --- a/src/api2/config/remote.rs +++ b/src/api2/config/remote.rs @@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option) -> Result<(), Error> Ok(()) } -/// Helper to get client for remote.cfg entry -pub async fn remote_client( +/// Helper to get client for remote.cfg entry without login, just config +pub fn remote_client_config( remote: &Remote, limit: Option, ) -> Result { @@ -320,6 +320,16 @@ pub async fn remote_client( &remote.config.auth_id, options, )?; + + Ok(client) +} + +/// Helper to get client for remote.cfg entry +pub async fn remote_client( + remote: &Remote, + limit: Option, +) -> Result { + let client = remote_client_config(remote, limit)?; let _auth_info = client .login() // make sure we can auth .await diff --git a/src/api2/pull.rs b/src/api2/pull.rs index 664ecce5..e36a5b14 100644 --- a/src/api2/pull.rs +++ b/src/api2/pull.rs @@ -8,7 +8,7 @@ use proxmox_sys::task_log; use pbs_api_types::{ Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, - GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP, + GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, TRANSFER_LAST_SCHEMA, }; @@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters { PullParameters::new( &sync_job.store, sync_job.ns.clone().unwrap_or_default(), - sync_job.remote.as_deref().unwrap_or("local"), + sync_job.remote.as_deref(), &sync_job.remote_store, sync_job.remote_ns.clone().unwrap_or_default(), sync_job @@ -124,7 +124,6 @@ pub fn do_sync_job( let worker_future = async move { let pull_params = PullParameters::try_from(&sync_job)?; - let client = pull_params.client().await?; task_log!(worker, "Starting datastore sync job '{}'", job_id); if let Some(event_str) = schedule { @@ -138,24 +137,7 @@ pub fn do_sync_job( sync_job.remote_store, ); - if sync_job.remote.is_some() { - pull_store(&worker, &client, pull_params).await?; - } else { - if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) { - if target_ns.path().starts_with(source_ns.path()) - && sync_job.store == sync_job.remote_store - && sync_job.max_depth.map_or(true, |sync_depth| { - target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH - }) { - task_log!( - worker, - "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping" - ); - } - } else { - pull_store(&worker, &client, pull_params).await?; - } - } + pull_store(&worker, pull_params).await?; task_log!(worker, "sync job '{}' end", &job_id); @@ -284,7 +266,7 @@ async fn pull( let pull_params = PullParameters::new( &store, ns, - remote.as_deref().unwrap_or("local"), + remote.as_deref(), &remote_store, remote_ns.unwrap_or_default(), auth_id.clone(), @@ -294,7 +276,6 @@ async fn pull( limit, transfer_last, )?; - let client = pull_params.client().await?; // fixme: set to_stdout to false? // FIXME: add namespace to worker id? @@ -312,7 +293,7 @@ async fn pull( remote_store, ); - let pull_future = pull_store(&worker, &client, pull_params); + let pull_future = pull_store(&worker, pull_params); (select! { success = pull_future.fuse() => success, abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, @@ -327,4 +308,4 @@ async fn pull( Ok(upid_str) } -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL); \ No newline at end of file +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL); diff --git a/src/server/pull.rs b/src/server/pull.rs index e55452d1..e1a27a8c 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -1,28 +1,26 @@ //! Sync datastore from remote server use std::collections::{HashMap, HashSet}; -use std::io::{Seek, SeekFrom}; +use std::io::Seek; +use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; use anyhow::{bail, format_err, Error}; use http::StatusCode; -use pbs_config::CachedUserInfo; -use serde_json::json; - +use proxmox_rest_server::WorkerTask; use proxmox_router::HttpError; -use proxmox_sys::task_log; +use proxmox_sys::{task_log, task_warn}; +use serde_json::json; use pbs_api_types::{ - print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem, - Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter, + GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, }; - -use pbs_client::{ - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader, -}; +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; +use pbs_config::CachedUserInfo; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::fixed_index::FixedIndexReader; @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{ archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, }; +use pbs_datastore::read_chunk::AsyncReadChunk; use pbs_datastore::{check_backup_owner, DataStore, StoreProgress}; use pbs_tools::sha::sha256; -use proxmox_rest_server::WorkerTask; use crate::backup::{check_ns_modification_privs, check_ns_privs}; use crate::tools::parallel_handler::ParallelHandler; -/// Parameters for a pull operation. -pub(crate) struct PullParameters { - /// Remote that is pulled from - remote: Remote, - /// Full specification of remote datastore - source: BackupRepository, - /// Local store that is pulled into +struct RemoteReader { + backup_reader: Arc, + dir: BackupDir, +} + +pub(crate) struct PullTarget { store: Arc, - /// Remote namespace - remote_ns: BackupNamespace, - /// Local namespace (anchor) ns: BackupNamespace, +} + +pub(crate) struct RemoteSource { + repo: BackupRepository, + ns: BackupNamespace, + client: HttpClient, +} + +#[async_trait::async_trait] +/// `PullSource` is a trait that provides an interface for pulling data/information from a source. +/// The trait includes methods for listing namespaces, groups, and backup directories, +/// as well as retrieving a reader for reading data from the source +trait PullSource: Send + Sync { + /// Lists namespaces from the source. + async fn list_namespaces( + &self, + max_depth: &mut Option, + worker: &WorkerTask, + ) -> Result, Error>; + + /// Lists groups within a specific namespace from the source. + async fn list_groups( + &self, + namespace: &BackupNamespace, + owner: &Authid, + ) -> Result, Error>; + + /// Lists backup directories for a specific group within a specific namespace from the source. + async fn list_backup_dirs( + &self, + namespace: &BackupNamespace, + group: &BackupGroup, + worker: &WorkerTask, + ) -> Result, Error>; + fn get_ns(&self) -> BackupNamespace; + fn print_store_and_ns(&self) -> String; + + /// Returns a reader for reading data from a specific backup directory. + async fn reader( + &self, + ns: &BackupNamespace, + dir: &BackupDir, + ) -> Result, Error>; +} + +#[async_trait::async_trait] +impl PullSource for RemoteSource { + async fn list_namespaces( + &self, + max_depth: &mut Option, + worker: &WorkerTask, + ) -> Result, Error> { + if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) { + vec![self.ns.clone()]; + } + + let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store()); + let mut data = json!({}); + if let Some(max_depth) = max_depth { + data["max-depth"] = json!(max_depth); + } + + if !self.ns.is_root() { + data["parent"] = json!(self.ns); + } + self.client.login().await?; + + let mut result = match self.client.get(&path, Some(data)).await { + Ok(res) => res, + Err(err) => match err.downcast_ref::() { + Some(HttpError { code, message }) => match code { + &StatusCode::NOT_FOUND => { + if self.ns.is_root() && max_depth.is_none() { + task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); + task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); + max_depth.replace(0); + } else { + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") + } + + return Ok(vec![self.ns.clone()]); + } + _ => { + bail!("Querying namespaces failed - HTTP error {code} - {message}"); + } + }, + None => { + bail!("Querying namespaces failed - {err}"); + } + }, + }; + + let list: Vec = + serde_json::from_value::>(result["data"].take())? + .iter() + .map(|list_item| list_item.ns.clone()) + .collect(); + + Ok(list) + } + + async fn list_groups( + &self, + namespace: &BackupNamespace, + _owner: &Authid, + ) -> Result, Error> { + let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store()); + + let args = if !namespace.is_root() { + Some(json!({ "ns": namespace.clone() })) + } else { + None + }; + + self.client.login().await?; + let mut result = + self.client.get(&path, args).await.map_err(|err| { + format_err!("Failed to retrieve backup groups from remote - {}", err) + })?; + + Ok( + serde_json::from_value::>(result["data"].take()) + .map_err(Error::from)? + .into_iter() + .map(|item| item.backup) + .collect::>(), + ) + } + + async fn list_backup_dirs( + &self, + _namespace: &BackupNamespace, + group: &BackupGroup, + worker: &WorkerTask, + ) -> Result, Error> { + let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store()); + + let mut args = json!({ + "backup-type": group.ty, + "backup-id": group.id, + }); + + if !self.ns.is_root() { + args["ns"] = serde_json::to_value(&self.ns)?; + } + + self.client.login().await?; + + let mut result = self.client.get(&path, Some(args)).await?; + let snapshot_list: Vec = serde_json::from_value(result["data"].take())?; + Ok(snapshot_list + .into_iter() + .filter_map(|item: SnapshotListItem| { + let snapshot = item.backup; + // in-progress backups can't be synced + if item.size.is_none() { + task_log!( + worker, + "skipping snapshot {} - in-progress backup", + snapshot + ); + return None; + } + + Some(snapshot) + }) + .collect::>()) + } + + fn get_ns(&self) -> BackupNamespace { + self.ns.clone() + } + + fn print_store_and_ns(&self) -> String { + print_store_and_ns(self.repo.store(), &self.ns) + } + + async fn reader( + &self, + ns: &BackupNamespace, + dir: &BackupDir, + ) -> Result, Error> { + let backup_reader = + BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?; + Ok(Arc::new(RemoteReader { + backup_reader, + dir: dir.clone(), + })) + } +} + +#[async_trait::async_trait] +/// `PullReader` is a trait that provides an interface for reading data from a source. +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped. +trait PullReader: Send + Sync { + /// Returns a chunk reader with the specified encryption mode. + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc; + + /// Asynchronously loads a file from the source into a local file. + /// `filename` is the name of the file to load from the source. + /// `into` is the path of the local file to load the source file into. + async fn load_file_into( + &self, + filename: &str, + into: &Path, + worker: &WorkerTask, + ) -> Result, Error>; + + /// Tries to download the client log from the source and save it into a local file. + async fn try_download_client_log( + &self, + to_path: &Path, + worker: &WorkerTask, + ) -> Result<(), Error>; + + fn skip_chunk_sync(&self, target_store_name: &str) -> bool; +} + +#[async_trait::async_trait] +impl PullReader for RemoteReader { + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc { + Arc::new(RemoteChunkReader::new( + self.backup_reader.clone(), + None, + crypt_mode, + HashMap::new(), + )) + } + + async fn load_file_into( + &self, + filename: &str, + into: &Path, + worker: &WorkerTask, + ) -> Result, Error> { + let mut tmp_file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .read(true) + .open(into)?; + let download_result = self.backup_reader.download(filename, &mut tmp_file).await; + if let Err(err) = download_result { + match err.downcast_ref::() { + Some(HttpError { code, message }) => match *code { + StatusCode::NOT_FOUND => { + task_log!( + worker, + "skipping snapshot {} - vanished since start of sync", + &self.dir, + ); + return Ok(None); + } + _ => { + bail!("HTTP error {code} - {message}"); + } + }, + None => { + return Err(err); + } + }; + }; + tmp_file.rewind()?; + Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) + } + + async fn try_download_client_log( + &self, + to_path: &Path, + worker: &WorkerTask, + ) -> Result<(), Error> { + let mut tmp_path = to_path.to_owned(); + tmp_path.set_extension("tmp"); + + let tmpfile = std::fs::OpenOptions::new() + .write(true) + .create(true) + .read(true) + .open(&tmp_path)?; + + // Note: be silent if there is no log - only log successful download + if let Ok(()) = self + .backup_reader + .download(CLIENT_LOG_BLOB_NAME, tmpfile) + .await + { + if let Err(err) = std::fs::rename(&tmp_path, to_path) { + bail!("Atomic rename file {:?} failed - {}", to_path, err); + } + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); + } + + Ok(()) + } + + fn skip_chunk_sync(&self, _target_store_name: &str) -> bool { + false + } +} + +/// Parameters for a pull operation. +pub(crate) struct PullParameters { + /// Where data is pulled from + source: Arc, + /// Where data should be pulled into + target: PullTarget, /// Owner of synced groups (needs to match local owner of pre-existing groups) owner: Authid, /// Whether to remove groups which exist locally, but not on the remote end @@ -57,22 +357,16 @@ pub(crate) struct PullParameters { max_depth: Option, /// Filters for reducing the pull scope group_filter: Option>, - /// Rate limits for all transfers from `remote` - limit: RateLimitConfig, /// How many snapshots should be transferred at most (taking the newest N snapshots) transfer_last: Option, } impl PullParameters { /// Creates a new instance of `PullParameters`. - /// - /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a - /// [BackupRepository] with `remote_store`. - #[allow(clippy::too_many_arguments)] pub(crate) fn new( store: &str, ns: BackupNamespace, - remote: &str, + remote: Option<&str>, remote_store: &str, remote_ns: BackupNamespace, owner: Authid, @@ -82,49 +376,56 @@ impl PullParameters { limit: RateLimitConfig, transfer_last: Option, ) -> Result { - let store = DataStore::lookup_datastore(store, Some(Operation::Write))?; - if let Some(max_depth) = max_depth { ns.check_max_depth(max_depth)?; remote_ns.check_max_depth(max_depth)?; - } - - let (remote_config, _digest) = pbs_config::remote::config()?; - let remote: Remote = remote_config.lookup("remote", remote)?; - + }; let remove_vanished = remove_vanished.unwrap_or(false); - let source = BackupRepository::new( - Some(remote.config.auth_id.clone()), - Some(remote.config.host.clone()), - remote.config.port, - remote_store.to_string(), - ); + let source: Arc = if let Some(remote) = remote { + let (remote_config, _digest) = pbs_config::remote::config()?; + let remote: Remote = remote_config.lookup("remote", remote)?; - Ok(Self { - remote, - remote_ns, + let repo = BackupRepository::new( + Some(remote.config.auth_id.clone()), + Some(remote.config.host.clone()), + remote.config.port, + remote_store.to_string(), + ); + let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?; + Arc::new(RemoteSource { + repo, + ns: remote_ns, + client, + }) + } else { + bail!("local sync not implemented yet") + }; + let target = PullTarget { + store: DataStore::lookup_datastore(store, Some(Operation::Write))?, ns, + }; + + Ok(Self { source, - store, + target, owner, remove_vanished, max_depth, group_filter, - limit, transfer_last, }) } - /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from. - pub async fn client(&self) -> Result { - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await + pub(crate) fn get_target_ns(&self) -> Result { + let source_ns = self.source.get_ns(); + source_ns.map_prefix(&source_ns, &self.target.ns) } } async fn pull_index_chunks( worker: &WorkerTask, - chunk_reader: RemoteChunkReader, + chunk_reader: Arc, target: Arc, index: I, downloaded_chunks: Arc>>, @@ -215,26 +516,6 @@ async fn pull_index_chunks( Ok(()) } -async fn download_manifest( - reader: &BackupReader, - filename: &std::path::Path, -) -> Result { - let mut tmp_manifest_file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .read(true) - .open(filename)?; - - reader - .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) - .await?; - - tmp_manifest_file.seek(SeekFrom::Start(0))?; - - Ok(tmp_manifest_file) -} - fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { if size != info.size { bail!( @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err /// Pulls a single file referenced by a manifest. /// /// Pulling an archive consists of the following steps: -/// - Create tmp file for archive -/// - Download archive file into tmp file -/// - Verify tmp file checksum +/// - Load archive file into tmp file +/// -- Load file into tmp file +/// -- Verify tmp file checksum /// - if archive is an index, pull referenced chunks /// - Rename tmp file into real path -async fn pull_single_archive( - worker: &WorkerTask, - reader: &BackupReader, - chunk_reader: &mut RemoteChunkReader, - snapshot: &pbs_datastore::BackupDir, - archive_info: &FileInfo, +async fn pull_single_archive<'a>( + worker: &'a WorkerTask, + reader: Arc, + snapshot: &'a pbs_datastore::BackupDir, + archive_info: &'a FileInfo, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let archive_name = &archive_info.filename; @@ -277,13 +557,11 @@ async fn pull_single_archive( task_log!(worker, "sync archive {}", archive_name); - let mut tmpfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .read(true) - .open(&tmp_path)?; + reader + .load_file_into(archive_name, &tmp_path, worker) + .await?; - reader.download(archive_name, &mut tmpfile).await?; + let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?; match archive_type(archive_name)? { ArchiveType::DynamicIndex => { @@ -293,14 +571,18 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks( - worker, - chunk_reader.clone(), - snapshot.datastore().clone(), - index, - downloaded_chunks, - ) - .await?; + if reader.skip_chunk_sync(snapshot.datastore().name()) { + task_log!(worker, "skipping chunk sync for same datatsore"); + } else { + pull_index_chunks( + worker, + reader.chunk_reader(archive_info.crypt_mode), + snapshot.datastore().clone(), + index, + downloaded_chunks, + ) + .await?; + } } ArchiveType::FixedIndex => { let index = FixedIndexReader::new(tmpfile).map_err(|err| { @@ -309,17 +591,21 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks( - worker, - chunk_reader.clone(), - snapshot.datastore().clone(), - index, - downloaded_chunks, - ) - .await?; + if reader.skip_chunk_sync(snapshot.datastore().name()) { + task_log!(worker, "skipping chunk sync for same datatsore"); + } else { + pull_index_chunks( + worker, + reader.chunk_reader(archive_info.crypt_mode), + snapshot.datastore().clone(), + index, + downloaded_chunks, + ) + .await?; + } } ArchiveType::Blob => { - tmpfile.seek(SeekFrom::Start(0))?; + tmpfile.rewind()?; let (csum, size) = sha256(&mut tmpfile)?; verify_archive(archive_info, &csum, size)?; } @@ -330,33 +616,6 @@ async fn pull_single_archive( Ok(()) } -// Note: The client.log.blob is uploaded after the backup, so it is -// not mentioned in the manifest. -async fn try_client_log_download( - worker: &WorkerTask, - reader: Arc, - path: &std::path::Path, -) -> Result<(), Error> { - let mut tmp_path = path.to_owned(); - tmp_path.set_extension("tmp"); - - let tmpfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .read(true) - .open(&tmp_path)?; - - // Note: be silent if there is no log - only log successful download - if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { - if let Err(err) = std::fs::rename(&tmp_path, path) { - bail!("Atomic rename file {:?} failed - {}", path, err); - } - task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); - } - - Ok(()) -} - /// Actual implementation of pulling a snapshot. /// /// Pulling a snapshot consists of the following steps: @@ -366,10 +625,10 @@ async fn try_client_log_download( /// -- if file already exists, verify contents /// -- if not, pull it from the remote /// - Download log if not already existing -async fn pull_snapshot( - worker: &WorkerTask, - reader: Arc, - snapshot: &pbs_datastore::BackupDir, +async fn pull_snapshot<'a>( + worker: &'a WorkerTask, + reader: Arc, + snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let mut manifest_name = snapshot.full_path(); @@ -380,32 +639,15 @@ async fn pull_snapshot( let mut tmp_manifest_name = manifest_name.clone(); tmp_manifest_name.set_extension("tmp"); - - let download_res = download_manifest(&reader, &tmp_manifest_name).await; - let mut tmp_manifest_file = match download_res { - Ok(manifest_file) => manifest_file, - Err(err) => { - match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - task_log!( - worker, - "skipping snapshot {} - vanished since start of sync", - snapshot.dir(), - ); - return Ok(()); - } - _ => { - bail!("HTTP error {code} - {message}"); - } - }, - None => { - return Err(err); - } - }; - } - }; - let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; + let tmp_manifest_blob; + if let Some(data) = reader + .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker) + .await? + { + tmp_manifest_blob = data; + } else { + return Ok(()); + } if manifest_name.exists() { let manifest_blob = proxmox_lang::try_block!({ @@ -422,8 +664,10 @@ async fn pull_snapshot( if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { if !client_log_name.exists() { - try_client_log_download(worker, reader, &client_log_name).await?; - } + reader + .try_download_client_log(&client_log_name, worker) + .await?; + }; task_log!(worker, "no data changes"); let _ = std::fs::remove_file(&tmp_manifest_name); return Ok(()); // nothing changed @@ -471,17 +715,9 @@ async fn pull_snapshot( } } - let mut chunk_reader = RemoteChunkReader::new( - reader.clone(), - None, - item.chunk_crypt_mode(), - HashMap::new(), - ); - pull_single_archive( worker, - &reader, - &mut chunk_reader, + reader.clone(), snapshot, item, downloaded_chunks.clone(), @@ -494,9 +730,10 @@ async fn pull_snapshot( } if !client_log_name.exists() { - try_client_log_download(worker, reader, &client_log_name).await?; - } - + reader + .try_download_client_log(&client_log_name, worker) + .await?; + }; snapshot .cleanup_unreferenced_files(&manifest) .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?; @@ -506,12 +743,12 @@ async fn pull_snapshot( /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case. /// -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is -/// pointing to the local datastore and target namespace. -async fn pull_snapshot_from( - worker: &WorkerTask, - reader: Arc, - snapshot: &pbs_datastore::BackupDir, +/// The `reader` is configured to read from the source backup directory, while the +/// `snapshot` is pointing to the local datastore and target namespace. +async fn pull_snapshot_from<'a>( + worker: &'a WorkerTask, + reader: Arc, + snapshot: &'a pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let (_path, is_new, _snap_lock) = snapshot @@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo { /// - Sort by snapshot time /// - Get last snapshot timestamp on local datastore /// - Iterate over list of snapshots -/// -- Recreate client/BackupReader /// -- pull snapshot, unless it's not finished yet or older than last local snapshot /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote /// -/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the +/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the /// remote when querying snapshots. This allows us to interact with old remotes that don't have /// namespace support yet. /// @@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo { /// - local group owner is already checked by pull_store async fn pull_group( worker: &WorkerTask, - client: &HttpClient, params: &PullParameters, - group: &pbs_api_types::BackupGroup, - remote_ns: BackupNamespace, + source_namespace: &BackupNamespace, + group: &BackupGroup, progress: &mut StoreProgress, ) -> Result<(), Error> { - task_log!(worker, "sync group {}", group); - - let path = format!( - "api2/json/admin/datastore/{}/snapshots", - params.source.store() - ); - - let mut args = json!({ - "backup-type": group.ty, - "backup-id": group.id, - }); - - if !remote_ns.is_root() { - args["ns"] = serde_json::to_value(&remote_ns)?; - } - - let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?; - - let mut result = client.get(&path, Some(args)).await?; - let mut list: Vec = serde_json::from_value(result["data"].take())?; - - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time)); - - client.login().await?; // make sure auth is complete - - let fingerprint = client.fingerprint(); - - let last_sync = params.store.last_successful_backup(&target_ns, group)?; - let last_sync_time = last_sync.unwrap_or(i64::MIN); - - let mut remote_snapshots = std::collections::HashSet::new(); - - // start with 65536 chunks (up to 256 GiB) - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); - - progress.group_snapshots = list.len() as u64; - let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced); let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast); - let total_amount = list.len(); + let mut raw_list: Vec = params + .source + .list_backup_dirs(source_namespace, group, worker) + .await?; + raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time)); + + let total_amount = raw_list.len(); let cutoff = params .transfer_last .map(|count| total_amount.saturating_sub(count)) .unwrap_or_default(); - for (pos, item) in list.into_iter().enumerate() { - let snapshot = item.backup; - - // in-progress backups can't be synced - if item.size.is_none() { - task_log!( - worker, - "skipping snapshot {} - in-progress backup", - snapshot - ); - continue; - } - - remote_snapshots.insert(snapshot.time); + let target_ns = params.get_target_ns()?; - if last_sync_time > snapshot.time { - already_synced_skip_info.update(snapshot.time); - continue; - } else if already_synced_skip_info.count > 0 { - task_log!(worker, "{}", already_synced_skip_info); - already_synced_skip_info.reset(); - } - - if pos < cutoff && last_sync_time != snapshot.time { - transfer_last_skip_info.update(snapshot.time); - continue; - } else if transfer_last_skip_info.count > 0 { - task_log!(worker, "{}", transfer_last_skip_info); - transfer_last_skip_info.reset(); - } - - // get updated auth_info (new tickets) - let auth_info = client.login().await?; + let mut source_snapshots = HashSet::new(); + let last_sync_time = params + .target + .store + .last_successful_backup(&target_ns, group)? + .unwrap_or(i64::MIN); + + let list: Vec = raw_list + .into_iter() + .enumerate() + .filter(|&(pos, ref dir)| { + source_snapshots.insert(dir.time); + if last_sync_time > dir.time { + already_synced_skip_info.update(dir.time); + return false; + } else if already_synced_skip_info.count > 0 { + task_log!(worker, "{}", already_synced_skip_info); + already_synced_skip_info.reset(); + return true; + } - let options = - HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()) - .rate_limit(params.limit.clone()); + if pos < cutoff && last_sync_time != dir.time { + transfer_last_skip_info.update(dir.time); + return false; + } else if transfer_last_skip_info.count > 0 { + task_log!(worker, "{}", transfer_last_skip_info); + transfer_last_skip_info.reset(); + } + true + }) + .map(|(_, dir)| dir) + .collect(); - let new_client = HttpClient::new( - params.source.host(), - params.source.port(), - params.source.auth_id(), - options, - )?; + // start with 65536 chunks (up to 256 GiB) + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); - let reader = BackupReader::start( - &new_client, - None, - params.source.store(), - &remote_ns, - &snapshot, - true, - ) - .await?; + progress.group_snapshots = list.len() as u64; - let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?; + for (pos, from_snapshot) in list.into_iter().enumerate() { + let to_snapshot = params + .target + .store + .backup_dir(params.target.ns.clone(), from_snapshot.clone())?; - let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await; + let reader = params + .source + .reader(source_namespace, &from_snapshot) + .await?; + let result = + pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await; progress.done_snapshots = pos as u64 + 1; task_log!(worker, "percentage done: {}", progress); @@ -758,11 +956,14 @@ async fn pull_group( } if params.remove_vanished { - let group = params.store.backup_group(target_ns.clone(), group.clone()); + let group = params + .target + .store + .backup_group(params.get_target_ns()?, group.clone()); let local_list = group.list_backups()?; for info in local_list { let snapshot = info.backup_dir; - if remote_snapshots.contains(&snapshot.backup_time()) { + if source_snapshots.contains(&snapshot.backup_time()) { continue; } if snapshot.is_protected() { @@ -774,73 +975,23 @@ async fn pull_group( continue; } task_log!(worker, "delete vanished snapshot {}", snapshot.dir()); - params - .store - .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?; + params.target.store.remove_backup_dir( + ¶ms.get_target_ns()?, + snapshot.as_ref(), + false, + )?; } } Ok(()) } -// will modify params if switching to backwards mode for lack of NS support on remote end -async fn query_namespaces( - worker: &WorkerTask, - client: &HttpClient, - params: &mut PullParameters, -) -> Result, Error> { - let path = format!( - "api2/json/admin/datastore/{}/namespace", - params.source.store() - ); - let mut data = json!({}); - if let Some(max_depth) = params.max_depth { - data["max-depth"] = json!(max_depth); - } - - if !params.remote_ns.is_root() { - data["parent"] = json!(params.remote_ns); - } - - let mut result = match client.get(&path, Some(data)).await { - Ok(res) => res, - Err(err) => match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - if params.remote_ns.is_root() && params.max_depth.is_none() { - task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); - task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); - params.max_depth = Some(0); - } else { - bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") - } - - return Ok(vec![params.remote_ns.clone()]); - } - _ => { - bail!("Querying namespaces failed - HTTP error {code} - {message}"); - } - }, - None => { - bail!("Querying namespaces failed - {err}"); - } - }, - }; - - let mut list: Vec = serde_json::from_value(result["data"].take())?; - - // parents first - list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len())); - - Ok(list.iter().map(|item| item.ns.clone()).collect()) -} - fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result { let mut created = false; - let store_ns_str = print_store_and_ns(params.store.name(), ns); + let store_ns_str = print_store_and_ns(params.target.store.name(), ns); - if !ns.is_root() && !params.store.namespace_path(ns).exists() { - check_ns_modification_privs(params.store.name(), ns, ¶ms.owner) + if !ns.is_root() && !params.target.store.namespace_path(ns).exists() { + check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner) .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?; let name = match ns.components().last() { @@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result< } }; - if let Err(err) = params.store.create_namespace(&ns.parent(), name) { + if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) { bail!("sync into {store_ns_str} failed - namespace creation failed: {err}"); } created = true; } check_ns_privs( - params.store.name(), + params.target.store.name(), ns, ¶ms.owner, PRIV_DATASTORE_BACKUP, @@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result< } fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result { - check_ns_modification_privs(params.store.name(), local_ns, ¶ms.owner) + check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner) .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?; - params.store.remove_namespace_recursive(local_ns, true) + params + .target + .store + .remove_namespace_recursive(local_ns, true) } fn check_and_remove_vanished_ns( @@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns( // clamp like remote does so that we don't list more than we can ever have synced. let max_depth = params .max_depth - .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth()); + .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth()); let mut local_ns_list: Vec = params + .target .store - .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))? + .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))? .filter(|ns| { let user_privs = - user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.store.name())); + user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.target.store.name())); user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0 }) .collect(); @@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns( local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len())); for local_ns in local_ns_list { - if local_ns == params.ns { + if local_ns == params.target.ns { continue; } @@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns( /// - access to sub-NS checked here pub(crate) async fn pull_store( worker: &WorkerTask, - client: &HttpClient, mut params: PullParameters, ) -> Result<(), Error> { // explicit create shared lock to prevent GC on newly created chunks - let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; + let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?; let mut errors = false; let old_max_depth = params.max_depth; - let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) { - vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces! + let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) { + vec![params.source.get_ns()] // backwards compat - don't query remote namespaces! } else { - query_namespaces(worker, client, &mut params).await? + params + .source + .list_namespaces(&mut params.max_depth, worker) + .await? }; + + let ns_layers_to_be_pulled = namespaces + .iter() + .map(BackupNamespace::depth) + .max() + .map_or(0, |v| v - params.source.get_ns().depth()); + let target_depth = params.target.ns.depth(); + + if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH { + bail!( + "Syncing would exceed max allowed namespace depth. ({}+{} > {})", + ns_layers_to_be_pulled, + target_depth, + MAX_NAMESPACE_DEPTH + ); + } + errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode + namespaces.sort_unstable_by_key(|a| a.name_len()); let (mut groups, mut snapshots) = (0, 0); let mut synced_ns = HashSet::with_capacity(namespaces.len()); for namespace in namespaces { - let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace); + let source_store_ns_str = params.source.print_store_and_ns(); - let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?; - let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns); + let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?; + let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns); task_log!(worker, "----"); task_log!( @@ -998,7 +1173,7 @@ pub(crate) async fn pull_store( } } - match pull_ns(worker, client, ¶ms, namespace.clone(), target_ns).await { + match pull_ns(worker, &namespace, &mut params).await { Ok((ns_progress, ns_errors)) => { errors |= ns_errors; @@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store( task_log!( worker, "Encountered errors while syncing namespace {} - {}", - namespace, + &namespace, err, ); } @@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store( /// - owner check for vanished groups done here pub(crate) async fn pull_ns( worker: &WorkerTask, - client: &HttpClient, - params: &PullParameters, - source_ns: BackupNamespace, - target_ns: BackupNamespace, + namespace: &BackupNamespace, + params: &mut PullParameters, ) -> Result<(StoreProgress, bool), Error> { - let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); - - let args = if !source_ns.is_root() { - Some(json!({ - "ns": source_ns, - })) - } else { - None - }; - - let mut result = client - .get(&path, args) - .await - .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; - - let mut list: Vec = serde_json::from_value(result["data"].take())?; + let mut list: Vec = params.source.list_groups(namespace, ¶ms.owner).await?; let total_count = list.len(); list.sort_unstable_by(|a, b| { - let type_order = a.backup.ty.cmp(&b.backup.ty); + let type_order = a.ty.cmp(&b.ty); if type_order == std::cmp::Ordering::Equal { - a.backup.id.cmp(&b.backup.id) + a.id.cmp(&b.id) } else { type_order } }); - let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool { + let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool { filters.iter().any(|filter| group.matches(filter)) }; - // Get groups with target NS set - let list: Vec = list.into_iter().map(|item| item.backup).collect(); - let list = if let Some(ref group_filter) = ¶ms.group_filter { let unfiltered_count = list.len(); - let list: Vec = list + let list: Vec = list .into_iter() .filter(|group| apply_filters(group, group_filter)) .collect(); @@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns( let mut errors = false; - let mut new_groups = std::collections::HashSet::new(); + let mut new_groups = HashSet::new(); for group in list.iter() { new_groups.insert(group.clone()); } let mut progress = StoreProgress::new(list.len() as u64); + let target_ns = params.get_target_ns()?; for (done, group) in list.into_iter().enumerate() { progress.done_groups = done as u64; progress.done_snapshots = 0; @@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns( let (owner, _lock_guard) = match params + .target .store .create_locked_backup_group(&target_ns, &group, ¶ms.owner) { @@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns( &group, err ); - errors = true; // do not stop here, instead continue + errors = true; + // do not stop here, instead continue + task_log!(worker, "create_locked_backup_group failed"); continue; } }; @@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns( owner ); errors = true; // do not stop here, instead continue - } else if let Err(err) = pull_group( - worker, - client, - params, - &group, - source_ns.clone(), - &mut progress, - ) - .await + } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await { task_log!(worker, "sync group {} failed - {}", &group, err,); errors = true; // do not stop here, instead continue @@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns( if params.remove_vanished { let result: Result<(), Error> = proxmox_lang::try_block!({ - for local_group in params.store.iter_backup_groups(target_ns.clone())? { + for local_group in params.target.store.iter_backup_groups(target_ns.clone())? { let local_group = local_group?; let local_group = local_group.group(); if new_groups.contains(local_group) { continue; } - let owner = params.store.get_owner(&target_ns, local_group)?; + let owner = params.target.store.get_owner(&target_ns, local_group)?; if check_backup_owner(&owner, ¶ms.owner).is_err() { continue; } @@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns( } } task_log!(worker, "delete vanished group '{local_group}'",); - match params.store.remove_backup_group(&target_ns, local_group) { + match params + .target + .store + .remove_backup_group(&target_ns, local_group) + { Ok(true) => {} Ok(false) => { task_log!( -- 2.39.2