From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id 4FBDB94340 for ; Thu, 23 Feb 2023 13:55:58 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 395D524828 for ; Thu, 23 Feb 2023 13:55:58 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Thu, 23 Feb 2023 13:55:54 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id D903448286 for ; Thu, 23 Feb 2023 13:55:53 +0100 (CET) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Thu, 23 Feb 2023 13:55:40 +0100 Message-Id: <20230223125540.1298442-6-h.laimer@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20230223125540.1298442-1-h.laimer@proxmox.com> References: <20230223125540.1298442-1-h.laimer@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL -0.017 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment PROLO_LEO1 0.1 Meta Catches all Leo drug variations so far SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record URIBL_BLOCKED 0.001 ADMINISTRATOR NOTICE: The query to URIBL was blocked. See http://wiki.apache.org/spamassassin/DnsBlocklists#dnsbl-block for more information. 
[datastore.rs, pull.rs] Subject: [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Thu, 23 Feb 2023 12:55:58 -0000 ... and rewrite pull logic. Signed-off-by: Hannes Laimer --- pbs-api-types/src/datastore.rs | 2 +- pbs-datastore/src/read_chunk.rs | 2 +- src/api2/pull.rs | 13 +- src/server/pull.rs | 1023 +++++++++++++++++++------------ 4 files changed, 648 insertions(+), 392 deletions(-) diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs index 72e8d1ee..9a692b08 100644 --- a/pbs-api-types/src/datastore.rs +++ b/pbs-api-types/src/datastore.rs @@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup { /// Uniquely identify a Backup (relative to data store) /// /// We also call this a backup snaphost. -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] +#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct BackupDir { /// Backup group. 
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs index c04a7431..29ee2d4c 100644 --- a/pbs-datastore/src/read_chunk.rs +++ b/pbs-datastore/src/read_chunk.rs @@ -14,7 +14,7 @@ pub trait ReadChunk { fn read_chunk(&self, digest: &[u8; 32]) -> Result, Error>; } -pub trait AsyncReadChunk: Send { +pub trait AsyncReadChunk: Send + Sync { /// Returns the encoded chunk data fn read_raw_chunk<'a>( &'a self, diff --git a/src/api2/pull.rs b/src/api2/pull.rs index bb8f6fe1..2966190c 100644 --- a/src/api2/pull.rs +++ b/src/api2/pull.rs @@ -121,8 +121,8 @@ pub fn do_sync_job( let sync_job2 = sync_job.clone(); let worker_future = async move { - let pull_params = PullParameters::try_from(&sync_job)?; - let client = pull_params.client().await?; + let mut pull_params = PullParameters::try_from(&sync_job)?; + pull_params.init_source(sync_job.limit).await?; task_log!(worker, "Starting datastore sync job '{}'", job_id); if let Some(event_str) = schedule { @@ -137,7 +137,7 @@ pub fn do_sync_job( ); if sync_job.remote.is_some() { - pull_store(&worker, &client, pull_params).await?; + pull_store(&worker, pull_params).await?; } else { match (sync_job.ns, sync_job.remote_ns) { (Some(target_ns), Some(source_ns)) @@ -280,7 +280,7 @@ async fn pull( delete, )?; - let pull_params = PullParameters::new( + let mut pull_params = PullParameters::new( &store, ns, remote.as_deref(), @@ -290,9 +290,8 @@ async fn pull( remove_vanished, max_depth, group_filter, - limit, )?; - let client = pull_params.client().await?; + pull_params.init_source(limit).await?; // fixme: set to_stdout to false? // FIXME: add namespace to worker id? @@ -310,7 +309,7 @@ async fn pull( remote_store, ); - let pull_future = pull_store(&worker, &client, pull_params); + let pull_future = pull_store(&worker, pull_params); (select! 
{ success = pull_future.fuse() => success, abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, diff --git a/src/server/pull.rs b/src/server/pull.rs index 65eedf2c..d3be39da 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -1,28 +1,26 @@ //! Sync datastore from remote server use std::collections::{HashMap, HashSet}; -use std::io::{Seek, SeekFrom}; +use std::io::{Seek, SeekFrom, Write}; +use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; use anyhow::{bail, format_err, Error}; use http::StatusCode; -use pbs_config::CachedUserInfo; -use serde_json::json; - +use proxmox_rest_server::WorkerTask; use proxmox_router::HttpError; use proxmox_sys::task_log; +use serde_json::json; use pbs_api_types::{ - print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem, + print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, }; - -use pbs_client::{ - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader, -}; +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; +use pbs_config::CachedUserInfo; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; use pbs_datastore::fixed_index::FixedIndexReader; @@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{ archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, }; -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress}; +use pbs_datastore::read_chunk::AsyncReadChunk; +use pbs_datastore::{ + check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress, +}; use pbs_tools::sha::sha256; -use proxmox_rest_server::WorkerTask; -use 
crate::backup::{check_ns_modification_privs, check_ns_privs}; +use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups}; use crate::tools::parallel_handler::ParallelHandler; /// Parameters for a pull operation. pub(crate) struct PullParameters { - /// Remote that is pulled from - remote: Remote, - /// Full specification of remote datastore - source: BackupRepository, - /// Local store that is pulled into - store: Arc, - /// Remote namespace - remote_ns: BackupNamespace, - /// Local namespace (anchor) - ns: BackupNamespace, + /// Where data is pulled from + source: PullSource, + /// Where data should be pulled into + target: PullTarget, /// Owner of synced groups (needs to match local owner of pre-existing groups) owner: Authid, /// Whether to remove groups which exist locally, but not on the remote end @@ -57,70 +51,459 @@ pub(crate) struct PullParameters { max_depth: Option, /// Filters for reducing the pull scope group_filter: Option>, - /// Rate limits for all transfers from `remote` - limit: RateLimitConfig, +} + +pub(crate) enum PullSource { + Remote(RemoteSource), + Local(LocalSource), +} + +pub(crate) struct PullTarget { + store: Arc, + ns: BackupNamespace, +} + +pub(crate) struct LocalSource { + store: Arc, + ns: BackupNamespace, +} + +pub(crate) struct RemoteSource { + remote: Remote, + repo: BackupRepository, + ns: BackupNamespace, + client: Option, + backup_reader: HashMap>, +} + +impl PullSource { + pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> { + match self { + PullSource::Remote(source) => { + source.client.replace( + crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?, + ); + } + PullSource::Local(_) => {} + }; + Ok(()) + } + + async fn list_namespaces( + &self, + max_depth: &mut Option, + worker: &WorkerTask, + ) -> Result, Error> { + match &self { + PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await, + 
PullSource::Local(source) => ListNamespacesRecursive::new_max_depth( + source.store.clone(), + source.ns.clone(), + max_depth.unwrap_or(MAX_NAMESPACE_DEPTH), + )? + .collect(), + } + } + + async fn list_groups( + &self, + namespace: &BackupNamespace, + owner: &Authid, + ) -> Result, Error> { + match &self { + PullSource::Remote(source) => { + let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store()); + + let args = if !namespace.is_root() { + Some(json!({ "ns": namespace.clone() })) + } else { + None + }; + + let client = source.get_client()?; + client.login().await?; + let mut result = client.get(&path, args).await.map_err(|err| { + format_err!("Failed to retrieve backup groups from remote - {}", err) + })?; + + Ok( + serde_json::from_value::>(result["data"].take()) + .map_err(|e| Error::from(e))? + .into_iter() + .map(|item| item.backup) + .collect::>(), + ) + } + PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs( + &source.store, + namespace.clone(), + MAX_NAMESPACE_DEPTH, + None, + None, + Some(owner), + )? 
+ .filter_map(Result::ok) + .map(|backup_group| backup_group.group().clone()) + .collect::>()), + } + } + + async fn list_backup_dirs( + &self, + namespace: &BackupNamespace, + group: &pbs_api_types::BackupGroup, + worker: &WorkerTask, + ) -> Result, Error> { + match &self { + PullSource::Remote(source) => { + let path = format!( + "api2/json/admin/datastore/{}/snapshots", + source.repo.store() + ); + + let mut args = json!({ + "backup-type": group.ty, + "backup-id": group.id, + }); + + if !source.ns.is_root() { + args["ns"] = serde_json::to_value(&source.ns)?; + } + + let client = source.get_client()?; + client.login().await?; + + let mut result = client.get(&path, Some(args)).await?; + let snapshot_list: Vec = + serde_json::from_value(result["data"].take())?; + Ok(snapshot_list + .into_iter() + .filter_map(|item: SnapshotListItem| { + let snapshot = item.backup; + // in-progress backups can't be synced + if item.size.is_none() { + task_log!( + worker, + "skipping snapshot {} - in-progress backup", + snapshot + ); + return None; + } + + Some(snapshot) + }) + .collect::>()) + } + PullSource::Local(source) => Ok(source + .store + .backup_group(namespace.clone(), group.clone()) + .iter_snapshots()? 
+ .filter_map(Result::ok) + .map(|snapshot| snapshot.dir().to_owned()) + .collect::>()), + } + } + + /// Load file from source namespace and BackupDir into file + async fn load_file_into( + &mut self, + namespace: &BackupNamespace, + snapshot: &pbs_api_types::BackupDir, + filename: &str, + into: &PathBuf, + worker: &WorkerTask, + ) -> Result, Error> { + let mut tmp_file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .read(true) + .open(into)?; + match self { + PullSource::Remote(ref mut source) => { + let client = source.get_client()?; + client.login().await?; + + let reader = if let Some(reader) = source.backup_reader.get(snapshot) { + reader.clone() + } else { + let backup_reader = BackupReader::start( + client, + None, + source.repo.store(), + namespace, + snapshot, + true, + ) + .await?; + source + .backup_reader + .insert(snapshot.clone(), backup_reader.clone()); + backup_reader + }; + + let download_result = reader.download(filename, &mut tmp_file).await; + + if let Err(err) = download_result { + match err.downcast_ref::() { + Some(HttpError { code, message }) => match *code { + StatusCode::NOT_FOUND => { + task_log!( + worker, + "skipping snapshot {} - vanished since start of sync", + snapshot, + ); + return Ok(None); + } + _ => { + bail!("HTTP error {code} - {message}"); + } + }, + None => { + return Err(err); + } + }; + }; + } + PullSource::Local(source) => { + let dir = source + .store + .backup_dir(namespace.clone(), snapshot.clone())?; + let mut from_path = dir.full_path(); + from_path.push(filename); + tmp_file.write_all(std::fs::read(from_path)?.as_slice())?; + } + } + + tmp_file.seek(SeekFrom::Start(0))?; + Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) + } + + // Note: The client.log.blob is uploaded after the backup, so it is + // not mentioned in the manifest. 
+ async fn try_download_client_log( + &self, + from_snapshot: &pbs_api_types::BackupDir, + to_path: &std::path::Path, + worker: &WorkerTask, + ) -> Result<(), Error> { + match &self { + PullSource::Remote(source) => { + let reader = source + .backup_reader + .get(from_snapshot) + .ok_or(format_err!("Can't download chunks without a BackupReader"))?; + let mut tmp_path = to_path.to_owned(); + tmp_path.set_extension("tmp"); + + let tmpfile = std::fs::OpenOptions::new() + .write(true) + .create(true) + .read(true) + .open(&tmp_path)?; + + // Note: be silent if there is no log - only log successful download + if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { + if let Err(err) = std::fs::rename(&tmp_path, to_path) { + bail!("Atomic rename file {:?} failed - {}", to_path, err); + } + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); + } + + Ok(()) + } + PullSource::Local(_) => Ok(()), + } + } + + fn get_chunk_reader( + &self, + snapshot: &pbs_api_types::BackupDir, + crypt_mode: CryptMode, + ) -> Result, Error> { + Ok(match &self { + PullSource::Remote(source) => { + if let Some(reader) = source.backup_reader.get(snapshot) { + Arc::new(RemoteChunkReader::new( + reader.clone(), + None, + crypt_mode, + HashMap::new(), + )) + } else { + bail!("No initialized BackupReader!") + } + } + PullSource::Local(source) => Arc::new(LocalChunkReader::new( + source.store.clone(), + None, + crypt_mode, + )), + }) + } + + fn get_ns(&self) -> BackupNamespace { + match &self { + PullSource::Remote(source) => source.ns.clone(), + PullSource::Local(source) => source.ns.clone(), + } + } + + fn print_store_and_ns(&self) -> String { + match &self { + PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns), + PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns), + } + } +} + +impl RemoteSource { + fn get_client(&self) -> Result<&HttpClient, Error> { + if let Some(client) = &self.client { + Ok(client) + } 
else { + bail!("RemoteSource not initialized") + } + } } impl PullParameters { /// Creates a new instance of `PullParameters`. - /// - /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a - /// [BackupRepository] with `remote_store`. - #[allow(clippy::too_many_arguments)] pub(crate) fn new( store: &str, ns: BackupNamespace, - remote: &str, + remote: Option<&str>, remote_store: &str, remote_ns: BackupNamespace, owner: Authid, remove_vanished: Option, max_depth: Option, group_filter: Option>, - limit: RateLimitConfig, ) -> Result { - let store = DataStore::lookup_datastore(store, Some(Operation::Write))?; - if let Some(max_depth) = max_depth { ns.check_max_depth(max_depth)?; remote_ns.check_max_depth(max_depth)?; - } + }; + let remove_vanished = remove_vanished.unwrap_or(false); - let (remote_config, _digest) = pbs_config::remote::config()?; - let remote: Remote = remote_config.lookup("remote", remote)?; + let source: PullSource = if let Some(remote) = remote { + let (remote_config, _digest) = pbs_config::remote::config()?; + let remote: Remote = remote_config.lookup("remote", remote)?; - let remove_vanished = remove_vanished.unwrap_or(false); + let repo = BackupRepository::new( + Some(remote.config.auth_id.clone()), + Some(remote.config.host.clone()), + remote.config.port, + remote_store.to_string(), + ); - let source = BackupRepository::new( - Some(remote.config.auth_id.clone()), - Some(remote.config.host.clone()), - remote.config.port, - remote_store.to_string(), - ); + PullSource::Remote(RemoteSource { + remote, + repo, + ns: remote_ns.clone(), + client: None, + backup_reader: HashMap::new(), + }) + } else { + PullSource::Local(LocalSource { + store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?, + ns: remote_ns, + }) + }; + let target = PullTarget { + store: DataStore::lookup_datastore(store, Some(Operation::Write))?, + ns, + }; Ok(Self { - remote, - remote_ns, - ns, source, - store, + target, owner, 
remove_vanished, max_depth, group_filter, - limit, }) } - /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from. - pub async fn client(&self) -> Result { - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await + pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> { + self.source.init(limit).await + } + + pub(crate) fn skip_chunk_sync(&self) -> bool { + match &self.source { + PullSource::Local(source) => source.store.name() == self.target.store.name(), + PullSource::Remote(_) => false, + } + } + + pub(crate) fn get_target_ns(&self) -> Result { + let source_ns = self.source.get_ns(); + source_ns.map_prefix(&source_ns, &self.target.ns) } } +async fn list_remote_namespaces( + source: &RemoteSource, + max_depth: &mut Option, + worker: &WorkerTask, +) -> Result, Error> { + if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) { + vec![source.ns.clone()]; + } + + let path = format!( + "api2/json/admin/datastore/{}/namespace", + source.repo.store() + ); + let mut data = json!({}); + if let Some(max_depth) = max_depth { + data["max-depth"] = json!(max_depth); + } + + if !source.ns.is_root() { + data["parent"] = json!(source.ns); + } + + let client = source.get_client()?; + client.login().await?; + + let mut result = match client.get(&path, Some(data)).await { + Ok(res) => res, + Err(err) => match err.downcast_ref::() { + Some(HttpError { code, message }) => match code { + &StatusCode::NOT_FOUND => { + if source.ns.is_root() && max_depth.is_none() { + task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); + task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); + max_depth.replace(0); + } else { + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") + } + + return Ok(vec![source.ns.clone()]); + } + _ => { + 
bail!("Querying namespaces failed - HTTP error {code} - {message}"); + } + }, + None => { + bail!("Querying namespaces failed - {err}"); + } + }, + }; + + let list: Vec = + serde_json::from_value::>(result["data"].take())? + .iter() + .map(|list_item| list_item.ns.clone()) + .collect(); + + Ok(list) +} + async fn pull_index_chunks( worker: &WorkerTask, - chunk_reader: RemoteChunkReader, + chunk_reader: Arc, target: Arc, index: I, downloaded_chunks: Arc>>, @@ -211,26 +594,6 @@ async fn pull_index_chunks( Ok(()) } -async fn download_manifest( - reader: &BackupReader, - filename: &std::path::Path, -) -> Result { - let mut tmp_manifest_file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .read(true) - .open(filename)?; - - reader - .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) - .await?; - - tmp_manifest_file.seek(SeekFrom::Start(0))?; - - Ok(tmp_manifest_file) -} - fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { if size != info.size { bail!( @@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err /// Pulls a single file referenced by a manifest. 
/// /// Pulling an archive consists of the following steps: -/// - Create tmp file for archive -/// - Download archive file into tmp file +/// - Load archive file into tmp file /// - Verify tmp file checksum /// - if archive is an index, pull referenced chunks /// - Rename tmp file into real path async fn pull_single_archive( worker: &WorkerTask, - reader: &BackupReader, - chunk_reader: &mut RemoteChunkReader, - snapshot: &pbs_datastore::BackupDir, + params: &mut PullParameters, + from_namespace: &BackupNamespace, + from_snapshot: &pbs_api_types::BackupDir, + to_snapshot: &pbs_datastore::BackupDir, archive_info: &FileInfo, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let archive_name = &archive_info.filename; - let mut path = snapshot.full_path(); + let mut path = to_snapshot.full_path(); path.push(archive_name); let mut tmp_path = path.clone(); @@ -273,13 +636,18 @@ async fn pull_single_archive( task_log!(worker, "sync archive {}", archive_name); - let mut tmpfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .read(true) - .open(&tmp_path)?; + params + .source + .load_file_into( + from_namespace, + from_snapshot, + archive_name, + &tmp_path, + worker, + ) + .await?; - reader.download(archive_name, &mut tmpfile).await?; + let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?; match archive_type(archive_name)? 
{ ArchiveType::DynamicIndex => { @@ -289,14 +657,20 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks( - worker, - chunk_reader.clone(), - snapshot.datastore().clone(), - index, - downloaded_chunks, - ) - .await?; + if params.skip_chunk_sync() { + task_log!(worker, "skipping chunk sync for same datatsore"); + } else { + pull_index_chunks( + worker, + params + .source + .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?, + params.target.store.clone(), + index, + downloaded_chunks, + ) + .await?; + } } ArchiveType::FixedIndex => { let index = FixedIndexReader::new(tmpfile).map_err(|err| { @@ -305,14 +679,20 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks( - worker, - chunk_reader.clone(), - snapshot.datastore().clone(), - index, - downloaded_chunks, - ) - .await?; + if params.skip_chunk_sync() { + task_log!(worker, "skipping chunk sync for same datatsore"); + } else { + pull_index_chunks( + worker, + params + .source + .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?, + params.target.store.clone(), + index, + downloaded_chunks, + ) + .await?; + } } ArchiveType::Blob => { tmpfile.seek(SeekFrom::Start(0))?; @@ -326,33 +706,6 @@ async fn pull_single_archive( Ok(()) } -// Note: The client.log.blob is uploaded after the backup, so it is -// not mentioned in the manifest. 
-async fn try_client_log_download( - worker: &WorkerTask, - reader: Arc, - path: &std::path::Path, -) -> Result<(), Error> { - let mut tmp_path = path.to_owned(); - tmp_path.set_extension("tmp"); - - let tmpfile = std::fs::OpenOptions::new() - .write(true) - .create(true) - .read(true) - .open(&tmp_path)?; - - // Note: be silent if there is no log - only log successful download - if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { - if let Err(err) = std::fs::rename(&tmp_path, path) { - bail!("Atomic rename file {:?} failed - {}", path, err); - } - task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); - } - - Ok(()) -} - /// Actual implementation of pulling a snapshot. /// /// Pulling a snapshot consists of the following steps: @@ -364,44 +717,37 @@ async fn try_client_log_download( /// - Download log if not already existing async fn pull_snapshot( worker: &WorkerTask, - reader: Arc, - snapshot: &pbs_datastore::BackupDir, + params: &mut PullParameters, + namespace: &BackupNamespace, + from_snapshot: &pbs_api_types::BackupDir, + to_snapshot: &pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { - let mut manifest_name = snapshot.full_path(); + let mut manifest_name = to_snapshot.full_path(); manifest_name.push(MANIFEST_BLOB_NAME); - let mut client_log_name = snapshot.full_path(); + let mut client_log_name = to_snapshot.full_path(); client_log_name.push(CLIENT_LOG_BLOB_NAME); let mut tmp_manifest_name = manifest_name.clone(); tmp_manifest_name.set_extension("tmp"); - let download_res = download_manifest(&reader, &tmp_manifest_name).await; - let mut tmp_manifest_file = match download_res { - Ok(manifest_file) => manifest_file, - Err(err) => { - match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - task_log!( - worker, - "skipping snapshot {} - vanished since start of sync", - snapshot.dir(), - ); - return Ok(()); - } - _ => { - bail!("HTTP error {code} 
- {message}"); - } - }, - None => { - return Err(err); - } - }; - } - }; - let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; + let tmp_manifest_blob; + if let Some(data) = params + .source + .load_file_into( + namespace, + from_snapshot, + MANIFEST_BLOB_NAME, + &tmp_manifest_name, + worker, + ) + .await? + { + tmp_manifest_blob = data; + } else { + return Ok(()); + } if manifest_name.exists() { let manifest_blob = proxmox_lang::try_block!({ @@ -418,8 +764,11 @@ async fn pull_snapshot( if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { if !client_log_name.exists() { - try_client_log_download(worker, reader, &client_log_name).await?; - } + params + .source + .try_download_client_log(from_snapshot, &client_log_name, worker) + .await?; + }; task_log!(worker, "no data changes"); let _ = std::fs::remove_file(&tmp_manifest_name); return Ok(()); // nothing changed @@ -429,7 +778,7 @@ async fn pull_snapshot( let manifest = BackupManifest::try_from(tmp_manifest_blob)?; for item in manifest.files() { - let mut path = snapshot.full_path(); + let mut path = to_snapshot.full_path(); path.push(&item.filename); if path.exists() { @@ -467,18 +816,12 @@ async fn pull_snapshot( } } - let mut chunk_reader = RemoteChunkReader::new( - reader.clone(), - None, - item.chunk_crypt_mode(), - HashMap::new(), - ); - pull_single_archive( worker, - &reader, - &mut chunk_reader, - snapshot, + params, + namespace, + from_snapshot, + to_snapshot, item, downloaded_chunks.clone(), ) @@ -490,10 +833,12 @@ async fn pull_snapshot( } if !client_log_name.exists() { - try_client_log_download(worker, reader, &client_log_name).await?; - } - - snapshot + params + .source + .try_download_client_log(from_snapshot, &client_log_name, worker) + .await?; + }; + to_snapshot .cleanup_unreferenced_files(&manifest) .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?; @@ -501,37 +846,53 @@ async fn pull_snapshot( } /// Pulls a `snapshot`, removing newly 
created ones on error, but keeping existing ones in any case. -/// -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is -/// pointing to the local datastore and target namespace. async fn pull_snapshot_from( worker: &WorkerTask, - reader: Arc, - snapshot: &pbs_datastore::BackupDir, + params: &mut PullParameters, + namespace: &BackupNamespace, + from_snapshot: &pbs_api_types::BackupDir, + to_snapshot: &pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { - let (_path, is_new, _snap_lock) = snapshot + let (_path, is_new, _snap_lock) = to_snapshot .datastore() - .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?; + .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?; if is_new { - task_log!(worker, "sync snapshot {}", snapshot.dir()); + task_log!(worker, "sync snapshot {}", to_snapshot.dir()); - if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await { - if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir( - snapshot.backup_ns(), - snapshot.as_ref(), + if let Err(err) = pull_snapshot( + worker, + params, + namespace, + from_snapshot, + to_snapshot, + downloaded_chunks, + ) + .await + { + if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir( + to_snapshot.backup_ns(), + to_snapshot.as_ref(), true, ) { task_log!(worker, "cleanup error - {}", cleanup_err); } return Err(err); } - task_log!(worker, "sync snapshot {} done", snapshot.dir()); + task_log!(worker, "sync snapshot {} done", to_snapshot.dir()); } else { - task_log!(worker, "re-sync snapshot {}", snapshot.dir()); - pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?; - task_log!(worker, "re-sync snapshot {} done", snapshot.dir()); + task_log!(worker, "re-sync snapshot {}", to_snapshot.dir()); + pull_snapshot( + worker, + params, + namespace, + from_snapshot, + to_snapshot, + downloaded_chunks, + ) + .await?; + task_log!(worker, "re-sync 
snapshot {} done", to_snapshot.dir()); } Ok(()) @@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo { /// - Sort by snapshot time /// - Get last snapshot timestamp on local datastore /// - Iterate over list of snapshots -/// -- Recreate client/BackupReader /// -- pull snapshot, unless it's not finished yet or older than last local snapshot /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote /// @@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo { /// - local group owner is already checked by pull_store async fn pull_group( worker: &WorkerTask, - client: &HttpClient, - params: &PullParameters, + params: &mut PullParameters, + source_namespace: &BackupNamespace, group: &pbs_api_types::BackupGroup, - remote_ns: BackupNamespace, progress: &mut StoreProgress, ) -> Result<(), Error> { - let path = format!( - "api2/json/admin/datastore/{}/snapshots", - params.source.store() - ); - - let mut args = json!({ - "backup-type": group.ty, - "backup-id": group.id, - }); - - if !remote_ns.is_root() { - args["ns"] = serde_json::to_value(&remote_ns)?; - } - - let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?; - - let mut result = client.get(&path, Some(args)).await?; - let mut list: Vec = serde_json::from_value(result["data"].take())?; - - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time)); - - client.login().await?; // make sure auth is complete - - let fingerprint = client.fingerprint(); - - let last_sync = params.store.last_successful_backup(&target_ns, group)?; - - let mut remote_snapshots = std::collections::HashSet::new(); - - // start with 65536 chunks (up to 256 GiB) - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); - - progress.group_snapshots = list.len() as u64; + let target_ns = params.get_target_ns()?; + let mut source_snapshots = HashSet::new(); + let last_sync = params + .target + .store + .last_successful_backup(&target_ns, group)?; let mut skip_info = SkipInfo 
{ oldest: i64::MAX, newest: i64::MIN, count: 0, }; - for (pos, item) in list.into_iter().enumerate() { - let snapshot = item.backup; - - // in-progress backups can't be synced - if item.size.is_none() { - task_log!( - worker, - "skipping snapshot {} - in-progress backup", - snapshot - ); - continue; - } + let mut list: Vec = params + .source + .list_backup_dirs(source_namespace, group, worker) + .await? + .into_iter() + .filter(|dir| { + source_snapshots.insert(dir.time); + if let Some(last_sync_time) = last_sync { + if last_sync_time > dir.time { + skip_info.update(dir.time); + return false; + } + } + true + }) + .collect(); - remote_snapshots.insert(snapshot.time); + list.sort_unstable_by(|a, b| a.time.cmp(&b.time)); - if let Some(last_sync_time) = last_sync { - if last_sync_time > snapshot.time { - skip_info.update(snapshot.time); - continue; - } - } + // start with 65536 chunks (up to 256 GiB) + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); - // get updated auth_info (new tickets) - let auth_info = client.login().await?; - - let options = - HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()) - .rate_limit(params.limit.clone()); - - let new_client = HttpClient::new( - params.source.host(), - params.source.port(), - params.source.auth_id(), - options, - )?; - - let reader = BackupReader::start( - new_client, - None, - params.source.store(), - &remote_ns, - &snapshot, - true, - ) - .await?; + progress.group_snapshots = list.len() as u64; - let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?; + for (pos, from_snapshot) in list.into_iter().enumerate() { + let to_snapshot = params + .target + .store + .backup_dir(params.target.ns.clone(), from_snapshot.clone())?; - let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await; + let result = pull_snapshot_from( + worker, + params, + source_namespace, + &from_snapshot, + &to_snapshot, + 
downloaded_chunks.clone(), + ) + .await; progress.done_snapshots = pos as u64 + 1; task_log!(worker, "percentage done: {}", progress); @@ -703,11 +1025,14 @@ async fn pull_group( } if params.remove_vanished { - let group = params.store.backup_group(target_ns.clone(), group.clone()); + let group = params + .target + .store + .backup_group(target_ns.clone(), group.clone()); let local_list = group.list_backups()?; for info in local_list { let snapshot = info.backup_dir; - if remote_snapshots.contains(&snapshot.backup_time()) { + if source_snapshots.contains(&snapshot.backup_time()) { continue; } if snapshot.is_protected() { @@ -720,6 +1045,7 @@ async fn pull_group( } task_log!(worker, "delete vanished snapshot {}", snapshot.dir()); params + .target .store .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?; } @@ -732,64 +1058,12 @@ async fn pull_group( Ok(()) } -// will modify params if switching to backwards mode for lack of NS support on remote end -async fn query_namespaces( - worker: &WorkerTask, - client: &HttpClient, - params: &mut PullParameters, -) -> Result, Error> { - let path = format!( - "api2/json/admin/datastore/{}/namespace", - params.source.store() - ); - let mut data = json!({}); - if let Some(max_depth) = params.max_depth { - data["max-depth"] = json!(max_depth); - } - - if !params.remote_ns.is_root() { - data["parent"] = json!(params.remote_ns); - } - - let mut result = match client.get(&path, Some(data)).await { - Ok(res) => res, - Err(err) => match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - if params.remote_ns.is_root() && params.max_depth.is_none() { - task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); - task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); - params.max_depth = Some(0); - } else { - bail!("Remote namespace set/recursive sync requested, but 
remote does not support namespaces.") - } - - return Ok(vec![params.remote_ns.clone()]); - } - _ => { - bail!("Querying namespaces failed - HTTP error {code} - {message}"); - } - }, - None => { - bail!("Querying namespaces failed - {err}"); - } - }, - }; - - let mut list: Vec = serde_json::from_value(result["data"].take())?; - - // parents first - list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len())); - - Ok(list.iter().map(|item| item.ns.clone()).collect()) -} - fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result { let mut created = false; - let store_ns_str = print_store_and_ns(params.store.name(), ns); + let store_ns_str = print_store_and_ns(params.target.store.name(), ns); - if !ns.is_root() && !params.store.namespace_path(ns).exists() { - check_ns_modification_privs(params.store.name(), ns, ¶ms.owner) + if !ns.is_root() && !params.target.store.namespace_path(ns).exists() { + check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner) .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?; let name = match ns.components().last() { @@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result< } }; - if let Err(err) = params.store.create_namespace(&ns.parent(), name) { + if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) { bail!("sync into {store_ns_str} failed - namespace creation failed: {err}"); } created = true; } check_ns_privs( - params.store.name(), + params.target.store.name(), ns, ¶ms.owner, PRIV_DATASTORE_BACKUP, @@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result< } fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result { - check_ns_modification_privs(params.store.name(), local_ns, ¶ms.owner) + check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner) .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?; - 
params.store.remove_namespace_recursive(local_ns, true) + params + .target + .store + .remove_namespace_recursive(local_ns, true) } fn check_and_remove_vanished_ns( @@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns( // clamp like remote does so that we don't list more than we can ever have synced. let max_depth = params .max_depth - .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth()); + .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth()); let mut local_ns_list: Vec = params + .target .store - .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))? + .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))? .filter(|ns| { let user_privs = - user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.store.name())); + user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.target.store.name())); user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0 }) .collect(); @@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns( local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len())); for local_ns in local_ns_list { - if local_ns == params.ns { + if local_ns == params.target.ns { continue; } @@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns( /// - access to sub-NS checked here pub(crate) async fn pull_store( worker: &WorkerTask, - client: &HttpClient, mut params: PullParameters, ) -> Result<(), Error> { // explicit create shared lock to prevent GC on newly created chunks - let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; + let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?; let mut errors = false; let old_max_depth = params.max_depth; - let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) { - vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces! - } else { - query_namespaces(worker, client, &mut params).await? 
- }; + let mut namespaces = params + .source + .list_namespaces(&mut params.max_depth, worker) + .await?; errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode + namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len())); let (mut groups, mut snapshots) = (0, 0); let mut synced_ns = HashSet::with_capacity(namespaces.len()); for namespace in namespaces { - let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace); + let source_store_ns_str = params.source.print_store_and_ns(); - let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?; - let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns); + let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?; + let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns); task_log!(worker, "----"); task_log!( @@ -947,7 +1224,7 @@ pub(crate) async fn pull_store( } } - match pull_ns(worker, client, ¶ms, namespace.clone(), target_ns).await { + match pull_ns(worker, &namespace, &mut params).await { Ok((ns_progress, ns_errors)) => { errors |= ns_errors; @@ -968,7 +1245,7 @@ pub(crate) async fn pull_store( task_log!( worker, "Encountered errors while syncing namespace {} - {}", - namespace, + &namespace, err, ); } @@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store( /// - owner check for vanished groups done here pub(crate) async fn pull_ns( worker: &WorkerTask, - client: &HttpClient, - params: &PullParameters, - source_ns: BackupNamespace, - target_ns: BackupNamespace, + namespace: &BackupNamespace, + params: &mut PullParameters, ) -> Result<(StoreProgress, bool), Error> { - let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); - - let args = if !source_ns.is_root() { - Some(json!({ - "ns": source_ns, - })) - } else { - None - }; - - let mut result = client - .get(&path, args) - .await - .map_err(|err| format_err!("Failed to retrieve backup groups from remote 
- {}", err))?; - - let mut list: Vec = serde_json::from_value(result["data"].take())?; + let mut list: Vec = + params.source.list_groups(namespace, ¶ms.owner).await?; let total_count = list.len(); list.sort_unstable_by(|a, b| { - let type_order = a.backup.ty.cmp(&b.backup.ty); + let type_order = a.ty.cmp(&b.ty); if type_order == std::cmp::Ordering::Equal { - a.backup.id.cmp(&b.backup.id) + a.id.cmp(&b.id) } else { type_order } @@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns( filters.iter().any(|filter| group.matches(filter)) }; - // Get groups with target NS set - let list: Vec = list.into_iter().map(|item| item.backup).collect(); - let list = if let Some(ref group_filter) = ¶ms.group_filter { let unfiltered_count = list.len(); let list: Vec = list @@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns( let mut progress = StoreProgress::new(list.len() as u64); + let target_ns = params.get_target_ns()?; for (done, group) in list.into_iter().enumerate() { progress.done_groups = done as u64; progress.done_snapshots = 0; @@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns( let (owner, _lock_guard) = match params + .target .store .create_locked_backup_group(&target_ns, &group, ¶ms.owner) { @@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns( err ); errors = true; // do not stop here, instead continue + task_log!(worker, "create_locked_backup_group failed"); continue; } }; @@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns( owner ); errors = true; // do not stop here, instead continue - } else if let Err(err) = pull_group( - worker, - client, - params, - &group, - source_ns.clone(), - &mut progress, - ) - .await + } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await { task_log!(worker, "sync group {} failed - {}", &group, err,); errors = true; // do not stop here, instead continue @@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns( if params.remove_vanished { let result: Result<(), Error> = proxmox_lang::try_block!({ - for local_group in 
params.store.iter_backup_groups(target_ns.clone())? { + for local_group in params.target.store.iter_backup_groups(target_ns.clone())? { let local_group = local_group?; let local_group = local_group.group(); if new_groups.contains(local_group) { continue; } - let owner = params.store.get_owner(&target_ns, local_group)?; + let owner = params.target.store.get_owner(&target_ns, local_group)?; if check_backup_owner(&owner, ¶ms.owner).is_err() { continue; } @@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns( } } task_log!(worker, "delete vanished group '{local_group}'",); - match params.store.remove_backup_group(&target_ns, local_group) { + match params + .target + .store + .remove_backup_group(&target_ns, local_group) + { Ok(true) => {} Ok(false) => { task_log!( -- 2.30.2