From: Lukas Wagner
To: Proxmox Backup Server development discussion, Hannes Laimer
Date: Thu, 21 Sep 2023 13:10:34 +0200
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
In-Reply-To: <20230808121344.199500-6-h.laimer@proxmox.com>

Some of the changed lines seem to be overly long (>100 chars). I've noted
some of the places, but probably did not catch everything.

On 8/8/23 14:13, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
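
To summarize the new structure for anyone skimming the thread: if I read the
patch right, the split boils down to roughly the two interfaces sketched below.
This is heavily condensed - arguments like the WorkerTask are trimmed, and the
pbs types are declared as empty stand-ins so the sketch is self-contained; see
the actual trait definitions further down for the real signatures.
RemoteSource/RemoteReader implement them for the HTTP case, and a local
implementation can slot in later (the patch currently bails with "local sync
not implemented yet").

use std::path::Path;
use std::sync::Arc;

use anyhow::Error;

// stand-ins for the real pbs-api-types / pbs-datastore types
struct BackupNamespace;
struct BackupGroup;
struct BackupDir;
struct DataBlob;

/// Source side: enumerates what exists (namespaces, groups, snapshots)
/// and hands out a reader per snapshot.
#[async_trait::async_trait]
trait PullSource: Send + Sync {
    async fn list_namespaces(&self) -> Result<Vec<BackupNamespace>, Error>;
    async fn list_groups(&self, ns: &BackupNamespace) -> Result<Vec<BackupGroup>, Error>;
    async fn list_backup_dirs(
        &self,
        ns: &BackupNamespace,
        group: &BackupGroup,
    ) -> Result<Vec<BackupDir>, Error>;
    async fn reader(&self, ns: &BackupNamespace, dir: &BackupDir)
        -> Result<Arc<dyn PullReader>, Error>;
}

/// Reader side: fetches the data of a single snapshot
/// (plus a chunk_reader() accessor in the real trait).
#[async_trait::async_trait]
trait PullReader: Send + Sync {
    async fn load_file_into(&self, filename: &str, into: &Path)
        -> Result<Option<DataBlob>, Error>;
    async fn try_download_client_log(&self, to_path: &Path) -> Result<(), Error>;
    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
}
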
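As for the over-long lines mentioned at the top: for the string literals this
is mostly a matter of wrapping, e.g. the first task_warn! in list_namespaces
could become something like the following (untested; the escaped newline
swallows the leading whitespace on the next line, so the logged message stays
identical):

task_warn!(
    worker,
    "Could not query remote for namespaces (404) -> temporarily \
     switching to backwards-compat mode"
);

The same applies to the second task_warn! and to the bail! with the long
namespace error message.
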
> > Signed-off-by: Hannes Laimer > --- > Cargo.toml | 2 + > pbs-datastore/src/read_chunk.rs | 2 +- > src/api2/config/remote.rs | 14 +- > src/api2/pull.rs | 31 +- > src/server/pull.rs | 943 +++++++++++++++++++------------- > 5 files changed, 570 insertions(+), 422 deletions(-) > > diff --git a/Cargo.toml b/Cargo.toml > index 4d34f8a1..74cb68e0 100644 > --- a/Cargo.toml > +++ b/Cargo.toml > @@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" } > > # regular crates > anyhow = "1.0" > +async-trait = "0.1.56" > apt-pkg-native = "0.3.2" > base64 = "0.13" > bitflags = "1.2.1" > @@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] } > > [dependencies] > anyhow.workspace = true > +async-trait.workspace = true > apt-pkg-native.workspace = true > base64.workspace = true > bitflags.workspace = true > diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs > index c04a7431..29ee2d4c 100644 > --- a/pbs-datastore/src/read_chunk.rs > +++ b/pbs-datastore/src/read_chunk.rs > @@ -14,7 +14,7 @@ pub trait ReadChunk { > fn read_chunk(&self, digest: &[u8; 32]) -> Result, Error>; > } > > -pub trait AsyncReadChunk: Send { > +pub trait AsyncReadChunk: Send + Sync { > /// Returns the encoded chunk data > fn read_raw_chunk<'a>( > &'a self, > diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs > index 307cf3cd..2511c5d5 100644 > --- a/src/api2/config/remote.rs > +++ b/src/api2/config/remote.rs > @@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option) -> Result<(), Error> > Ok(()) > } > > -/// Helper to get client for remote.cfg entry > -pub async fn remote_client( > +/// Helper to get client for remote.cfg entry without login, just config > +pub fn remote_client_config( > remote: &Remote, > limit: Option, > ) -> Result { > @@ -320,6 +320,16 @@ pub async fn remote_client( > &remote.config.auth_id, > options, > )?; > + > + Ok(client) > +} > + > +/// Helper to get client for remote.cfg entry > +pub async fn remote_client( > + remote: &Remote, > + limit: Option, > +) -> Result { > + let client = remote_client_config(remote, limit)?; > let _auth_info = client > .login() // make sure we can auth > .await > diff --git a/src/api2/pull.rs b/src/api2/pull.rs > index 664ecce5..e36a5b14 100644 > --- a/src/api2/pull.rs > +++ b/src/api2/pull.rs > @@ -8,7 +8,7 @@ use proxmox_sys::task_log; > > use pbs_api_types::{ > Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, > - GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP, > + GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP, > PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, > TRANSFER_LAST_SCHEMA, > }; > @@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters { > PullParameters::new( > &sync_job.store, > sync_job.ns.clone().unwrap_or_default(), > - sync_job.remote.as_deref().unwrap_or("local"), > + sync_job.remote.as_deref(), > &sync_job.remote_store, > sync_job.remote_ns.clone().unwrap_or_default(), > sync_job > @@ -124,7 +124,6 @@ pub fn do_sync_job( > > let worker_future = async move { > let pull_params = PullParameters::try_from(&sync_job)?; > - let client = pull_params.client().await?; > > task_log!(worker, "Starting datastore sync job '{}'", job_id); > if let Some(event_str) = schedule { > @@ -138,24 +137,7 @@ pub fn do_sync_job( > sync_job.remote_store, > ); > > - if sync_job.remote.is_some() { > - pull_store(&worker, &client, 
pull_params).await?; > - } else { > - if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) { > - if target_ns.path().starts_with(source_ns.path()) > - && sync_job.store == sync_job.remote_store > - && sync_job.max_depth.map_or(true, |sync_depth| { > - target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH > - }) { > - task_log!( > - worker, > - "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping" > - ); > - } > - } else { > - pull_store(&worker, &client, pull_params).await?; > - } > - } > + pull_store(&worker, pull_params).await?; > > task_log!(worker, "sync job '{}' end", &job_id); > > @@ -284,7 +266,7 @@ async fn pull( > let pull_params = PullParameters::new( > &store, > ns, > - remote.as_deref().unwrap_or("local"), > + remote.as_deref(), > &remote_store, > remote_ns.unwrap_or_default(), > auth_id.clone(), > @@ -294,7 +276,6 @@ async fn pull( > limit, > transfer_last, > )?; > - let client = pull_params.client().await?; > > // fixme: set to_stdout to false? > // FIXME: add namespace to worker id? > @@ -312,7 +293,7 @@ async fn pull( > remote_store, > ); > > - let pull_future = pull_store(&worker, &client, pull_params); > + let pull_future = pull_store(&worker, pull_params); > (select! { > success = pull_future.fuse() => success, > abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort, > @@ -327,4 +308,4 @@ async fn pull( > Ok(upid_str) > } > > -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL); > \ No newline at end of file > +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL); > diff --git a/src/server/pull.rs b/src/server/pull.rs > index e55452d1..e1a27a8c 100644 > --- a/src/server/pull.rs > +++ b/src/server/pull.rs > @@ -1,28 +1,26 @@ > //! 
Sync datastore from remote server > > use std::collections::{HashMap, HashSet}; > -use std::io::{Seek, SeekFrom}; > +use std::io::Seek; > +use std::path::Path; > use std::sync::atomic::{AtomicUsize, Ordering}; > use std::sync::{Arc, Mutex}; > use std::time::SystemTime; > > use anyhow::{bail, format_err, Error}; > use http::StatusCode; > -use pbs_config::CachedUserInfo; > -use serde_json::json; > - > +use proxmox_rest_server::WorkerTask; > use proxmox_router::HttpError; > -use proxmox_sys::task_log; > +use proxmox_sys::{task_log, task_warn}; > +use serde_json::json; > > use pbs_api_types::{ > - print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem, > - Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, > + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter, > + GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, > PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, > }; > - > -use pbs_client::{ > - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader, > -}; > +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader}; > +use pbs_config::CachedUserInfo; > use pbs_datastore::data_blob::DataBlob; > use pbs_datastore::dynamic_index::DynamicIndexReader; > use pbs_datastore::fixed_index::FixedIndexReader; > @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile; > use pbs_datastore::manifest::{ > archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, > }; > +use pbs_datastore::read_chunk::AsyncReadChunk; > use pbs_datastore::{check_backup_owner, DataStore, StoreProgress}; > use pbs_tools::sha::sha256; > -use proxmox_rest_server::WorkerTask; > > use crate::backup::{check_ns_modification_privs, check_ns_privs}; > use crate::tools::parallel_handler::ParallelHandler; > > -/// Parameters for a pull operation. > -pub(crate) struct PullParameters { > - /// Remote that is pulled from > - remote: Remote, > - /// Full specification of remote datastore > - source: BackupRepository, > - /// Local store that is pulled into > +struct RemoteReader { > + backup_reader: Arc, > + dir: BackupDir, > +} > + > +pub(crate) struct PullTarget { > store: Arc, > - /// Remote namespace > - remote_ns: BackupNamespace, > - /// Local namespace (anchor) > ns: BackupNamespace, > +} > + > +pub(crate) struct RemoteSource { > + repo: BackupRepository, > + ns: BackupNamespace, > + client: HttpClient, > +} > + > +#[async_trait::async_trait] > +/// `PullSource` is a trait that provides an interface for pulling data/information from a source. > +/// The trait includes methods for listing namespaces, groups, and backup directories, > +/// as well as retrieving a reader for reading data from the source > +trait PullSource: Send + Sync { > + /// Lists namespaces from the source. > + async fn list_namespaces( > + &self, > + max_depth: &mut Option, > + worker: &WorkerTask, > + ) -> Result, Error>; > + > + /// Lists groups within a specific namespace from the source. > + async fn list_groups( > + &self, > + namespace: &BackupNamespace, > + owner: &Authid, > + ) -> Result, Error>; > + > + /// Lists backup directories for a specific group within a specific namespace from the source. 
> + async fn list_backup_dirs( > + &self, > + namespace: &BackupNamespace, > + group: &BackupGroup, > + worker: &WorkerTask, > + ) -> Result, Error>; > + fn get_ns(&self) -> BackupNamespace; > + fn print_store_and_ns(&self) -> String; > + > + /// Returns a reader for reading data from a specific backup directory. > + async fn reader( > + &self, > + ns: &BackupNamespace, > + dir: &BackupDir, > + ) -> Result, Error>; > +} > + > +#[async_trait::async_trait] > +impl PullSource for RemoteSource { > + async fn list_namespaces( > + &self, > + max_depth: &mut Option, > + worker: &WorkerTask, > + ) -> Result, Error> { > + if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) { > + vec![self.ns.clone()]; > + } > + > + let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store()); > + let mut data = json!({}); > + if let Some(max_depth) = max_depth { > + data["max-depth"] = json!(max_depth); > + } > + > + if !self.ns.is_root() { > + data["parent"] = json!(self.ns); > + } > + self.client.login().await?; > + > + let mut result = match self.client.get(&path, Some(data)).await { > + Ok(res) => res, > + Err(err) => match err.downcast_ref::() { > + Some(HttpError { code, message }) => match code { > + &StatusCode::NOT_FOUND => { > + if self.ns.is_root() && max_depth.is_none() { > + task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); > + task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); These lines exceed our 100 character limit. > + max_depth.replace(0); > + } else { > + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") > + } > + > + return Ok(vec![self.ns.clone()]); > + } > + _ => { > + bail!("Querying namespaces failed - HTTP error {code} - {message}"); > + } > + }, > + None => { > + bail!("Querying namespaces failed - {err}"); > + } > + }, > + }; > + > + let list: Vec = > + serde_json::from_value::>(result["data"].take())? > + .iter() > + .map(|list_item| list_item.ns.clone()) > + .collect(); > + > + Ok(list) > + } > + > + async fn list_groups( > + &self, > + namespace: &BackupNamespace, > + _owner: &Authid, > + ) -> Result, Error> { > + let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store()); > + > + let args = if !namespace.is_root() { > + Some(json!({ "ns": namespace.clone() })) > + } else { > + None > + }; > + > + self.client.login().await?; > + let mut result = > + self.client.get(&path, args).await.map_err(|err| { > + format_err!("Failed to retrieve backup groups from remote - {}", err) > + })?; > + > + Ok( > + serde_json::from_value::>(result["data"].take()) > + .map_err(Error::from)? 
> + .into_iter() > + .map(|item| item.backup) > + .collect::>(), > + ) > + } > + > + async fn list_backup_dirs( > + &self, > + _namespace: &BackupNamespace, > + group: &BackupGroup, > + worker: &WorkerTask, > + ) -> Result, Error> { > + let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store()); > + > + let mut args = json!({ > + "backup-type": group.ty, > + "backup-id": group.id, > + }); > + > + if !self.ns.is_root() { > + args["ns"] = serde_json::to_value(&self.ns)?; > + } > + > + self.client.login().await?; > + > + let mut result = self.client.get(&path, Some(args)).await?; > + let snapshot_list: Vec = serde_json::from_value(result["data"].take())?; > + Ok(snapshot_list > + .into_iter() > + .filter_map(|item: SnapshotListItem| { > + let snapshot = item.backup; > + // in-progress backups can't be synced > + if item.size.is_none() { > + task_log!( > + worker, > + "skipping snapshot {} - in-progress backup", > + snapshot > + ); > + return None; > + } > + > + Some(snapshot) > + }) > + .collect::>()) > + } > + > + fn get_ns(&self) -> BackupNamespace { > + self.ns.clone() > + } > + > + fn print_store_and_ns(&self) -> String { > + print_store_and_ns(self.repo.store(), &self.ns) > + } > + > + async fn reader( > + &self, > + ns: &BackupNamespace, > + dir: &BackupDir, > + ) -> Result, Error> { > + let backup_reader = > + BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?; > + Ok(Arc::new(RemoteReader { > + backup_reader, > + dir: dir.clone(), > + })) > + } > +} > + > +#[async_trait::async_trait] > +/// `PullReader` is a trait that provides an interface for reading data from a source. > +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped. Long line again > +trait PullReader: Send + Sync { > + /// Returns a chunk reader with the specified encryption mode. > + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc; > + > + /// Asynchronously loads a file from the source into a local file. > + /// `filename` is the name of the file to load from the source. > + /// `into` is the path of the local file to load the source file into. > + async fn load_file_into( > + &self, > + filename: &str, > + into: &Path, > + worker: &WorkerTask, > + ) -> Result, Error>; > + > + /// Tries to download the client log from the source and save it into a local file. 
> + async fn try_download_client_log( > + &self, > + to_path: &Path, > + worker: &WorkerTask, > + ) -> Result<(), Error>; > + > + fn skip_chunk_sync(&self, target_store_name: &str) -> bool; > +} > + > +#[async_trait::async_trait] > +impl PullReader for RemoteReader { > + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc { > + Arc::new(RemoteChunkReader::new( > + self.backup_reader.clone(), > + None, > + crypt_mode, > + HashMap::new(), > + )) > + } > + > + async fn load_file_into( > + &self, > + filename: &str, > + into: &Path, > + worker: &WorkerTask, > + ) -> Result, Error> { > + let mut tmp_file = std::fs::OpenOptions::new() > + .write(true) > + .create(true) > + .truncate(true) > + .read(true) > + .open(into)?; > + let download_result = self.backup_reader.download(filename, &mut tmp_file).await; > + if let Err(err) = download_result { > + match err.downcast_ref::() { > + Some(HttpError { code, message }) => match *code { > + StatusCode::NOT_FOUND => { > + task_log!( > + worker, > + "skipping snapshot {} - vanished since start of sync", > + &self.dir, > + ); > + return Ok(None); > + } > + _ => { > + bail!("HTTP error {code} - {message}"); > + } > + }, > + None => { > + return Err(err); > + } > + }; > + }; > + tmp_file.rewind()?; > + Ok(DataBlob::load_from_reader(&mut tmp_file).ok()) > + } > + > + async fn try_download_client_log( > + &self, > + to_path: &Path, > + worker: &WorkerTask, > + ) -> Result<(), Error> { > + let mut tmp_path = to_path.to_owned(); > + tmp_path.set_extension("tmp"); > + > + let tmpfile = std::fs::OpenOptions::new() > + .write(true) > + .create(true) > + .read(true) > + .open(&tmp_path)?; > + > + // Note: be silent if there is no log - only log successful download > + if let Ok(()) = self > + .backup_reader > + .download(CLIENT_LOG_BLOB_NAME, tmpfile) > + .await > + { > + if let Err(err) = std::fs::rename(&tmp_path, to_path) { > + bail!("Atomic rename file {:?} failed - {}", to_path, err); > + } > + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); > + } > + > + Ok(()) > + } > + > + fn skip_chunk_sync(&self, _target_store_name: &str) -> bool { > + false > + } > +} > + > +/// Parameters for a pull operation. > +pub(crate) struct PullParameters { > + /// Where data is pulled from > + source: Arc, > + /// Where data should be pulled into > + target: PullTarget, > /// Owner of synced groups (needs to match local owner of pre-existing groups) > owner: Authid, > /// Whether to remove groups which exist locally, but not on the remote end > @@ -57,22 +357,16 @@ pub(crate) struct PullParameters { > max_depth: Option, > /// Filters for reducing the pull scope > group_filter: Option>, > - /// Rate limits for all transfers from `remote` > - limit: RateLimitConfig, > /// How many snapshots should be transferred at most (taking the newest N snapshots) > transfer_last: Option, > } > > impl PullParameters { > /// Creates a new instance of `PullParameters`. > - /// > - /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a > - /// [BackupRepository] with `remote_store`. 
> - #[allow(clippy::too_many_arguments)] > pub(crate) fn new( > store: &str, > ns: BackupNamespace, > - remote: &str, > + remote: Option<&str>, > remote_store: &str, > remote_ns: BackupNamespace, > owner: Authid, > @@ -82,49 +376,56 @@ impl PullParameters { > limit: RateLimitConfig, > transfer_last: Option, > ) -> Result { > - let store = DataStore::lookup_datastore(store, Some(Operation::Write))?; > - > if let Some(max_depth) = max_depth { > ns.check_max_depth(max_depth)?; > remote_ns.check_max_depth(max_depth)?; > - } > - > - let (remote_config, _digest) = pbs_config::remote::config()?; > - let remote: Remote = remote_config.lookup("remote", remote)?; > - > + }; > let remove_vanished = remove_vanished.unwrap_or(false); > > - let source = BackupRepository::new( > - Some(remote.config.auth_id.clone()), > - Some(remote.config.host.clone()), > - remote.config.port, > - remote_store.to_string(), > - ); > + let source: Arc = if let Some(remote) = remote { > + let (remote_config, _digest) = pbs_config::remote::config()?; > + let remote: Remote = remote_config.lookup("remote", remote)?; > > - Ok(Self { > - remote, > - remote_ns, > + let repo = BackupRepository::new( > + Some(remote.config.auth_id.clone()), > + Some(remote.config.host.clone()), > + remote.config.port, > + remote_store.to_string(), > + ); > + let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?; > + Arc::new(RemoteSource { > + repo, > + ns: remote_ns, > + client, > + }) > + } else { > + bail!("local sync not implemented yet") > + }; > + let target = PullTarget { > + store: DataStore::lookup_datastore(store, Some(Operation::Write))?, > ns, > + }; > + > + Ok(Self { > source, > - store, > + target, > owner, > remove_vanished, > max_depth, > group_filter, > - limit, > transfer_last, > }) > } > > - /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from. > - pub async fn client(&self) -> Result { > - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await > + pub(crate) fn get_target_ns(&self) -> Result { > + let source_ns = self.source.get_ns(); > + source_ns.map_prefix(&source_ns, &self.target.ns) > } > } > > async fn pull_index_chunks( > worker: &WorkerTask, > - chunk_reader: RemoteChunkReader, > + chunk_reader: Arc, > target: Arc, > index: I, > downloaded_chunks: Arc>>, > @@ -215,26 +516,6 @@ async fn pull_index_chunks( > Ok(()) > } > > -async fn download_manifest( > - reader: &BackupReader, > - filename: &std::path::Path, > -) -> Result { > - let mut tmp_manifest_file = std::fs::OpenOptions::new() > - .write(true) > - .create(true) > - .truncate(true) > - .read(true) > - .open(filename)?; > - > - reader > - .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file) > - .await?; > - > - tmp_manifest_file.seek(SeekFrom::Start(0))?; > - > - Ok(tmp_manifest_file) > -} > - > fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> { > if size != info.size { > bail!( > @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err > /// Pulls a single file referenced by a manifest. 
> /// > /// Pulling an archive consists of the following steps: > -/// - Create tmp file for archive > -/// - Download archive file into tmp file > -/// - Verify tmp file checksum > +/// - Load archive file into tmp file > +/// -- Load file into tmp file > +/// -- Verify tmp file checksum > /// - if archive is an index, pull referenced chunks > /// - Rename tmp file into real path > -async fn pull_single_archive( > - worker: &WorkerTask, > - reader: &BackupReader, > - chunk_reader: &mut RemoteChunkReader, > - snapshot: &pbs_datastore::BackupDir, > - archive_info: &FileInfo, > +async fn pull_single_archive<'a>( > + worker: &'a WorkerTask, > + reader: Arc, > + snapshot: &'a pbs_datastore::BackupDir, > + archive_info: &'a FileInfo, > downloaded_chunks: Arc>>, > ) -> Result<(), Error> { > let archive_name = &archive_info.filename; > @@ -277,13 +557,11 @@ async fn pull_single_archive( > > task_log!(worker, "sync archive {}", archive_name); > > - let mut tmpfile = std::fs::OpenOptions::new() > - .write(true) > - .create(true) > - .read(true) > - .open(&tmp_path)?; > + reader > + .load_file_into(archive_name, &tmp_path, worker) > + .await?; > > - reader.download(archive_name, &mut tmpfile).await?; > + let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?; > > match archive_type(archive_name)? { > ArchiveType::DynamicIndex => { > @@ -293,14 +571,18 @@ async fn pull_single_archive( > let (csum, size) = index.compute_csum(); > verify_archive(archive_info, &csum, size)?; > > - pull_index_chunks( > - worker, > - chunk_reader.clone(), > - snapshot.datastore().clone(), > - index, > - downloaded_chunks, > - ) > - .await?; > + if reader.skip_chunk_sync(snapshot.datastore().name()) { > + task_log!(worker, "skipping chunk sync for same datatsore"); > + } else { > + pull_index_chunks( > + worker, > + reader.chunk_reader(archive_info.crypt_mode), > + snapshot.datastore().clone(), > + index, > + downloaded_chunks, > + ) > + .await?; > + } > } > ArchiveType::FixedIndex => { > let index = FixedIndexReader::new(tmpfile).map_err(|err| { > @@ -309,17 +591,21 @@ async fn pull_single_archive( > let (csum, size) = index.compute_csum(); > verify_archive(archive_info, &csum, size)?; > > - pull_index_chunks( > - worker, > - chunk_reader.clone(), > - snapshot.datastore().clone(), > - index, > - downloaded_chunks, > - ) > - .await?; > + if reader.skip_chunk_sync(snapshot.datastore().name()) { > + task_log!(worker, "skipping chunk sync for same datatsore"); > + } else { > + pull_index_chunks( > + worker, > + reader.chunk_reader(archive_info.crypt_mode), > + snapshot.datastore().clone(), > + index, > + downloaded_chunks, > + ) > + .await?; > + } > } > ArchiveType::Blob => { > - tmpfile.seek(SeekFrom::Start(0))?; > + tmpfile.rewind()?; > let (csum, size) = sha256(&mut tmpfile)?; > verify_archive(archive_info, &csum, size)?; > } > @@ -330,33 +616,6 @@ async fn pull_single_archive( > Ok(()) > } > > -// Note: The client.log.blob is uploaded after the backup, so it is > -// not mentioned in the manifest. 
> -async fn try_client_log_download( > - worker: &WorkerTask, > - reader: Arc, > - path: &std::path::Path, > -) -> Result<(), Error> { > - let mut tmp_path = path.to_owned(); > - tmp_path.set_extension("tmp"); > - > - let tmpfile = std::fs::OpenOptions::new() > - .write(true) > - .create(true) > - .read(true) > - .open(&tmp_path)?; > - > - // Note: be silent if there is no log - only log successful download > - if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await { > - if let Err(err) = std::fs::rename(&tmp_path, path) { > - bail!("Atomic rename file {:?} failed - {}", path, err); > - } > - task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME); > - } > - > - Ok(()) > -} > - > /// Actual implementation of pulling a snapshot. > /// > /// Pulling a snapshot consists of the following steps: > @@ -366,10 +625,10 @@ async fn try_client_log_download( > /// -- if file already exists, verify contents > /// -- if not, pull it from the remote > /// - Download log if not already existing > -async fn pull_snapshot( > - worker: &WorkerTask, > - reader: Arc, > - snapshot: &pbs_datastore::BackupDir, > +async fn pull_snapshot<'a>( > + worker: &'a WorkerTask, > + reader: Arc, > + snapshot: &'a pbs_datastore::BackupDir, > downloaded_chunks: Arc>>, > ) -> Result<(), Error> { > let mut manifest_name = snapshot.full_path(); > @@ -380,32 +639,15 @@ async fn pull_snapshot( > > let mut tmp_manifest_name = manifest_name.clone(); > tmp_manifest_name.set_extension("tmp"); > - > - let download_res = download_manifest(&reader, &tmp_manifest_name).await; > - let mut tmp_manifest_file = match download_res { > - Ok(manifest_file) => manifest_file, > - Err(err) => { > - match err.downcast_ref::() { > - Some(HttpError { code, message }) => match *code { > - StatusCode::NOT_FOUND => { > - task_log!( > - worker, > - "skipping snapshot {} - vanished since start of sync", > - snapshot.dir(), > - ); > - return Ok(()); > - } > - _ => { > - bail!("HTTP error {code} - {message}"); > - } > - }, > - None => { > - return Err(err); > - } > - }; > - } > - }; > - let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; > + let tmp_manifest_blob; > + if let Some(data) = reader > + .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker) > + .await? 
> + { > + tmp_manifest_blob = data; > + } else { > + return Ok(()); > + } > > if manifest_name.exists() { > let manifest_blob = proxmox_lang::try_block!({ > @@ -422,8 +664,10 @@ async fn pull_snapshot( > > if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { > if !client_log_name.exists() { > - try_client_log_download(worker, reader, &client_log_name).await?; > - } > + reader > + .try_download_client_log(&client_log_name, worker) > + .await?; > + }; > task_log!(worker, "no data changes"); > let _ = std::fs::remove_file(&tmp_manifest_name); > return Ok(()); // nothing changed > @@ -471,17 +715,9 @@ async fn pull_snapshot( > } > } > > - let mut chunk_reader = RemoteChunkReader::new( > - reader.clone(), > - None, > - item.chunk_crypt_mode(), > - HashMap::new(), > - ); > - > pull_single_archive( > worker, > - &reader, > - &mut chunk_reader, > + reader.clone(), > snapshot, > item, > downloaded_chunks.clone(), > @@ -494,9 +730,10 @@ async fn pull_snapshot( > } > > if !client_log_name.exists() { > - try_client_log_download(worker, reader, &client_log_name).await?; > - } > - > + reader > + .try_download_client_log(&client_log_name, worker) > + .await?; > + }; > snapshot > .cleanup_unreferenced_files(&manifest) > .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?; > @@ -506,12 +743,12 @@ async fn pull_snapshot( > > /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case. > /// > -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is > -/// pointing to the local datastore and target namespace. > -async fn pull_snapshot_from( > - worker: &WorkerTask, > - reader: Arc, > - snapshot: &pbs_datastore::BackupDir, > +/// The `reader` is configured to read from the source backup directory, while the > +/// `snapshot` is pointing to the local datastore and target namespace. > +async fn pull_snapshot_from<'a>( > + worker: &'a WorkerTask, > + reader: Arc, > + snapshot: &'a pbs_datastore::BackupDir, > downloaded_chunks: Arc>>, > ) -> Result<(), Error> { > let (_path, is_new, _snap_lock) = snapshot > @@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo { > /// - Sort by snapshot time > /// - Get last snapshot timestamp on local datastore > /// - Iterate over list of snapshots > -/// -- Recreate client/BackupReader > /// -- pull snapshot, unless it's not finished yet or older than last local snapshot > /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote > /// > -/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the > +/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the > /// remote when querying snapshots. This allows us to interact with old remotes that don't have > /// namespace support yet. 
> /// > @@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo { > /// - local group owner is already checked by pull_store > async fn pull_group( > worker: &WorkerTask, > - client: &HttpClient, > params: &PullParameters, > - group: &pbs_api_types::BackupGroup, > - remote_ns: BackupNamespace, > + source_namespace: &BackupNamespace, > + group: &BackupGroup, > progress: &mut StoreProgress, > ) -> Result<(), Error> { > - task_log!(worker, "sync group {}", group); > - > - let path = format!( > - "api2/json/admin/datastore/{}/snapshots", > - params.source.store() > - ); > - > - let mut args = json!({ > - "backup-type": group.ty, > - "backup-id": group.id, > - }); > - > - if !remote_ns.is_root() { > - args["ns"] = serde_json::to_value(&remote_ns)?; > - } > - > - let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?; > - > - let mut result = client.get(&path, Some(args)).await?; > - let mut list: Vec = serde_json::from_value(result["data"].take())?; > - > - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time)); > - > - client.login().await?; // make sure auth is complete > - > - let fingerprint = client.fingerprint(); > - > - let last_sync = params.store.last_successful_backup(&target_ns, group)?; > - let last_sync_time = last_sync.unwrap_or(i64::MIN); > - > - let mut remote_snapshots = std::collections::HashSet::new(); > - > - // start with 65536 chunks (up to 256 GiB) > - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); > - > - progress.group_snapshots = list.len() as u64; > - > let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced); > let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast); > > - let total_amount = list.len(); > + let mut raw_list: Vec = params > + .source > + .list_backup_dirs(source_namespace, group, worker) > + .await?; > + raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time)); > + > + let total_amount = raw_list.len(); > > let cutoff = params > .transfer_last > .map(|count| total_amount.saturating_sub(count)) > .unwrap_or_default(); > > - for (pos, item) in list.into_iter().enumerate() { > - let snapshot = item.backup; > - > - // in-progress backups can't be synced > - if item.size.is_none() { > - task_log!( > - worker, > - "skipping snapshot {} - in-progress backup", > - snapshot > - ); > - continue; > - } > - > - remote_snapshots.insert(snapshot.time); > + let target_ns = params.get_target_ns()?; > > - if last_sync_time > snapshot.time { > - already_synced_skip_info.update(snapshot.time); > - continue; > - } else if already_synced_skip_info.count > 0 { > - task_log!(worker, "{}", already_synced_skip_info); > - already_synced_skip_info.reset(); > - } > - > - if pos < cutoff && last_sync_time != snapshot.time { > - transfer_last_skip_info.update(snapshot.time); > - continue; > - } else if transfer_last_skip_info.count > 0 { > - task_log!(worker, "{}", transfer_last_skip_info); > - transfer_last_skip_info.reset(); > - } > - > - // get updated auth_info (new tickets) > - let auth_info = client.login().await?; > + let mut source_snapshots = HashSet::new(); > + let last_sync_time = params > + .target > + .store > + .last_successful_backup(&target_ns, group)? 
> + .unwrap_or(i64::MIN); > + > + let list: Vec = raw_list > + .into_iter() > + .enumerate() > + .filter(|&(pos, ref dir)| { > + source_snapshots.insert(dir.time); > + if last_sync_time > dir.time { > + already_synced_skip_info.update(dir.time); > + return false; > + } else if already_synced_skip_info.count > 0 { > + task_log!(worker, "{}", already_synced_skip_info); > + already_synced_skip_info.reset(); > + return true; > + } > > - let options = > - HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()) > - .rate_limit(params.limit.clone()); > + if pos < cutoff && last_sync_time != dir.time { > + transfer_last_skip_info.update(dir.time); > + return false; > + } else if transfer_last_skip_info.count > 0 { > + task_log!(worker, "{}", transfer_last_skip_info); > + transfer_last_skip_info.reset(); > + } > + true > + }) > + .map(|(_, dir)| dir) > + .collect(); > > - let new_client = HttpClient::new( > - params.source.host(), > - params.source.port(), > - params.source.auth_id(), > - options, > - )?; > + // start with 65536 chunks (up to 256 GiB) > + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64))); > > - let reader = BackupReader::start( > - &new_client, > - None, > - params.source.store(), > - &remote_ns, > - &snapshot, > - true, > - ) > - .await?; > + progress.group_snapshots = list.len() as u64; > > - let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?; > + for (pos, from_snapshot) in list.into_iter().enumerate() { > + let to_snapshot = params > + .target > + .store > + .backup_dir(params.target.ns.clone(), from_snapshot.clone())?; > > - let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await; > + let reader = params > + .source > + .reader(source_namespace, &from_snapshot) > + .await?; > + let result = > + pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await; > > progress.done_snapshots = pos as u64 + 1; > task_log!(worker, "percentage done: {}", progress); > @@ -758,11 +956,14 @@ async fn pull_group( > } > > if params.remove_vanished { > - let group = params.store.backup_group(target_ns.clone(), group.clone()); > + let group = params > + .target > + .store > + .backup_group(params.get_target_ns()?, group.clone()); > let local_list = group.list_backups()?; > for info in local_list { > let snapshot = info.backup_dir; > - if remote_snapshots.contains(&snapshot.backup_time()) { > + if source_snapshots.contains(&snapshot.backup_time()) { > continue; > } > if snapshot.is_protected() { > @@ -774,73 +975,23 @@ async fn pull_group( > continue; > } > task_log!(worker, "delete vanished snapshot {}", snapshot.dir()); > - params > - .store > - .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?; > + params.target.store.remove_backup_dir( > + ¶ms.get_target_ns()?, > + snapshot.as_ref(), > + false, > + )?; > } > } > > Ok(()) > } > > -// will modify params if switching to backwards mode for lack of NS support on remote end > -async fn query_namespaces( > - worker: &WorkerTask, > - client: &HttpClient, > - params: &mut PullParameters, > -) -> Result, Error> { > - let path = format!( > - "api2/json/admin/datastore/{}/namespace", > - params.source.store() > - ); > - let mut data = json!({}); > - if let Some(max_depth) = params.max_depth { > - data["max-depth"] = json!(max_depth); > - } > - > - if !params.remote_ns.is_root() { > - data["parent"] = json!(params.remote_ns); > - } > - > - let mut result = match client.get(&path, Some(data)).await { > - Ok(res) => 
res, > - Err(err) => match err.downcast_ref::() { > - Some(HttpError { code, message }) => match *code { > - StatusCode::NOT_FOUND => { > - if params.remote_ns.is_root() && params.max_depth.is_none() { > - task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); > - task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); > - params.max_depth = Some(0); > - } else { > - bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") > - } > - > - return Ok(vec![params.remote_ns.clone()]); > - } > - _ => { > - bail!("Querying namespaces failed - HTTP error {code} - {message}"); > - } > - }, > - None => { > - bail!("Querying namespaces failed - {err}"); > - } > - }, > - }; > - > - let mut list: Vec = serde_json::from_value(result["data"].take())?; > - > - // parents first > - list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len())); > - > - Ok(list.iter().map(|item| item.ns.clone()).collect()) > -} > - > fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result { > let mut created = false; > - let store_ns_str = print_store_and_ns(params.store.name(), ns); > + let store_ns_str = print_store_and_ns(params.target.store.name(), ns); > > - if !ns.is_root() && !params.store.namespace_path(ns).exists() { > - check_ns_modification_privs(params.store.name(), ns, ¶ms.owner) > + if !ns.is_root() && !params.target.store.namespace_path(ns).exists() { > + check_ns_modification_privs(params.target.store.name(), ns, ¶ms.owner) > .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?; > > let name = match ns.components().last() { > @@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result< > } > }; > > - if let Err(err) = params.store.create_namespace(&ns.parent(), name) { > + if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) { > bail!("sync into {store_ns_str} failed - namespace creation failed: {err}"); > } > created = true; > } > > check_ns_privs( > - params.store.name(), > + params.target.store.name(), > ns, > ¶ms.owner, > PRIV_DATASTORE_BACKUP, > @@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result< > } > > fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result { > - check_ns_modification_privs(params.store.name(), local_ns, ¶ms.owner) > + check_ns_modification_privs(params.target.store.name(), local_ns, ¶ms.owner) > .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?; > > - params.store.remove_namespace_recursive(local_ns, true) > + params > + .target > + .store > + .remove_namespace_recursive(local_ns, true) > } > > fn check_and_remove_vanished_ns( > @@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns( > // clamp like remote does so that we don't list more than we can ever have synced. > let max_depth = params > .max_depth > - .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth()); > + .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth()); > > let mut local_ns_list: Vec = params > + .target > .store > - .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))? > + .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))? 
> .filter(|ns| { > let user_privs = > - user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.store.name())); > + user_info.lookup_privs(¶ms.owner, &ns.acl_path(params.target.store.name())); > user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0 > }) > .collect(); > @@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns( > local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len())); > > for local_ns in local_ns_list { > - if local_ns == params.ns { > + if local_ns == params.target.ns { > continue; > } > > @@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns( > /// - access to sub-NS checked here > pub(crate) async fn pull_store( > worker: &WorkerTask, > - client: &HttpClient, > mut params: PullParameters, > ) -> Result<(), Error> { > // explicit create shared lock to prevent GC on newly created chunks > - let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; > + let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?; > let mut errors = false; > > let old_max_depth = params.max_depth; > - let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) { > - vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces! > + let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) { > + vec![params.source.get_ns()] // backwards compat - don't query remote namespaces! > } else { > - query_namespaces(worker, client, &mut params).await? > + params > + .source > + .list_namespaces(&mut params.max_depth, worker) > + .await? > }; > + > + let ns_layers_to_be_pulled = namespaces > + .iter() > + .map(BackupNamespace::depth) > + .max() > + .map_or(0, |v| v - params.source.get_ns().depth()); > + let target_depth = params.target.ns.depth(); > + > + if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH { > + bail!( > + "Syncing would exceed max allowed namespace depth. 
({}+{} > {})", > + ns_layers_to_be_pulled, > + target_depth, > + MAX_NAMESPACE_DEPTH > + ); > + } > + > errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode > + namespaces.sort_unstable_by_key(|a| a.name_len()); > > let (mut groups, mut snapshots) = (0, 0); > let mut synced_ns = HashSet::with_capacity(namespaces.len()); > > for namespace in namespaces { > - let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace); > + let source_store_ns_str = params.source.print_store_and_ns(); > > - let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?; > - let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns); > + let target_ns = namespace.map_prefix(¶ms.source.get_ns(), ¶ms.target.ns)?; > + let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns); > > task_log!(worker, "----"); > task_log!( > @@ -998,7 +1173,7 @@ pub(crate) async fn pull_store( > } > } > > - match pull_ns(worker, client, ¶ms, namespace.clone(), target_ns).await { > + match pull_ns(worker, &namespace, &mut params).await { > Ok((ns_progress, ns_errors)) => { > errors |= ns_errors; > > @@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store( > task_log!( > worker, > "Encountered errors while syncing namespace {} - {}", > - namespace, > + &namespace, > err, > ); > } > @@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store( > /// - owner check for vanished groups done here > pub(crate) async fn pull_ns( > worker: &WorkerTask, > - client: &HttpClient, > - params: &PullParameters, > - source_ns: BackupNamespace, > - target_ns: BackupNamespace, > + namespace: &BackupNamespace, > + params: &mut PullParameters, > ) -> Result<(StoreProgress, bool), Error> { > - let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); > - > - let args = if !source_ns.is_root() { > - Some(json!({ > - "ns": source_ns, > - })) > - } else { > - None > - }; > - > - let mut result = client > - .get(&path, args) > - .await > - .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; > - > - let mut list: Vec = serde_json::from_value(result["data"].take())?; > + let mut list: Vec = params.source.list_groups(namespace, ¶ms.owner).await?; > > let total_count = list.len(); > list.sort_unstable_by(|a, b| { > - let type_order = a.backup.ty.cmp(&b.backup.ty); > + let type_order = a.ty.cmp(&b.ty); > if type_order == std::cmp::Ordering::Equal { > - a.backup.id.cmp(&b.backup.id) > + a.id.cmp(&b.id) > } else { > type_order > } > }); > > - let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool { > + let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool { > filters.iter().any(|filter| group.matches(filter)) > }; > > - // Get groups with target NS set > - let list: Vec = list.into_iter().map(|item| item.backup).collect(); > - > let list = if let Some(ref group_filter) = ¶ms.group_filter { > let unfiltered_count = list.len(); > - let list: Vec = list > + let list: Vec = list > .into_iter() > .filter(|group| apply_filters(group, group_filter)) > .collect(); > @@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns( > > let mut errors = false; > > - let mut new_groups = std::collections::HashSet::new(); > + let mut new_groups = HashSet::new(); > for group in list.iter() { > new_groups.insert(group.clone()); > } > > let mut progress = StoreProgress::new(list.len() as u64); > > + let target_ns = params.get_target_ns()?; > for (done, group) in 
list.into_iter().enumerate() { > progress.done_groups = done as u64; > progress.done_snapshots = 0; > @@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns( > > let (owner, _lock_guard) = > match params > + .target > .store > .create_locked_backup_group(&target_ns, &group, ¶ms.owner) > { > @@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns( > &group, > err > ); > - errors = true; // do not stop here, instead continue > + errors = true; > + // do not stop here, instead continue > + task_log!(worker, "create_locked_backup_group failed"); > continue; > } > }; > @@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns( > owner > ); > errors = true; // do not stop here, instead continue > - } else if let Err(err) = pull_group( > - worker, > - client, > - params, > - &group, > - source_ns.clone(), > - &mut progress, > - ) > - .await > + } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await > { > task_log!(worker, "sync group {} failed - {}", &group, err,); > errors = true; // do not stop here, instead continue > @@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns( > > if params.remove_vanished { > let result: Result<(), Error> = proxmox_lang::try_block!({ > - for local_group in params.store.iter_backup_groups(target_ns.clone())? { > + for local_group in params.target.store.iter_backup_groups(target_ns.clone())? { > let local_group = local_group?; > let local_group = local_group.group(); > if new_groups.contains(local_group) { > continue; > } > - let owner = params.store.get_owner(&target_ns, local_group)?; > + let owner = params.target.store.get_owner(&target_ns, local_group)?; > if check_backup_owner(&owner, ¶ms.owner).is_err() { > continue; > } > @@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns( > } > } > task_log!(worker, "delete vanished group '{local_group}'",); > - match params.store.remove_backup_group(&target_ns, local_group) { > + match params > + .target > + .store > + .remove_backup_group(&target_ns, local_group) > + { > Ok(true) => {} > Ok(false) => { > task_log!( -- - Lukas