From mboxrd@z Thu Jan  1 00:00:00 1970
Date: Tue, 28 Feb 2023 12:36:22 +0100
From: Fabian Grünbichler
To: Proxmox Backup Server development discussion
References: <20230223125540.1298442-1-h.laimer@proxmox.com> <20230223125540.1298442-6-h.laimer@proxmox.com>
In-Reply-To: <20230223125540.1298442-6-h.laimer@proxmox.com>
Message-Id: <1677576387.2z8w7cmhc5.astroid@yuna.none>
Subject: Re: [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling

On February 23, 2023 1:55 pm, Hannes Laimer wrote:
> ... and rewrite pull logic.

that's a bit terse ;) there's also general refactoring interleaved with the
local pull support, it would be easier to review if those two were split into
separate patches.

general remarks (also partly repeated below):
- I don't like the way of persisting the readers in the parameters, it
  shouldn't be needed and isn't a nice structuring of the code
-- instead, at the point where previously a BackupReader is created (the loop
   body in pull_group), you can create the equivalent of PullSource but for
   the reader, which in turn
-- contains a BackupReader + RemoteChunkReader for the remote case
-- contains a lock guard + LocalChunkReader for the local case
- there's now a mix of helpers that are moved to the source structs and
  helpers that are not, it should be one or the other across the board.
  adapting the helpers in-place probably makes it easier to tell what changed,
  if they all have the &source as parameter it's then trivial to move them to
  an impl block on PullSource at the end and move longer parts to the local or
  remote impl block as well (possibly as follow-up to reduce the amount of
  rebasing you have to do)..
- this patch would really benefit from being split into (at least) two patches
-- refactor the source handling with just the remote part (no semantic changes!)
-- add local pull support
- some of the comments above individual functions are wrong, please check them
  carefully and adapt where needed
- please test recursive sync behaviour, the current version looks rather
  broken to me (but I haven't done any in-depth testing)
- local reading doesn't lock the source, but it should (for remote this is
  handled for us by the HTTP2 reader session)

> 
> Signed-off-by: Hannes Laimer
> ---
>  pbs-api-types/src/datastore.rs  |    2 +-
>  pbs-datastore/src/read_chunk.rs |    2 +-
>  src/api2/pull.rs                |   13 +-
>  src/server/pull.rs              | 1023 +++++++++++++++++++------------
>  4 files changed, 648 insertions(+), 392 deletions(-)
> 
> diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
> index 72e8d1ee..9a692b08 100644
> --- a/pbs-api-types/src/datastore.rs
> +++ b/pbs-api-types/src/datastore.rs
> @@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
>  /// Uniquely identify a Backup (relative to data store)
>  ///
>  /// We also call this a backup snaphost.
> -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
> +#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]

same as the Sync part below..

>  #[serde(rename_all = "kebab-case")]
>  pub struct BackupDir {
>      /// Backup group.
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>      fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>  }
>  
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {

I guess you need this since you moved the BackupReader into PullParams? this
should not be needed..
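for context on the added `Sync` bound: an `Arc<dyn Trait>` can only be moved
into another thread/task if the trait object is `Send + Sync`. a std-only
sketch (the trait and types below are illustrative stand-ins, not the real
`AsyncReadChunk`):

```rust
use std::sync::Arc;

// Illustrative stand-in for the AsyncReadChunk trait; the real one is
// async, but the bound question is the same.
trait ReadChunk: Send + Sync {
    fn read_chunk(&self, digest: u8) -> Vec<u8>;
}

struct DummyReader;

impl ReadChunk for DummyReader {
    fn read_chunk(&self, digest: u8) -> Vec<u8> {
        vec![digest; 4] // pretend chunk payload
    }
}

// Moving an Arc<dyn ReadChunk> into another thread requires the trait
// object to be Send + Sync; with only `Send` on the trait, the cloned
// Arc could not cross the thread boundary at all.
fn read_in_thread(reader: Arc<dyn ReadChunk>) -> Vec<u8> {
    std::thread::spawn(move || reader.read_chunk(7)).join().unwrap()
}

fn main() {
    let reader: Arc<dyn ReadChunk> = Arc::new(DummyReader);
    assert_eq!(read_in_thread(reader), vec![7, 7, 7, 7]);
}
```

so the bound itself is harmless, but it only becomes necessary because the
reader is shared via the parameters instead of being passed down.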
>  /// Returns the encoded chunk data
>  fn read_raw_chunk<'a>(
>      &'a self,
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index bb8f6fe1..2966190c 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -121,8 +121,8 @@ pub fn do_sync_job(
>      let sync_job2 = sync_job.clone();
>  
>      let worker_future = async move {
> -        let pull_params = PullParameters::try_from(&sync_job)?;
> -        let client = pull_params.client().await?;
> +        let mut pull_params = PullParameters::try_from(&sync_job)?;
> +        pull_params.init_source(sync_job.limit).await?;

see below

>  
>          task_log!(worker, "Starting datastore sync job '{}'", job_id);
>          if let Some(event_str) = schedule {
> @@ -137,7 +137,7 @@ pub fn do_sync_job(
>          );
>  
>          if sync_job.remote.is_some() {
> -            pull_store(&worker, &client, pull_params).await?;
> +            pull_store(&worker, pull_params).await?;
>          } else {
>              match (sync_job.ns, sync_job.remote_ns) {
>                  (Some(target_ns), Some(source_ns))
> @@ -280,7 +280,7 @@ async fn pull(
>          delete,
>      )?;
>  
> -    let pull_params = PullParameters::new(
> +    let mut pull_params = PullParameters::new(
>          &store,
>          ns,
>          remote.as_deref(),
> @@ -290,9 +290,8 @@ async fn pull(
>          remove_vanished,
>          max_depth,
>          group_filter,
> -        limit,

see below

>      )?;
> -    let client = pull_params.client().await?;
> +    pull_params.init_source(limit).await?;

see below

>  
>      // fixme: set to_stdout to false?
>      // FIXME: add namespace to worker id?
> @@ -310,7 +309,7 @@ async fn pull(
>          remote_store,
>      );
>  
> -    let pull_future = pull_store(&worker, &client, pull_params);
> +    let pull_future = pull_store(&worker, pull_params);
>      (select! {
>          success = pull_future.fuse() => success,
>          abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..d3be39da 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
> +use std::path::PathBuf;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>  use proxmox_sys::task_log;
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> +    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
>      Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> +use pbs_datastore::{
> +    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
> +};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
> -use crate::backup::{check_ns_modification_privs, check_ns_privs};
> +use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> -    store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
> -    ns: BackupNamespace,
> +    /// Where data is pulled from
> +    source: PullSource,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
> +}
> +
> +pub(crate) enum PullSource {
> +    Remote(RemoteSource),
> +    Local(LocalSource),
> +}
> +
> +pub(crate) struct PullTarget {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct LocalSource {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    remote: Remote,
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: Option<HttpClient>,
> +    backup_reader: HashMap<BackupDir, Arc<BackupReader>>,

this is not needed at all - you never pull multiple snapshots in parallel,
so there is no need to store readers for each pulled snapshot. there also
shouldn't be state stored in PullParams at all, it should always be possible
to pass that down from caller to next layer..
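the per-snapshot reader split sketched in the general remarks above could
look roughly like this (all types here are illustrative stubs, not the real
proxmox-backup API):

```rust
// Stub types standing in for the real proxmox-backup ones.
struct BackupReader;      // remote HTTP2 reader session
struct RemoteChunkReader; // chunk access through that session
struct LocalChunkReader;  // direct chunk access on a local datastore
struct SnapshotLockGuard; // holds the local snapshot lock while reading

// Built per snapshot in the pull_group loop body, instead of caching
// readers inside PullParameters; dropping it releases the session/lock.
enum PullReader {
    Remote {
        _reader: BackupReader,
        _chunk_reader: RemoteChunkReader,
    },
    Local {
        _lock: SnapshotLockGuard,
        _chunk_reader: LocalChunkReader,
    },
}

impl PullReader {
    fn kind(&self) -> &'static str {
        match self {
            PullReader::Remote { .. } => "remote",
            PullReader::Local { .. } => "local",
        }
    }
}

fn main() {
    let reader = PullReader::Local {
        _lock: SnapshotLockGuard,
        _chunk_reader: LocalChunkReader,
    };
    assert_eq!(reader.kind(), "local");
}
```

with this shape the reader's lifetime is tied to the snapshot currently being
pulled, which also gives the local case a natural place to hold the lock that
is missing in the patch as posted.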
> +}
> +
> +impl PullSource {
> +    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        match self {
> +            PullSource::Remote(source) => {
> +                source.client.replace(
> +                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
> +                );
> +            }
> +            PullSource::Local(_) => {}
> +        };
> +        Ok(())
> +    }

drop this, and keep the old client helper but move it to PullSource::Remote

> +
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,

this checks permissions (on the remote system)

> +            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
> +                source.store.clone(),
> +                source.ns.clone(),
> +                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
> +            )?
> +            .collect(),

this doesn't, but should

> +        }
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());
> +
> +                let args = if !namespace.is_root() {
> +                    Some(json!({ "ns": namespace.clone() }))
> +                } else {
> +                    None
> +                };
> +
> +                let client = source.get_client()?;
> +                client.login().await?;

this could just initialize a new client using the helper, instead of
retrieving a stored one that might have an expired ticket..

> +                let mut result = client.get(&path, args).await.map_err(|err| {
> +                    format_err!("Failed to retrieve backup groups from remote - {}", err)
> +                })?;
> +
> +                Ok(
> +                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                        .map_err(|e| Error::from(e))?
could just be .map_err(Error::from), but

> +                        .into_iter()
> +                        .map(|item| item.backup)
> +                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
> +                )

the whole thing is probably easier to read when done like this:

    let list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
    let list: Vec<BackupGroup> = list.into_iter().map(|group| group.backup).collect();
    Ok(list)

> +            }
> +            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
> +                &source.store,
> +                namespace.clone(),
> +                MAX_NAMESPACE_DEPTH,
> +                None,
> +                None,
> +                Some(owner),
> +            )?
> +            .filter_map(Result::ok)
> +            .map(|backup_group| backup_group.group().clone())
> +            .collect::<Vec<pbs_api_types::BackupGroup>>()),

this has two issues:
- it recurses over namespaces, while it should only list groups in the
  current namespace without recursion
- it doesn't set the expected privs, so this will potentially list too few or
  too many groups as well, even within a namespace where the user is supposed
  to have access
-- too few if the user has PRIV_DATASTORE_READ and should be able to read
   groups owned by other users/tokens
-- too many if the user only has PRIV_DATASTORE_AUDIT, since owned groups are
   then readable despite missing PRIV_DATASTORE_BACKUP

> +            }
> +        }
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &pbs_api_types::BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!(
> +                    "api2/json/admin/datastore/{}/snapshots",
> +                    source.repo.store()
> +                );
> +
> +                let mut args = json!({
> +                    "backup-type": group.ty,
> +                    "backup-id": group.id,
> +                });
> +
> +                if !source.ns.is_root() {
> +                    args["ns"] = serde_json::to_value(&source.ns)?;
> +                }
> +
> +                let client = source.get_client()?;
> +                client.login().await?;

this could probably also get a fresh client..
> +
> +                let mut result = client.get(&path, Some(args)).await?;
> +                let snapshot_list: Vec<SnapshotListItem> =
> +                    serde_json::from_value(result["data"].take())?;
> +                Ok(snapshot_list
> +                    .into_iter()
> +                    .filter_map(|item: SnapshotListItem| {
> +                        let snapshot = item.backup;
> +                        // in-progress backups can't be synced
> +                        if item.size.is_none() {
> +                            task_log!(
> +                                worker,
> +                                "skipping snapshot {} - in-progress backup",
> +                                snapshot
> +                            );
> +                            return None;
> +                        }
> +
> +                        Some(snapshot)
> +                    })
> +                    .collect::<Vec<pbs_api_types::BackupDir>>())
> +            }
> +            PullSource::Local(source) => Ok(source
> +                .store
> +                .backup_group(namespace.clone(), group.clone())
> +                .iter_snapshots()?
> +                .filter_map(Result::ok)

this hides errors when iterating..

> +                .map(|snapshot| snapshot.dir().to_owned())

but doesn't skip "in-progress" snapshots like the remote version does..

> +                .collect::<Vec<pbs_api_types::BackupDir>>()),
> +        }
> +    }
> +
> +    /// Load file from source namespace and BackupDir into file
> +    async fn load_file_into(
> +        &mut self,
> +        namespace: &BackupNamespace,
> +        snapshot: &pbs_api_types::BackupDir,
> +        filename: &str,
> +        into: &PathBuf,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        match self {
> +            PullSource::Remote(ref mut source) => {
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    reader.clone()
> +                } else {
> +                    let backup_reader = BackupReader::start(
> +                        client,
> +                        None,
> +                        source.repo.store(),
> +                        namespace,
> +                        snapshot,
> +                        true,
> +                    )
> +                    .await?;
> +                    source
> +                        .backup_reader
> +                        .insert(snapshot.clone(), backup_reader.clone());
> +                    backup_reader
> +                };
> +
> +                let download_result = reader.download(filename, &mut tmp_file).await;
> +
> +                if let Err(err) = download_result {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",

this was previously only logged when the manifest went missing.. when
downloading the indices later on this shouldn't happen since we have an
active reader session open which holds a lock, so nobody should be able to
pull out the snapshot under us and it should be a hard error..

> +                                    snapshot,
> +                                );
> +                                return Ok(None);
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
> +                };
> +            }
> +            PullSource::Local(source) => {
> +                let dir = source
> +                    .store
> +                    .backup_dir(namespace.clone(), snapshot.clone())?;
> +                let mut from_path = dir.full_path();
> +                from_path.push(filename);
> +                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
> +            }
> +        }
> +
> +        tmp_file.seek(SeekFrom::Start(0))?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())

hides errors!! note that this seeks to the beginning and loads the blob (see
[0] markers)

> +    }
> +
> +    // Note: The client.log.blob is uploaded after the backup, so it is
> +    // not mentioned in the manifest.
> +    async fn try_download_client_log(
> +        &self,
> +        from_snapshot: &pbs_api_types::BackupDir,

there is only a single snapshot involved

> +        to_path: &std::path::Path,

and a single path, so not sure whether we really need the from/to prefix?

> +        worker: &WorkerTask,

worker should come first..
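to illustrate the "hides errors" remark about the trailing `.ok()` in
load_file_into: `.ok()` turns any `Err` (e.g. a corrupt blob) into `None`,
indistinguishable from the "snapshot vanished" case. a std-only sketch with
a hypothetical loader standing in for DataBlob::load_from_reader:

```rust
use std::io::{Error, ErrorKind};

// Hypothetical stand-in for DataBlob::load_from_reader.
fn load_blob(valid: bool) -> Result<Vec<u8>, Error> {
    if valid {
        Ok(vec![1, 2, 3])
    } else {
        Err(Error::new(ErrorKind::InvalidData, "corrupt blob"))
    }
}

fn main() {
    // `.ok()` collapses the error into None, so a corrupt blob becomes
    // indistinguishable from "file not found / snapshot vanished" ...
    assert_eq!(load_blob(false).ok(), None);
    // ... while propagating the Result keeps the actual cause visible.
    assert_eq!(load_blob(false).unwrap_err().to_string(), "corrupt blob");
    assert_eq!(load_blob(true).ok(), Some(vec![1, 2, 3]));
}
```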
> +    ) -> Result<(), Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let reader = source
> +                    .backup_reader
> +                    .get(from_snapshot)
> +                    .ok_or(format_err!("Can't download chunks without a BackupReader"))?;
> +                let mut tmp_path = to_path.to_owned();
> +                tmp_path.set_extension("tmp");
> +
> +                let tmpfile = std::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .read(true)
> +                    .open(&tmp_path)?;
> +
> +                // Note: be silent if there is no log - only log successful download
> +                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> +                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +                    }
> +                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +                }
> +
> +                Ok(())

this should probably return the tmpfile, so that we only open it once..

> +            }
> +            PullSource::Local(_) => Ok(()),

local sync should also copy the log? also, similar to other parts - this is
mostly the old code refactored, but also slightly changed. it would have been
nice to have the "refactor" part first (with only a single match arm), and
the add local part second.

> +        }
> +    }
> +
> +    fn get_chunk_reader(
> +        &self,
> +        snapshot: &pbs_api_types::BackupDir,
> +        crypt_mode: CryptMode,
> +    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
> +        Ok(match &self {
> +            PullSource::Remote(source) => {
> +                if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    Arc::new(RemoteChunkReader::new(
> +                        reader.clone(),
> +                        None,
> +                        crypt_mode,
> +                        HashMap::new(),
> +                    ))
> +                } else {
> +                    bail!("No initialized BackupReader!")
> +                }
> +            }
> +            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
> +                source.store.clone(),
> +                None,
> +                crypt_mode,
> +            )),
> +        })
> +    }

shouldn't be needed - the reader should be passed down on its own..
a helper for the first time the source needs to be converted to a reader
might still be a good idea, but without persisting inside the source..

> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        match &self {
> +            PullSource::Remote(source) => source.ns.clone(),
> +            PullSource::Local(source) => source.ns.clone(),
> +        }
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        match &self {
> +            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
> +            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
> +        }
> +    }
> +}
> +
> +impl RemoteSource {
> +    fn get_client(&self) -> Result<&HttpClient, Error> {
> +        if let Some(client) = &self.client {
> +            Ok(client)
> +        } else {
> +            bail!("RemoteSource not initialized")
> +        }
> +    }

should instead return a fresh client..

> +}
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
>          remove_vanished: Option<bool>,
>          max_depth: Option<usize>,
>          group_filter: Option<Vec<GroupFilter>>,
> -        limit: RateLimitConfig,

should either move to RemoteSource, or be implemented for both variants and
stay here..
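the "fresh client" suggestion for get_client above could be sketched like
this (all types below are stand-ins; in the real code the helper would be
the existing remote_client function from the api2 remote config module):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-ins for the real types; only the call pattern matters here.
struct Remote { host: String }
struct HttpClient { host: String, session: u64 }

// Counts logins so the test below can show that no client is reused.
static SESSIONS: AtomicU64 = AtomicU64::new(0);

// Hypothetical equivalent of the remote_client() helper: performs a fresh
// login on every call, so a cached, possibly expired ticket is never used.
fn remote_client(remote: &Remote) -> HttpClient {
    let session = SESSIONS.fetch_add(1, Ordering::SeqCst);
    HttpClient { host: remote.host.clone(), session }
}

struct RemoteSource { remote: Remote }

impl RemoteSource {
    // No Option<HttpClient> field, no "not initialized" error path.
    fn get_client(&self) -> HttpClient {
        remote_client(&self.remote)
    }
}

fn main() {
    let source = RemoteSource { remote: Remote { host: "pbs.example".into() } };
    let a = source.get_client();
    let b = source.get_client();
    assert_eq!(a.host, b.host);
    assert_ne!(a.session, b.session); // two calls, two fresh sessions
}
```

this also removes the mutable init step, so PullParameters can stay
immutable after construction.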
>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> +        };
> +        let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let source: PullSource = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        let remove_vanished = remove_vanished.unwrap_or(false);
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +            PullSource::Remote(RemoteSource {
> +                remote,
> +                repo,
> +                ns: remote_ns.clone(),
> +                client: None,
> +                backup_reader: HashMap::new(),
> +            })
> +        } else {
> +            PullSource::Local(LocalSource {
> +                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
> +                ns: remote_ns,
> +            })
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> +            ns,
> +        };
>  
>          Ok(Self {
> -            remote,
> -            remote_ns,
> -            ns,
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        self.source.init(limit).await
> +    }

see other related comments

> +
> +    pub(crate) fn skip_chunk_sync(&self) -> bool {
> +        match &self.source {
> +            PullSource::Local(source) => source.store.name() == self.target.store.name(),
> +            PullSource::Remote(_) => false,
> +        }
> +    }
> +
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)

this doesn't do the right thing, see the two call sites..

> +    }
>  }
>  
> +async fn list_remote_namespaces(
> +    source: &RemoteSource,
> +    max_depth: &mut Option<usize>,
> +    worker: &WorkerTask,

worker usually comes first if passed as argument.. but it's only passed in
for the two log statements, which are actually possible to handle at the
call site as well (there's a check there for whether max_depth was modified).

> +) -> Result<Vec<BackupNamespace>, Error> {
> +    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +        vec![source.ns.clone()];

missing 'return' I think? it also changes the check for some reason, and
although the two are semantically the same I am not sure it's worth the
churn in an already quite crowded patch (series).
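for reference, the missing `return` really does make the branch a no-op: with
a trailing semicolon, `vec![..];` is an expression statement whose value is
dropped, and execution falls through. a minimal std-only illustration:

```rust
// Without `return`, the trailing-semicolon statement builds the vec and
// immediately drops it; execution falls through to the last expression.
fn without_return(is_root: bool) -> Vec<&'static str> {
    if is_root {
        vec!["root-ns"]; // value discarded, branch has no effect
    }
    vec!["fallback"]
}

// The intended early return short-circuits the function.
fn with_return(is_root: bool) -> Vec<&'static str> {
    if is_root {
        return vec!["root-ns"];
    }
    vec!["fallback"]
}

fn main() {
    assert_eq!(without_return(true), vec!["fallback"]); // the bug
    assert_eq!(with_return(true), vec!["root-ns"]);
    assert_eq!(with_return(false), vec!["fallback"]);
}
```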
> +    }
> +
> +    let path = format!(
> +        "api2/json/admin/datastore/{}/namespace",
> +        source.repo.store()
> +    );
> +    let mut data = json!({});
> +    if let Some(max_depth) = max_depth {
> +        data["max-depth"] = json!(max_depth);
> +    }
> +
> +    if !source.ns.is_root() {
> +        data["parent"] = json!(source.ns);
> +    }
> +
> +    let client = source.get_client()?;
> +    client.login().await?;
> +
> +    let mut result = match client.get(&path, Some(data)).await {
> +        Ok(res) => res,
> +        Err(err) => match err.downcast_ref::<HttpError>() {
> +            Some(HttpError { code, message }) => match code {
> +                &StatusCode::NOT_FOUND => {
> +                    if source.ns.is_root() && max_depth.is_none() {
> +                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");

see above, this could be logged at the call site to avoid passing in worker
at all. or it could remain here, but then please put the worker first in the
argument list ;)

> +                        max_depth.replace(0);
> +                    } else {
> +                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                    }
> +
> +                    return Ok(vec![source.ns.clone()]);
> +                }
> +                _ => {
> +                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                }
> +            },
> +            None => {
> +                bail!("Querying namespaces failed - {err}");
> +            }
> +        },
> +    };
> +
> +    let list: Vec<BackupNamespace> =
> +        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +            .iter()
> +            .map(|list_item| list_item.ns.clone())
> +            .collect();
> +
> +    Ok(list)
> +}

this is one of the examples that would have really benefited from splitting
this into a "no-change refactor PullParams with RemoteSource" patch and an
"add local pull support" patch.. this is basically the old query_namespaces
with parameter adjustments, but then *also* other changes.
now because the location also completely changed, I have to manually diff
the two to see what actually changed (or if semantic changes are hidden in
the noise).

> +
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> +/// - Load archive file into tmp file
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
>  async fn pull_single_archive(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    from_namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> -    let mut path = snapshot.full_path();
> +    let mut path = to_snapshot.full_path();
>      path.push(archive_name);
>  
>      let mut tmp_path = path.clone();
> @@ -273,13 +636,18 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    params
> +        .source
> +        .load_file_into(
> +            from_namespace,
> +            from_snapshot,
> +            archive_name,
> +            &tmp_path,
> +            worker,
> +        )
> +        .await?;

[0] this returns the blob, but it's not used..

> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

couldn't load_file_into just return the open file?

>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +657,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

typo, also I asked whether it wouldn't make sense to check that the chunks
are there when reviewing v1? it's basically "only" a series of 'stat' calls.

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -305,14 +679,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

same as above for dynamic indices..

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;

[0] so here we *again* seek to the start, which load_file_into already does
for us *if* we skip the blob loading there..

> @@ -326,33 +706,6 @@ async fn pull_single_archive(
>      Ok(())
>  }
>  
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>  /// Actual implementation of pulling a snapshot.
>  ///
>  /// Pulling a snapshot consists of the following steps:
> @@ -364,44 +717,37 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let mut manifest_name = snapshot.full_path();
> +    let mut manifest_name = to_snapshot.full_path();
>      manifest_name.push(MANIFEST_BLOB_NAME);
>  
> -    let mut client_log_name = snapshot.full_path();
> +    let mut client_log_name = to_snapshot.full_path();
>      client_log_name.push(CLIENT_LOG_BLOB_NAME);
>  
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");
>  
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = params
> +        .source
> +        .load_file_into(
> +            namespace,
> +            from_snapshot,
> +            MANIFEST_BLOB_NAME,
> +            &tmp_manifest_name,
> +            worker,
> +        )
> +        .await?
> +    {
> +        tmp_manifest_blob = data;

[0] so this is the only part that actually uses the parsed blob, it might
make sense to only do that parsing here..

> +    } else {
> +        return Ok(());

even further hides the wrong error handling from load_file_into..

> +    }
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -418,8 +764,11 @@ async fn pull_snapshot(
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>              if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                params
> +                    .source
> +                    .try_download_client_log(from_snapshot, &client_log_name, worker)
> +                    .await?;
> +            };
>              task_log!(worker, "no data changes");
>              let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> @@ -429,7 +778,7 @@ async fn pull_snapshot(
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>  
>      for item in manifest.files() {
> -        let mut path = snapshot.full_path();
> +        let mut path = to_snapshot.full_path();
>          path.push(&item.filename);
>  
>          if path.exists() {
> @@ -467,18 +816,12 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>          pull_single_archive(
>              worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
>              item,
>              downloaded_chunks.clone(),
>          )
> @@ -490,10 +833,12 @@ async fn pull_snapshot(
>      }
>  
>      if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> -    snapshot
> +        params
> +            .source
> +            .try_download_client_log(from_snapshot, &client_log_name, worker)
> +            .await?;
> +    };

nit: stray ';'

> +    to_snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
>  
> @@ -501,37 +846,53 @@ async fn pull_snapshot(
>  }
>  
>  /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> -///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.

please at least describe what `namespace` is referring to here..

>  async fn pull_snapshot_from(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let (_path, is_new, _snap_lock) = snapshot
> +    let (_path, is_new, _snap_lock) = to_snapshot
>          .datastore()
> -        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
> +        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
>  
>      if is_new {
> -        task_log!(worker, "sync snapshot {}", snapshot.dir());
> +        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> -            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
> -                snapshot.backup_ns(),
> -                snapshot.as_ref(),
> +        if let Err(err) = pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +
from_snapshot,
> + to_snapshot,
> + downloaded_chunks,
> + )
> + .await
> + {
> + if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
> + to_snapshot.backup_ns(),
> + to_snapshot.as_ref(),
> true,
> ) {
> task_log!(worker, "cleanup error - {}", cleanup_err);
> }
> return Err(err);
> }
> - task_log!(worker, "sync snapshot {} done", snapshot.dir());
> + task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
> } else {
> - task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> - pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> - task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
> + task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
> + pull_snapshot(
> + worker,
> + params,
> + namespace,
> + from_snapshot,
> + to_snapshot,
> + downloaded_chunks,
> + )
> + .await?;
> + task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
> }
>
> Ok(())
> @@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
> /// - Sort by snapshot time
> /// - Get last snapshot timestamp on local datastore
> /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader

should still be done.

> /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
> /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
> ///
> @@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
> /// - local group owner is already checked by pull_store
> async fn pull_group(
> worker: &WorkerTask,
> - client: &HttpClient,
> - params: &PullParameters,
> + params: &mut PullParameters,

making `params` mutable here should not be done ;)

> + source_namespace: &BackupNamespace,

in general: inconsistent naming with regards to source/remote/local and from/to. it would be good to be consistent at least internally, even if the config/api parameters are a bit "weird" for backwards compat reasons for now..
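coming back to the load_file_into error handling flagged in pull_snapshot above: `Ok(None)` should only ever stand for "file vanished", and everything else must stay an error. a minimal sketch of that contract (a plain std::fs stand-in of my own, not the actual helper):

```rust
use std::io;

/// Stand-in loader with the contract the review asks for:
/// Ok(Some(..)) = file found, Ok(None) = vanished (NotFound only),
/// Err(..)      = any other failure is propagated, never swallowed.
fn load_file(path: &str) -> Result<Option<Vec<u8>>, io::Error> {
    match std::fs::read(path) {
        Ok(data) => Ok(Some(data)),
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}
```

with this contract the caller's `else { return Ok(()); }` really only covers the vanished-snapshot case, and parsing the DataBlob can then happen at the single call site that needs it.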
> group: &pbs_api_types::BackupGroup,
> - remote_ns: BackupNamespace,
> progress: &mut StoreProgress,
> ) -> Result<(), Error> {
> - let path = format!(
> - "api2/json/admin/datastore/{}/snapshots",
> - params.source.store()
> - );
> -
> - let mut args = json!({
> - "backup-type": group.ty,
> - "backup-id": group.id,
> - });
> -
> - if !remote_ns.is_root() {
> - args["ns"] = serde_json::to_value(&remote_ns)?;
> - }
> -
> - let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

this involves three namespaces:
- the remote anchor
- the local anchor
- the current namespace being pulled

> -
> - let mut result = client.get(&path, Some(args)).await?;
> - let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> - client.login().await?; // make sure auth is complete
> -
> - let fingerprint = client.fingerprint();
> -
> - let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -
> - let mut remote_snapshots = std::collections::HashSet::new();
> -
> - // start with 65536 chunks (up to 256 GiB)
> - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> - progress.group_snapshots = list.len() as u64;
> + let target_ns = params.get_target_ns()?;

this is wrong, since it only involves two namespaces (the two anchors).
> =20 > + let mut source_snapshots =3D HashSet::new(); > + let last_sync =3D params > + .target > + .store > + .last_successful_backup(&target_ns, group)?; > let mut skip_info =3D SkipInfo { > oldest: i64::MAX, > newest: i64::MIN, > count: 0, > }; > =20 > - for (pos, item) in list.into_iter().enumerate() { > - let snapshot =3D item.backup; > - > - // in-progress backups can't be synced > - if item.size.is_none() { > - task_log!( > - worker, > - "skipping snapshot {} - in-progress backup", > - snapshot > - ); > - continue; > - } > + let mut list: Vec =3D params > + .source > + .list_backup_dirs(source_namespace, group, worker) > + .await? > + .into_iter() > + .filter(|dir| { > + source_snapshots.insert(dir.time); > + if let Some(last_sync_time) =3D last_sync { > + if last_sync_time > dir.time { > + skip_info.update(dir.time); > + return false; > + } > + } > + true > + }) > + .collect(); > =20 > - remote_snapshots.insert(snapshot.time); > + list.sort_unstable_by(|a, b| a.time.cmp(&b.time)); > =20 > - if let Some(last_sync_time) =3D last_sync { > - if last_sync_time > snapshot.time { > - skip_info.update(snapshot.time); > - continue; > - } > - } > + // start with 65536 chunks (up to 256 GiB) > + let downloaded_chunks =3D Arc::new(Mutex::new(HashSet::with_capacity= (1024 * 64))); > =20 > - // get updated auth_info (new tickets) > - let auth_info =3D client.login().await?; > - > - let options =3D > - HttpClientOptions::new_non_interactive(auth_info.ticket.clon= e(), fingerprint.clone()) > - .rate_limit(params.limit.clone()); > - > - let new_client =3D HttpClient::new( > - params.source.host(), > - params.source.port(), > - params.source.auth_id(), > - options, > - )?; > - > - let reader =3D BackupReader::start( > - new_client, > - None, > - params.source.store(), > - &remote_ns, > - &snapshot, > - true, > - ) > - .await?; > + progress.group_snapshots =3D list.len() as u64; > =20 > - let snapshot =3D params.store.backup_dir(target_ns.clone(), snap= shot)?; > + for (pos, 
from_snapshot) in list.into_iter().enumerate() { > + let to_snapshot =3D params > + .target > + .store > + .backup_dir(params.target.ns.clone(), from_snapshot.clone())= ?; > =20 > - let result =3D pull_snapshot_from(worker, reader, &snapshot, dow= nloaded_chunks.clone()).await; > + let result =3D pull_snapshot_from( > + worker, > + params, > + source_namespace, > + &from_snapshot, > + &to_snapshot, > + downloaded_chunks.clone(), > + ) > + .await; > =20 > progress.done_snapshots =3D pos as u64 + 1; > task_log!(worker, "percentage done: {}", progress); > @@ -703,11 +1025,14 @@ async fn pull_group( > } > =20 > if params.remove_vanished { > - let group =3D params.store.backup_group(target_ns.clone(), group= .clone()); > + let group =3D params > + .target > + .store > + .backup_group(target_ns.clone(), group.clone()); > let local_list =3D group.list_backups()?; > for info in local_list { > let snapshot =3D info.backup_dir; > - if remote_snapshots.contains(&snapshot.backup_time()) { > + if source_snapshots.contains(&snapshot.backup_time()) { > continue; > } > if snapshot.is_protected() { > @@ -720,6 +1045,7 @@ async fn pull_group( > } > task_log!(worker, "delete vanished snapshot {}", snapshot.di= r()); > params > + .target > .store > .remove_backup_dir(&target_ns, snapshot.as_ref(), false)= ?; > } > @@ -732,64 +1058,12 @@ async fn pull_group( > Ok(()) > } > =20 > -// will modify params if switching to backwards mode for lack of NS supp= ort on remote end > -async fn query_namespaces( > - worker: &WorkerTask, > - client: &HttpClient, > - params: &mut PullParameters, > -) -> Result, Error> { > - let path =3D format!( > - "api2/json/admin/datastore/{}/namespace", > - params.source.store() > - ); > - let mut data =3D json!({}); > - if let Some(max_depth) =3D params.max_depth { > - data["max-depth"] =3D json!(max_depth); > - } > - > - if !params.remote_ns.is_root() { > - data["parent"] =3D json!(params.remote_ns); > - } > - > - let mut result =3D match client.get(&path, 
Some(data)).await { > - Ok(res) =3D> res, > - Err(err) =3D> match err.downcast_ref::() { > - Some(HttpError { code, message }) =3D> match *code { > - StatusCode::NOT_FOUND =3D> { > - if params.remote_ns.is_root() && params.max_depth.is= _none() { > - task_log!(worker, "Could not query remote for na= mespaces (404) -> temporarily switching to backwards-compat mode"); > - task_log!(worker, "Either make backwards-compat = mode explicit (max-depth =3D=3D 0) or upgrade remote system."); > - params.max_depth =3D Some(0); > - } else { > - bail!("Remote namespace set/recursive sync reque= sted, but remote does not support namespaces.") > - } > - > - return Ok(vec![params.remote_ns.clone()]); > - } > - _ =3D> { > - bail!("Querying namespaces failed - HTTP error {code= } - {message}"); > - } > - }, > - None =3D> { > - bail!("Querying namespaces failed - {err}"); > - } > - }, > - }; > - > - let mut list: Vec =3D serde_json::from_value(resu= lt["data"].take())?; > - > - // parents first > - list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len())); > - > - Ok(list.iter().map(|item| item.ns.clone()).collect()) > -} > - > fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) ->= Result { > let mut created =3D false; > - let store_ns_str =3D print_store_and_ns(params.store.name(), ns); > + let store_ns_str =3D print_store_and_ns(params.target.store.name(), = ns); > =20 > - if !ns.is_root() && !params.store.namespace_path(ns).exists() { > - check_ns_modification_privs(params.store.name(), ns, ¶ms.own= er) > + if !ns.is_root() && !params.target.store.namespace_path(ns).exists()= { > + check_ns_modification_privs(params.target.store.name(), ns, &par= ams.owner) > .map_err(|err| format_err!("Creating {ns} not allowed - {err= }"))?; > =20 > let name =3D match ns.components().last() { > @@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns= : &BackupNamespace) -> Result< > } > }; > =20 > - if let Err(err) =3D 
params.store.create_namespace(&ns.parent(), = name) { > + if let Err(err) =3D params.target.store.create_namespace(&ns.par= ent(), name) { > bail!("sync into {store_ns_str} failed - namespace creation = failed: {err}"); > } > created =3D true; > } > =20 > check_ns_privs( > - params.store.name(), > + params.target.store.name(), > ns, > ¶ms.owner, > PRIV_DATASTORE_BACKUP, > @@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns= : &BackupNamespace) -> Result< > } > =20 > fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespa= ce) -> Result { > - check_ns_modification_privs(params.store.name(), local_ns, ¶ms.o= wner) > + check_ns_modification_privs(params.target.store.name(), local_ns, &p= arams.owner) > .map_err(|err| format_err!("Removing {local_ns} not allowed - {e= rr}"))?; > =20 > - params.store.remove_namespace_recursive(local_ns, true) > + params > + .target > + .store > + .remove_namespace_recursive(local_ns, true) > } > =20 > fn check_and_remove_vanished_ns( > @@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns( > // clamp like remote does so that we don't list more than we can eve= r have synced. > let max_depth =3D params > .max_depth > - .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth(= )); > + .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().= depth()); > =20 > let mut local_ns_list: Vec =3D params > + .target > .store > - .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))= ? > + .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_= depth))? 
> .filter(|ns| { > let user_privs =3D > - user_info.lookup_privs(¶ms.owner, &ns.acl_path(param= s.store.name())); > + user_info.lookup_privs(¶ms.owner, &ns.acl_path(param= s.target.store.name())); > user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) = !=3D 0 > }) > .collect(); > @@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns( > local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len(= ))); > =20 > for local_ns in local_ns_list { > - if local_ns =3D=3D params.ns { > + if local_ns =3D=3D params.target.ns { > continue; > } > =20 > @@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns( > /// - access to sub-NS checked here > pub(crate) async fn pull_store( > worker: &WorkerTask, > - client: &HttpClient, > mut params: PullParameters, > ) -> Result<(), Error> { > // explicit create shared lock to prevent GC on newly created chunks > - let _shared_store_lock =3D params.store.try_shared_chunk_store_lock(= )?; > + let _shared_store_lock =3D params.target.store.try_shared_chunk_stor= e_lock()?; > let mut errors =3D false; > =20 > let old_max_depth =3D params.max_depth; > - let namespaces =3D if params.remote_ns.is_root() && params.max_depth= =3D=3D Some(0) { > - vec![params.remote_ns.clone()] // backwards compat - don't query= remote namespaces! > - } else { > - query_namespaces(worker, client, &mut params).await? 
> - }; > + let mut namespaces =3D params > + .source > + .list_namespaces(&mut params.max_depth, worker) > + .await?; > errors |=3D old_max_depth !=3D params.max_depth; // fail job if we s= witched to backwards-compat mode > + namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len())); > =20 > let (mut groups, mut snapshots) =3D (0, 0); > let mut synced_ns =3D HashSet::with_capacity(namespaces.len()); > =20 > for namespace in namespaces { > - let source_store_ns_str =3D print_store_and_ns(params.source.sto= re(), &namespace); > + let source_store_ns_str =3D params.source.print_store_and_ns(); > =20 > - let target_ns =3D namespace.map_prefix(¶ms.remote_ns, ¶m= s.ns)?; > - let target_store_ns_str =3D print_store_and_ns(params.store.name= (), &target_ns); > + let target_ns =3D namespace.map_prefix(¶ms.source.get_ns(), = ¶ms.target.ns)?; > + let target_store_ns_str =3D print_store_and_ns(params.target.sto= re.name(), &target_ns); > =20 > task_log!(worker, "----"); > task_log!( > @@ -947,7 +1224,7 @@ pub(crate) async fn pull_store( > } > } > =20 > - match pull_ns(worker, client, ¶ms, namespace.clone(), target= _ns).await { > + match pull_ns(worker, &namespace, &mut params).await { > Ok((ns_progress, ns_errors)) =3D> { > errors |=3D ns_errors; > =20 > @@ -968,7 +1245,7 @@ pub(crate) async fn pull_store( > task_log!( > worker, > "Encountered errors while syncing namespace {} - {}"= , > - namespace, > + &namespace, > err, > ); > } > @@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store( > /// - owner check for vanished groups done here > pub(crate) async fn pull_ns( > worker: &WorkerTask, > - client: &HttpClient, > - params: &PullParameters, > - source_ns: BackupNamespace, > - target_ns: BackupNamespace, > + namespace: &BackupNamespace, > + params: &mut PullParameters, > ) -> Result<(StoreProgress, bool), Error> { > - let path =3D format!("api2/json/admin/datastore/{}/groups", params.s= ource.store()); > - > - let args =3D if !source_ns.is_root() { > - 
Some(json!({
> - "ns": source_ns,
> - }))
> - } else {
> - None
> - };
> -
> - let mut result = client
> - .get(&path, args)
> - .await
> - .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> - let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> + let mut list: Vec<BackupGroup> =
> + params.source.list_groups(namespace, &params.owner).await?;
>
> let total_count = list.len();
> list.sort_unstable_by(|a, b| {
> - let type_order = a.backup.ty.cmp(&b.backup.ty);
> + let type_order = a.ty.cmp(&b.ty);
> if type_order == std::cmp::Ordering::Equal {
> - a.backup.id.cmp(&b.backup.id)
> + a.id.cmp(&b.id)
> } else {
> type_order
> }
> @@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
> filters.iter().any(|filter| group.matches(filter))
> };
>
> - // Get groups with target NS set
> - let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
> let list = if let Some(ref group_filter) = &params.group_filter {
> let unfiltered_count = list.len();
> let list: Vec<BackupGroup> = list
> @@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
>
> let mut progress = StoreProgress::new(list.len() as u64);
>
> + let target_ns = params.get_target_ns()?;

this is the wrong namespace.. we need to map the source namespace to the target namespace anchor (e.g., if we are recursively pulling from /a/b to /a/z and the namespace currently being pulled is /a/b/c, then the target namespace should be /a/z/c, not /a/z).
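to make the expected mapping concrete, a self-contained sketch of the anchor-to-anchor prefix mapping (`map_prefix` here is my stand-in modeled on plain path components, not the actual `BackupNamespace` API):

```rust
/// Stand-in for the namespace prefix mapping discussed above: strip the
/// source anchor from the namespace currently being pulled and re-attach
/// the remainder below the target anchor.
fn map_prefix(
    current: &[&str],
    source_anchor: &[&str],
    target_anchor: &[&str],
) -> Option<Vec<String>> {
    // None if `current` does not live below the source anchor
    let rest = current.strip_prefix(source_anchor)?;
    Some(
        target_anchor
            .iter()
            .chain(rest.iter())
            .map(|c| c.to_string())
            .collect(),
    )
}
```

i.e. pulling /a/b into /a/z maps /a/b/c to /a/z/c, while a bare `get_target_ns()` would always yield the target anchor /a/z, regardless of which sub-namespace is currently being pulled.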
> for (done, group) in list.into_iter().enumerate() { > progress.done_groups =3D done as u64; > progress.done_snapshots =3D 0; > @@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns( > =20 > let (owner, _lock_guard) =3D > match params > + .target > .store > .create_locked_backup_group(&target_ns, &group, ¶ms.= owner) > { > @@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns( > err > ); > errors =3D true; // do not stop here, instead contin= ue > + task_log!(worker, "create_locked_backup_group failed= "); any reason this is here? it's already logged two lines above ;) > continue; > } > }; > @@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns( > owner > ); > errors =3D true; // do not stop here, instead continue > - } else if let Err(err) =3D pull_group( > - worker, > - client, > - params, > - &group, > - source_ns.clone(), > - &mut progress, > - ) > - .await > + } else if let Err(err) =3D pull_group(worker, params, namespace,= &group, &mut progress).await > { > task_log!(worker, "sync group {} failed - {}", &group, err,)= ; > errors =3D true; // do not stop here, instead continue > @@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns( > =20 > if params.remove_vanished { > let result: Result<(), Error> =3D proxmox_lang::try_block!({ > - for local_group in params.store.iter_backup_groups(target_ns= .clone())? { > + for local_group in params.target.store.iter_backup_groups(ta= rget_ns.clone())? 
{
> let local_group = local_group?;
> let local_group = local_group.group();
> if new_groups.contains(local_group) {
> continue;
> }
> - let owner = params.store.get_owner(&target_ns, local_group)?;
> + let owner = params.target.store.get_owner(&target_ns, local_group)?;
> if check_backup_owner(&owner, &params.owner).is_err() {
> continue;
> }
> @@ -1133,7 +1386,11 @@
> }
> }
> task_log!(worker, "delete vanished group '{local_group}'",);
> - match params.store.remove_backup_group(&target_ns, local_group) {
> + match params
> + .target
> + .store
> + .remove_backup_group(&target_ns, local_group)
> + {
> Ok(true) => {}
> Ok(false) => {
> task_log!(
> --
> 2.30.2
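P.S., to illustrate the 'stat' suggestion from the skip_chunk_sync comment near the top: verifying that the referenced chunks actually exist on the shared datastore is just an existence test per digest. a sketch, assuming a `.chunks/<prefix>/<digest>` layout (names are mine, not the datastore's actual chunk path helper):

```rust
use std::path::{Path, PathBuf};

/// Assumed on-disk layout: <store>/.chunks/<first 4 hex digits>/<digest>
/// (stand-in for the datastore's chunk path helper).
fn chunk_path(store: &Path, digest_hex: &str) -> PathBuf {
    store.join(".chunks").join(&digest_hex[..4]).join(digest_hex)
}

/// Instead of skipping the chunk sync outright for a shared datastore,
/// stat every referenced chunk and return the digests that are missing.
fn missing_chunks(store: &Path, digests: &[String]) -> Vec<String> {
    digests
        .iter()
        .filter(|digest| !chunk_path(store, digest.as_str()).exists())
        .cloned()
        .collect()
}
```

that keeps the fast path (no data copied) while still catching a corrupted or garbage-collected target before the sync claims success.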