From: Lukas Wagner <l.wagner@proxmox.com>
To: Proxmox Backup Server development discussion
<pbs-devel@lists.proxmox.com>,
Hannes Laimer <h.laimer@proxmox.com>
Subject: Re: [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore
Date: Thu, 21 Sep 2023 13:10:34 +0200 [thread overview]
Message-ID: <eb3ae9b4-cd47-4be8-b3b3-880a63189f8c@proxmox.com> (raw)
In-Reply-To: <20230808121344.199500-6-h.laimer@proxmox.com>
Some of the changed lines seem to be overly long (>100 chars). I've
noted some of the places, but I probably did not catch everything.
On 8/8/23 14:13, Hannes Laimer wrote:
> ... making the pull logic independent from the actual source
> using two traits.
>
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
> Cargo.toml | 2 +
> pbs-datastore/src/read_chunk.rs | 2 +-
> src/api2/config/remote.rs | 14 +-
> src/api2/pull.rs | 31 +-
> src/server/pull.rs | 943 +++++++++++++++++++-------------
> 5 files changed, 570 insertions(+), 422 deletions(-)
>
> diff --git a/Cargo.toml b/Cargo.toml
> index 4d34f8a1..74cb68e0 100644
> --- a/Cargo.toml
> +++ b/Cargo.toml
> @@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
>
> # regular crates
> anyhow = "1.0"
> +async-trait = "0.1.56"
> apt-pkg-native = "0.3.2"
> base64 = "0.13"
> bitflags = "1.2.1"
> @@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
>
> [dependencies]
> anyhow.workspace = true
> +async-trait.workspace = true
> apt-pkg-native.workspace = true
> base64.workspace = true
> bitflags.workspace = true
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
> fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
> }
>
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {
> /// Returns the encoded chunk data
> fn read_raw_chunk<'a>(
> &'a self,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 307cf3cd..2511c5d5 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
> Ok(())
> }
>
> -/// Helper to get client for remote.cfg entry
> -pub async fn remote_client(
> +/// Helper to get client for remote.cfg entry without login, just config
> +pub fn remote_client_config(
> remote: &Remote,
> limit: Option<RateLimitConfig>,
> ) -> Result<HttpClient, Error> {
> @@ -320,6 +320,16 @@ pub async fn remote_client(
> &remote.config.auth_id,
> options,
> )?;
> +
> + Ok(client)
> +}
> +
> +/// Helper to get client for remote.cfg entry
> +pub async fn remote_client(
> + remote: &Remote,
> + limit: Option<RateLimitConfig>,
> +) -> Result<HttpClient, Error> {
> + let client = remote_client_config(remote, limit)?;
> let _auth_info = client
> .login() // make sure we can auth
> .await
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index 664ecce5..e36a5b14 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -8,7 +8,7 @@ use proxmox_sys::task_log;
>
> use pbs_api_types::{
> Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
> - GROUP_FILTER_LIST_SCHEMA, MAX_NAMESPACE_DEPTH, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> + GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
> TRANSFER_LAST_SCHEMA,
> };
> @@ -75,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
> PullParameters::new(
> &sync_job.store,
> sync_job.ns.clone().unwrap_or_default(),
> - sync_job.remote.as_deref().unwrap_or("local"),
> + sync_job.remote.as_deref(),
> &sync_job.remote_store,
> sync_job.remote_ns.clone().unwrap_or_default(),
> sync_job
> @@ -124,7 +124,6 @@ pub fn do_sync_job(
>
> let worker_future = async move {
> let pull_params = PullParameters::try_from(&sync_job)?;
> - let client = pull_params.client().await?;
>
> task_log!(worker, "Starting datastore sync job '{}'", job_id);
> if let Some(event_str) = schedule {
> @@ -138,24 +137,7 @@ pub fn do_sync_job(
> sync_job.remote_store,
> );
>
> - if sync_job.remote.is_some() {
> - pull_store(&worker, &client, pull_params).await?;
> - } else {
> - if let (Some(target_ns), Some(source_ns)) = (sync_job.ns, sync_job.remote_ns) {
> - if target_ns.path().starts_with(source_ns.path())
> - && sync_job.store == sync_job.remote_store
> - && sync_job.max_depth.map_or(true, |sync_depth| {
> - target_ns.depth() + sync_depth > MAX_NAMESPACE_DEPTH
> - }) {
> - task_log!(
> - worker,
> - "Can't sync namespace into one of its sub-namespaces, would exceed maximum namespace depth, skipping"
> - );
> - }
> - } else {
> - pull_store(&worker, &client, pull_params).await?;
> - }
> - }
> + pull_store(&worker, pull_params).await?;
>
> task_log!(worker, "sync job '{}' end", &job_id);
>
> @@ -284,7 +266,7 @@ async fn pull(
> let pull_params = PullParameters::new(
> &store,
> ns,
> - remote.as_deref().unwrap_or("local"),
> + remote.as_deref(),
> &remote_store,
> remote_ns.unwrap_or_default(),
> auth_id.clone(),
> @@ -294,7 +276,6 @@ async fn pull(
> limit,
> transfer_last,
> )?;
> - let client = pull_params.client().await?;
>
> // fixme: set to_stdout to false?
> // FIXME: add namespace to worker id?
> @@ -312,7 +293,7 @@ async fn pull(
> remote_store,
> );
>
> - let pull_future = pull_store(&worker, &client, pull_params);
> + let pull_future = pull_store(&worker, pull_params);
> (select! {
> success = pull_future.fuse() => success,
> abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> @@ -327,4 +308,4 @@ async fn pull(
> Ok(upid_str)
> }
>
> -pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> \ No newline at end of file
> +pub const ROUTER: Router = Router::new().post(&API_METHOD_PULL);
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index e55452d1..e1a27a8c 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
> //! Sync datastore from remote server
>
> use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::Seek;
> +use std::path::Path;
> use std::sync::atomic::{AtomicUsize, Ordering};
> use std::sync::{Arc, Mutex};
> use std::time::SystemTime;
>
> use anyhow::{bail, format_err, Error};
> use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
> use proxmox_router::HttpError;
> -use proxmox_sys::task_log;
> +use proxmox_sys::{task_log, task_warn};
> +use serde_json::json;
>
> use pbs_api_types::{
> - print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> - Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> + print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
> + GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
> PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
> };
> -
> -use pbs_client::{
> - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
> use pbs_datastore::data_blob::DataBlob;
> use pbs_datastore::dynamic_index::DynamicIndexReader;
> use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
> use pbs_datastore::manifest::{
> archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
> };
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>
> use crate::backup::{check_ns_modification_privs, check_ns_privs};
> use crate::tools::parallel_handler::ParallelHandler;
>
> -/// Parameters for a pull operation.
> -pub(crate) struct PullParameters {
> - /// Remote that is pulled from
> - remote: Remote,
> - /// Full specification of remote datastore
> - source: BackupRepository,
> - /// Local store that is pulled into
> +struct RemoteReader {
> + backup_reader: Arc<BackupReader>,
> + dir: BackupDir,
> +}
> +
> +pub(crate) struct PullTarget {
> store: Arc<DataStore>,
> - /// Remote namespace
> - remote_ns: BackupNamespace,
> - /// Local namespace (anchor)
> ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> + repo: BackupRepository,
> + ns: BackupNamespace,
> + client: HttpClient,
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
> +/// The trait includes methods for listing namespaces, groups, and backup directories,
> +/// as well as retrieving a reader for reading data from the source
> +trait PullSource: Send + Sync {
> + /// Lists namespaces from the source.
> + async fn list_namespaces(
> + &self,
> + max_depth: &mut Option<usize>,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupNamespace>, Error>;
> +
> + /// Lists groups within a specific namespace from the source.
> + async fn list_groups(
> + &self,
> + namespace: &BackupNamespace,
> + owner: &Authid,
> + ) -> Result<Vec<BackupGroup>, Error>;
> +
> + /// Lists backup directories for a specific group within a specific namespace from the source.
> + async fn list_backup_dirs(
> + &self,
> + namespace: &BackupNamespace,
> + group: &BackupGroup,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupDir>, Error>;
> + fn get_ns(&self) -> BackupNamespace;
> + fn print_store_and_ns(&self) -> String;
> +
> + /// Returns a reader for reading data from a specific backup directory.
> + async fn reader(
> + &self,
> + ns: &BackupNamespace,
> + dir: &BackupDir,
> + ) -> Result<Arc<dyn PullReader>, Error>;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullSource for RemoteSource {
> + async fn list_namespaces(
> + &self,
> + max_depth: &mut Option<usize>,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupNamespace>, Error> {
> + if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> + vec![self.ns.clone()];
> + }
> +
> + let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
> + let mut data = json!({});
> + if let Some(max_depth) = max_depth {
> + data["max-depth"] = json!(max_depth);
> + }
> +
> + if !self.ns.is_root() {
> + data["parent"] = json!(self.ns);
> + }
> + self.client.login().await?;
> +
> + let mut result = match self.client.get(&path, Some(data)).await {
> + Ok(res) => res,
> + Err(err) => match err.downcast_ref::<HttpError>() {
> + Some(HttpError { code, message }) => match code {
> + &StatusCode::NOT_FOUND => {
> + if self.ns.is_root() && max_depth.is_none() {
> + task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> + task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
These lines exceed our 100-character limit.
> + max_depth.replace(0);
> + } else {
> + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> + }
> +
> + return Ok(vec![self.ns.clone()]);
> + }
> + _ => {
> + bail!("Querying namespaces failed - HTTP error {code} - {message}");
> + }
> + },
> + None => {
> + bail!("Querying namespaces failed - {err}");
> + }
> + },
> + };
> +
> + let list: Vec<BackupNamespace> =
> + serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> + .iter()
> + .map(|list_item| list_item.ns.clone())
> + .collect();
> +
> + Ok(list)
> + }
> +
> + async fn list_groups(
> + &self,
> + namespace: &BackupNamespace,
> + _owner: &Authid,
> + ) -> Result<Vec<BackupGroup>, Error> {
> + let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
> +
> + let args = if !namespace.is_root() {
> + Some(json!({ "ns": namespace.clone() }))
> + } else {
> + None
> + };
> +
> + self.client.login().await?;
> + let mut result =
> + self.client.get(&path, args).await.map_err(|err| {
> + format_err!("Failed to retrieve backup groups from remote - {}", err)
> + })?;
> +
> + Ok(
> + serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> + .map_err(Error::from)?
> + .into_iter()
> + .map(|item| item.backup)
> + .collect::<Vec<BackupGroup>>(),
> + )
> + }
> +
> + async fn list_backup_dirs(
> + &self,
> + _namespace: &BackupNamespace,
> + group: &BackupGroup,
> + worker: &WorkerTask,
> + ) -> Result<Vec<BackupDir>, Error> {
> + let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
> +
> + let mut args = json!({
> + "backup-type": group.ty,
> + "backup-id": group.id,
> + });
> +
> + if !self.ns.is_root() {
> + args["ns"] = serde_json::to_value(&self.ns)?;
> + }
> +
> + self.client.login().await?;
> +
> + let mut result = self.client.get(&path, Some(args)).await?;
> + let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> + Ok(snapshot_list
> + .into_iter()
> + .filter_map(|item: SnapshotListItem| {
> + let snapshot = item.backup;
> + // in-progress backups can't be synced
> + if item.size.is_none() {
> + task_log!(
> + worker,
> + "skipping snapshot {} - in-progress backup",
> + snapshot
> + );
> + return None;
> + }
> +
> + Some(snapshot)
> + })
> + .collect::<Vec<BackupDir>>())
> + }
> +
> + fn get_ns(&self) -> BackupNamespace {
> + self.ns.clone()
> + }
> +
> + fn print_store_and_ns(&self) -> String {
> + print_store_and_ns(self.repo.store(), &self.ns)
> + }
> +
> + async fn reader(
> + &self,
> + ns: &BackupNamespace,
> + dir: &BackupDir,
> + ) -> Result<Arc<dyn PullReader>, Error> {
> + let backup_reader =
> + BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
> + Ok(Arc::new(RemoteReader {
> + backup_reader,
> + dir: dir.clone(),
> + }))
> + }
> +}
> +
> +#[async_trait::async_trait]
> +/// `PullReader` is a trait that provides an interface for reading data from a source.
> +/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
This is a long line again.
> +trait PullReader: Send + Sync {
> + /// Returns a chunk reader with the specified encryption mode.
> + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
> +
> + /// Asynchronously loads a file from the source into a local file.
> + /// `filename` is the name of the file to load from the source.
> + /// `into` is the path of the local file to load the source file into.
> + async fn load_file_into(
> + &self,
> + filename: &str,
> + into: &Path,
> + worker: &WorkerTask,
> + ) -> Result<Option<DataBlob>, Error>;
> +
> + /// Tries to download the client log from the source and save it into a local file.
> + async fn try_download_client_log(
> + &self,
> + to_path: &Path,
> + worker: &WorkerTask,
> + ) -> Result<(), Error>;
> +
> + fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
> +}
> +
> +#[async_trait::async_trait]
> +impl PullReader for RemoteReader {
> + fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
> + Arc::new(RemoteChunkReader::new(
> + self.backup_reader.clone(),
> + None,
> + crypt_mode,
> + HashMap::new(),
> + ))
> + }
> +
> + async fn load_file_into(
> + &self,
> + filename: &str,
> + into: &Path,
> + worker: &WorkerTask,
> + ) -> Result<Option<DataBlob>, Error> {
> + let mut tmp_file = std::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .truncate(true)
> + .read(true)
> + .open(into)?;
> + let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
> + if let Err(err) = download_result {
> + match err.downcast_ref::<HttpError>() {
> + Some(HttpError { code, message }) => match *code {
> + StatusCode::NOT_FOUND => {
> + task_log!(
> + worker,
> + "skipping snapshot {} - vanished since start of sync",
> + &self.dir,
> + );
> + return Ok(None);
> + }
> + _ => {
> + bail!("HTTP error {code} - {message}");
> + }
> + },
> + None => {
> + return Err(err);
> + }
> + };
> + };
> + tmp_file.rewind()?;
> + Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
> + }
> +
> + async fn try_download_client_log(
> + &self,
> + to_path: &Path,
> + worker: &WorkerTask,
> + ) -> Result<(), Error> {
> + let mut tmp_path = to_path.to_owned();
> + tmp_path.set_extension("tmp");
> +
> + let tmpfile = std::fs::OpenOptions::new()
> + .write(true)
> + .create(true)
> + .read(true)
> + .open(&tmp_path)?;
> +
> + // Note: be silent if there is no log - only log successful download
> + if let Ok(()) = self
> + .backup_reader
> + .download(CLIENT_LOG_BLOB_NAME, tmpfile)
> + .await
> + {
> + if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> + bail!("Atomic rename file {:?} failed - {}", to_path, err);
> + }
> + task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> + }
> +
> + Ok(())
> + }
> +
> + fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
> + false
> + }
> +}
> +
> +/// Parameters for a pull operation.
> +pub(crate) struct PullParameters {
> + /// Where data is pulled from
> + source: Arc<dyn PullSource>,
> + /// Where data should be pulled into
> + target: PullTarget,
> /// Owner of synced groups (needs to match local owner of pre-existing groups)
> owner: Authid,
> /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
> max_depth: Option<usize>,
> /// Filters for reducing the pull scope
> group_filter: Option<Vec<GroupFilter>>,
> - /// Rate limits for all transfers from `remote`
> - limit: RateLimitConfig,
> /// How many snapshots should be transferred at most (taking the newest N snapshots)
> transfer_last: Option<usize>,
> }
>
> impl PullParameters {
> /// Creates a new instance of `PullParameters`.
> - ///
> - /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> - /// [BackupRepository] with `remote_store`.
> - #[allow(clippy::too_many_arguments)]
> pub(crate) fn new(
> store: &str,
> ns: BackupNamespace,
> - remote: &str,
> + remote: Option<&str>,
> remote_store: &str,
> remote_ns: BackupNamespace,
> owner: Authid,
> @@ -82,49 +376,56 @@ impl PullParameters {
> limit: RateLimitConfig,
> transfer_last: Option<usize>,
> ) -> Result<Self, Error> {
> - let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
> if let Some(max_depth) = max_depth {
> ns.check_max_depth(max_depth)?;
> remote_ns.check_max_depth(max_depth)?;
> - }
> -
> - let (remote_config, _digest) = pbs_config::remote::config()?;
> - let remote: Remote = remote_config.lookup("remote", remote)?;
> -
> + };
> let remove_vanished = remove_vanished.unwrap_or(false);
>
> - let source = BackupRepository::new(
> - Some(remote.config.auth_id.clone()),
> - Some(remote.config.host.clone()),
> - remote.config.port,
> - remote_store.to_string(),
> - );
> + let source: Arc<dyn PullSource> = if let Some(remote) = remote {
> + let (remote_config, _digest) = pbs_config::remote::config()?;
> + let remote: Remote = remote_config.lookup("remote", remote)?;
>
> - Ok(Self {
> - remote,
> - remote_ns,
> + let repo = BackupRepository::new(
> + Some(remote.config.auth_id.clone()),
> + Some(remote.config.host.clone()),
> + remote.config.port,
> + remote_store.to_string(),
> + );
> + let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
> + Arc::new(RemoteSource {
> + repo,
> + ns: remote_ns,
> + client,
> + })
> + } else {
> + bail!("local sync not implemented yet")
> + };
> + let target = PullTarget {
> + store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> ns,
> + };
> +
> + Ok(Self {
> source,
> - store,
> + target,
> owner,
> remove_vanished,
> max_depth,
> group_filter,
> - limit,
> transfer_last,
> })
> }
>
> - /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> - pub async fn client(&self) -> Result<HttpClient, Error> {
> - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> + pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> + let source_ns = self.source.get_ns();
> + source_ns.map_prefix(&source_ns, &self.target.ns)
> }
> }
>
> async fn pull_index_chunks<I: IndexFile>(
> worker: &WorkerTask,
> - chunk_reader: RemoteChunkReader,
> + chunk_reader: Arc<dyn AsyncReadChunk>,
> target: Arc<DataStore>,
> index: I,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -215,26 +516,6 @@ async fn pull_index_chunks<I: IndexFile>(
> Ok(())
> }
>
> -async fn download_manifest(
> - reader: &BackupReader,
> - filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> - let mut tmp_manifest_file = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .truncate(true)
> - .read(true)
> - .open(filename)?;
> -
> - reader
> - .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> - .await?;
> -
> - tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> - Ok(tmp_manifest_file)
> -}
> -
> fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
> if size != info.size {
> bail!(
> @@ -255,17 +536,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
> /// Pulls a single file referenced by a manifest.
> ///
> /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> -/// - Verify tmp file checksum
> +/// - Load archive file into tmp file
> +/// -- Load file into tmp file
> +/// -- Verify tmp file checksum
> /// - if archive is an index, pull referenced chunks
> /// - Rename tmp file into real path
> -async fn pull_single_archive(
> - worker: &WorkerTask,
> - reader: &BackupReader,
> - chunk_reader: &mut RemoteChunkReader,
> - snapshot: &pbs_datastore::BackupDir,
> - archive_info: &FileInfo,
> +async fn pull_single_archive<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> + archive_info: &'a FileInfo,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let archive_name = &archive_info.filename;
> @@ -277,13 +557,11 @@ async fn pull_single_archive(
>
> task_log!(worker, "sync archive {}", archive_name);
>
> - let mut tmpfile = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .read(true)
> - .open(&tmp_path)?;
> + reader
> + .load_file_into(archive_name, &tmp_path, worker)
> + .await?;
>
> - reader.download(archive_name, &mut tmpfile).await?;
> + let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>
> match archive_type(archive_name)? {
> ArchiveType::DynamicIndex => {
> @@ -293,14 +571,18 @@ async fn pull_single_archive(
> let (csum, size) = index.compute_csum();
> verify_archive(archive_info, &csum, size)?;
>
> - pull_index_chunks(
> - worker,
> - chunk_reader.clone(),
> - snapshot.datastore().clone(),
> - index,
> - downloaded_chunks,
> - )
> - .await?;
> + if reader.skip_chunk_sync(snapshot.datastore().name()) {
> + task_log!(worker, "skipping chunk sync for same datatsore");
> + } else {
> + pull_index_chunks(
> + worker,
> + reader.chunk_reader(archive_info.crypt_mode),
> + snapshot.datastore().clone(),
> + index,
> + downloaded_chunks,
> + )
> + .await?;
> + }
> }
> ArchiveType::FixedIndex => {
> let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -309,17 +591,21 @@ async fn pull_single_archive(
> let (csum, size) = index.compute_csum();
> verify_archive(archive_info, &csum, size)?;
>
> - pull_index_chunks(
> - worker,
> - chunk_reader.clone(),
> - snapshot.datastore().clone(),
> - index,
> - downloaded_chunks,
> - )
> - .await?;
> + if reader.skip_chunk_sync(snapshot.datastore().name()) {
> + task_log!(worker, "skipping chunk sync for same datatsore");
> + } else {
> + pull_index_chunks(
> + worker,
> + reader.chunk_reader(archive_info.crypt_mode),
> + snapshot.datastore().clone(),
> + index,
> + downloaded_chunks,
> + )
> + .await?;
> + }
> }
> ArchiveType::Blob => {
> - tmpfile.seek(SeekFrom::Start(0))?;
> + tmpfile.rewind()?;
> let (csum, size) = sha256(&mut tmpfile)?;
> verify_archive(archive_info, &csum, size)?;
> }
> @@ -330,33 +616,6 @@ async fn pull_single_archive(
> Ok(())
> }
>
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> - worker: &WorkerTask,
> - reader: Arc<BackupReader>,
> - path: &std::path::Path,
> -) -> Result<(), Error> {
> - let mut tmp_path = path.to_owned();
> - tmp_path.set_extension("tmp");
> -
> - let tmpfile = std::fs::OpenOptions::new()
> - .write(true)
> - .create(true)
> - .read(true)
> - .open(&tmp_path)?;
> -
> - // Note: be silent if there is no log - only log successful download
> - if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> - if let Err(err) = std::fs::rename(&tmp_path, path) {
> - bail!("Atomic rename file {:?} failed - {}", path, err);
> - }
> - task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> - }
> -
> - Ok(())
> -}
> -
> /// Actual implementation of pulling a snapshot.
> ///
> /// Pulling a snapshot consists of the following steps:
> @@ -366,10 +625,10 @@ async fn try_client_log_download(
> /// -- if file already exists, verify contents
> /// -- if not, pull it from the remote
> /// - Download log if not already existing
> -async fn pull_snapshot(
> - worker: &WorkerTask,
> - reader: Arc<BackupReader>,
> - snapshot: &pbs_datastore::BackupDir,
> +async fn pull_snapshot<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let mut manifest_name = snapshot.full_path();
> @@ -380,32 +639,15 @@ async fn pull_snapshot(
>
> let mut tmp_manifest_name = manifest_name.clone();
> tmp_manifest_name.set_extension("tmp");
> -
> - let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> - let mut tmp_manifest_file = match download_res {
> - Ok(manifest_file) => manifest_file,
> - Err(err) => {
> - match err.downcast_ref::<HttpError>() {
> - Some(HttpError { code, message }) => match *code {
> - StatusCode::NOT_FOUND => {
> - task_log!(
> - worker,
> - "skipping snapshot {} - vanished since start of sync",
> - snapshot.dir(),
> - );
> - return Ok(());
> - }
> - _ => {
> - bail!("HTTP error {code} - {message}");
> - }
> - },
> - None => {
> - return Err(err);
> - }
> - };
> - }
> - };
> - let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> + let tmp_manifest_blob;
> + if let Some(data) = reader
> + .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
> + .await?
> + {
> + tmp_manifest_blob = data;
> + } else {
> + return Ok(());
> + }
>
> if manifest_name.exists() {
> let manifest_blob = proxmox_lang::try_block!({
> @@ -422,8 +664,10 @@ async fn pull_snapshot(
>
> if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
> if !client_log_name.exists() {
> - try_client_log_download(worker, reader, &client_log_name).await?;
> - }
> + reader
> + .try_download_client_log(&client_log_name, worker)
> + .await?;
> + };
> task_log!(worker, "no data changes");
> let _ = std::fs::remove_file(&tmp_manifest_name);
> return Ok(()); // nothing changed
> @@ -471,17 +715,9 @@ async fn pull_snapshot(
> }
> }
>
> - let mut chunk_reader = RemoteChunkReader::new(
> - reader.clone(),
> - None,
> - item.chunk_crypt_mode(),
> - HashMap::new(),
> - );
> -
> pull_single_archive(
> worker,
> - &reader,
> - &mut chunk_reader,
> + reader.clone(),
> snapshot,
> item,
> downloaded_chunks.clone(),
> @@ -494,9 +730,10 @@ async fn pull_snapshot(
> }
>
> if !client_log_name.exists() {
> - try_client_log_download(worker, reader, &client_log_name).await?;
> - }
> -
> + reader
> + .try_download_client_log(&client_log_name, worker)
> + .await?;
> + };
> snapshot
> .cleanup_unreferenced_files(&manifest)
> .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
> @@ -506,12 +743,12 @@ async fn pull_snapshot(
>
> /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> ///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.
> -async fn pull_snapshot_from(
> - worker: &WorkerTask,
> - reader: Arc<BackupReader>,
> - snapshot: &pbs_datastore::BackupDir,
> +/// The `reader` is configured to read from the source backup directory, while the
> +/// `snapshot` is pointing to the local datastore and target namespace.
> +async fn pull_snapshot_from<'a>(
> + worker: &'a WorkerTask,
> + reader: Arc<dyn PullReader + 'a>,
> + snapshot: &'a pbs_datastore::BackupDir,
> downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> ) -> Result<(), Error> {
> let (_path, is_new, _snap_lock) = snapshot
> @@ -626,11 +863,10 @@ impl std::fmt::Display for SkipInfo {
> /// - Sort by snapshot time
> /// - Get last snapshot timestamp on local datastore
> /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader
> /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
> /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
> ///
> -/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
> +/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
> /// remote when querying snapshots. This allows us to interact with old remotes that don't have
> /// namespace support yet.
> ///
> @@ -639,117 +875,79 @@ impl std::fmt::Display for SkipInfo {
> /// - local group owner is already checked by pull_store
> async fn pull_group(
> worker: &WorkerTask,
> - client: &HttpClient,
> params: &PullParameters,
> - group: &pbs_api_types::BackupGroup,
> - remote_ns: BackupNamespace,
> + source_namespace: &BackupNamespace,
> + group: &BackupGroup,
> progress: &mut StoreProgress,
> ) -> Result<(), Error> {
> - task_log!(worker, "sync group {}", group);
> -
> - let path = format!(
> - "api2/json/admin/datastore/{}/snapshots",
> - params.source.store()
> - );
> -
> - let mut args = json!({
> - "backup-type": group.ty,
> - "backup-id": group.id,
> - });
> -
> - if !remote_ns.is_root() {
> - args["ns"] = serde_json::to_value(&remote_ns)?;
> - }
> -
> - let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
> -
> - let mut result = client.get(&path, Some(args)).await?;
> - let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> - client.login().await?; // make sure auth is complete
> -
> - let fingerprint = client.fingerprint();
> -
> - let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> - let last_sync_time = last_sync.unwrap_or(i64::MIN);
> -
> - let mut remote_snapshots = std::collections::HashSet::new();
> -
> - // start with 65536 chunks (up to 256 GiB)
> - let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> - progress.group_snapshots = list.len() as u64;
> -
> let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
> let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
>
> - let total_amount = list.len();
> + let mut raw_list: Vec<BackupDir> = params
> + .source
> + .list_backup_dirs(source_namespace, group, worker)
> + .await?;
> + raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
> +
> + let total_amount = raw_list.len();
>
> let cutoff = params
> .transfer_last
> .map(|count| total_amount.saturating_sub(count))
> .unwrap_or_default();
>
> - for (pos, item) in list.into_iter().enumerate() {
> - let snapshot = item.backup;
> -
> - // in-progress backups can't be synced
> - if item.size.is_none() {
> - task_log!(
> - worker,
> - "skipping snapshot {} - in-progress backup",
> - snapshot
> - );
> - continue;
> - }
> -
> - remote_snapshots.insert(snapshot.time);
> + let target_ns = params.get_target_ns()?;
>
> - if last_sync_time > snapshot.time {
> - already_synced_skip_info.update(snapshot.time);
> - continue;
> - } else if already_synced_skip_info.count > 0 {
> - task_log!(worker, "{}", already_synced_skip_info);
> - already_synced_skip_info.reset();
> - }
> -
> - if pos < cutoff && last_sync_time != snapshot.time {
> - transfer_last_skip_info.update(snapshot.time);
> - continue;
> - } else if transfer_last_skip_info.count > 0 {
> - task_log!(worker, "{}", transfer_last_skip_info);
> - transfer_last_skip_info.reset();
> - }
> -
> - // get updated auth_info (new tickets)
> - let auth_info = client.login().await?;
> + let mut source_snapshots = HashSet::new();
> + let last_sync_time = params
> + .target
> + .store
> + .last_successful_backup(&target_ns, group)?
> + .unwrap_or(i64::MIN);
> +
> + let list: Vec<BackupDir> = raw_list
> + .into_iter()
> + .enumerate()
> + .filter(|&(pos, ref dir)| {
> + source_snapshots.insert(dir.time);
> + if last_sync_time > dir.time {
> + already_synced_skip_info.update(dir.time);
> + return false;
> + } else if already_synced_skip_info.count > 0 {
> + task_log!(worker, "{}", already_synced_skip_info);
> + already_synced_skip_info.reset();
> + return true;
> + }
>
> - let options =
> - HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> - .rate_limit(params.limit.clone());
> + if pos < cutoff && last_sync_time != dir.time {
> + transfer_last_skip_info.update(dir.time);
> + return false;
> + } else if transfer_last_skip_info.count > 0 {
> + task_log!(worker, "{}", transfer_last_skip_info);
> + transfer_last_skip_info.reset();
> + }
> + true
> + })
> + .map(|(_, dir)| dir)
> + .collect();
>
> - let new_client = HttpClient::new(
> - params.source.host(),
> - params.source.port(),
> - params.source.auth_id(),
> - options,
> - )?;
> + // start with 65536 chunks (up to 256 GiB)
> + let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>
> - let reader = BackupReader::start(
> - &new_client,
> - None,
> - params.source.store(),
> - &remote_ns,
> - &snapshot,
> - true,
> - )
> - .await?;
> + progress.group_snapshots = list.len() as u64;
>
> - let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> + for (pos, from_snapshot) in list.into_iter().enumerate() {
> + let to_snapshot = params
> + .target
> + .store
> + .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
>
> - let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> + let reader = params
> + .source
> + .reader(source_namespace, &from_snapshot)
> + .await?;
> + let result =
> + pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
>
> progress.done_snapshots = pos as u64 + 1;
> task_log!(worker, "percentage done: {}", progress);
> @@ -758,11 +956,14 @@ async fn pull_group(
> }
>
> if params.remove_vanished {
> - let group = params.store.backup_group(target_ns.clone(), group.clone());
> + let group = params
> + .target
> + .store
> + .backup_group(params.get_target_ns()?, group.clone());
> let local_list = group.list_backups()?;
> for info in local_list {
> let snapshot = info.backup_dir;
> - if remote_snapshots.contains(&snapshot.backup_time()) {
> + if source_snapshots.contains(&snapshot.backup_time()) {
> continue;
> }
> if snapshot.is_protected() {
> @@ -774,73 +975,23 @@ async fn pull_group(
> continue;
> }
> task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
> - params
> - .store
> - .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
> + params.target.store.remove_backup_dir(
> + &params.get_target_ns()?,
> + snapshot.as_ref(),
> + false,
> + )?;
> }
> }
>
> Ok(())
> }
>
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> - worker: &WorkerTask,
> - client: &HttpClient,
> - params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> - let path = format!(
> - "api2/json/admin/datastore/{}/namespace",
> - params.source.store()
> - );
> - let mut data = json!({});
> - if let Some(max_depth) = params.max_depth {
> - data["max-depth"] = json!(max_depth);
> - }
> -
> - if !params.remote_ns.is_root() {
> - data["parent"] = json!(params.remote_ns);
> - }
> -
> - let mut result = match client.get(&path, Some(data)).await {
> - Ok(res) => res,
> - Err(err) => match err.downcast_ref::<HttpError>() {
> - Some(HttpError { code, message }) => match *code {
> - StatusCode::NOT_FOUND => {
> - if params.remote_ns.is_root() && params.max_depth.is_none() {
> - task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> - task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> - params.max_depth = Some(0);
> - } else {
> - bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> - }
> -
> - return Ok(vec![params.remote_ns.clone()]);
> - }
> - _ => {
> - bail!("Querying namespaces failed - HTTP error {code} - {message}");
> - }
> - },
> - None => {
> - bail!("Querying namespaces failed - {err}");
> - }
> - },
> - };
> -
> - let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> - // parents first
> - list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> - Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
> fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
> let mut created = false;
> - let store_ns_str = print_store_and_ns(params.store.name(), ns);
> + let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>
> - if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> - check_ns_modification_privs(params.store.name(), ns, &params.owner)
> + if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> + check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
> .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>
> let name = match ns.components().last() {
> @@ -850,14 +1001,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
> }
> };
>
> - if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> + if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
> bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
> }
> created = true;
> }
>
> check_ns_privs(
> - params.store.name(),
> + params.target.store.name(),
> ns,
> &params.owner,
> PRIV_DATASTORE_BACKUP,
> @@ -868,10 +1019,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
> }
>
> fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> - check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> + check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
> .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>
> - params.store.remove_namespace_recursive(local_ns, true)
> + params
> + .target
> + .store
> + .remove_namespace_recursive(local_ns, true)
> }
>
> fn check_and_remove_vanished_ns(
> @@ -885,14 +1039,15 @@ fn check_and_remove_vanished_ns(
> // clamp like remote does so that we don't list more than we can ever have synced.
> let max_depth = params
> .max_depth
> - .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> + .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>
> let mut local_ns_list: Vec<BackupNamespace> = params
> + .target
> .store
> - .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> + .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
> .filter(|ns| {
> let user_privs =
> - user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> + user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
> user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
> })
> .collect();
> @@ -901,7 +1056,7 @@ fn check_and_remove_vanished_ns(
> local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>
> for local_ns in local_ns_list {
> - if local_ns == params.ns {
> + if local_ns == params.target.ns {
> continue;
> }
>
> @@ -948,29 +1103,49 @@ fn check_and_remove_vanished_ns(
> /// - access to sub-NS checked here
> pub(crate) async fn pull_store(
> worker: &WorkerTask,
> - client: &HttpClient,
> mut params: PullParameters,
> ) -> Result<(), Error> {
> // explicit create shared lock to prevent GC on newly created chunks
> - let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> + let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
> let mut errors = false;
>
> let old_max_depth = params.max_depth;
> - let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> - vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> + let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
> + vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
> } else {
> - query_namespaces(worker, client, &mut params).await?
> + params
> + .source
> + .list_namespaces(&mut params.max_depth, worker)
> + .await?
> };
> +
> + let ns_layers_to_be_pulled = namespaces
> + .iter()
> + .map(BackupNamespace::depth)
> + .max()
> + .map_or(0, |v| v - params.source.get_ns().depth());
> + let target_depth = params.target.ns.depth();
> +
> + if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
> + bail!(
> + "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
> + ns_layers_to_be_pulled,
> + target_depth,
> + MAX_NAMESPACE_DEPTH
> + );
> + }
> +
> errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> + namespaces.sort_unstable_by_key(|a| a.name_len());
>
> let (mut groups, mut snapshots) = (0, 0);
> let mut synced_ns = HashSet::with_capacity(namespaces.len());
>
> for namespace in namespaces {
> - let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> + let source_store_ns_str = params.source.print_store_and_ns();
>
> - let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> - let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> + let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> + let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>
> task_log!(worker, "----");
> task_log!(
> @@ -998,7 +1173,7 @@ pub(crate) async fn pull_store(
> }
> }
>
> - match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> + match pull_ns(worker, &namespace, &mut params).await {
> Ok((ns_progress, ns_errors)) => {
> errors |= ns_errors;
>
> @@ -1019,7 +1194,7 @@ pub(crate) async fn pull_store(
> task_log!(
> worker,
> "Encountered errors while syncing namespace {} - {}",
> - namespace,
> + &namespace,
> err,
> );
> }
> @@ -1051,48 +1226,28 @@ pub(crate) async fn pull_store(
> /// - owner check for vanished groups done here
> pub(crate) async fn pull_ns(
> worker: &WorkerTask,
> - client: &HttpClient,
> - params: &PullParameters,
> - source_ns: BackupNamespace,
> - target_ns: BackupNamespace,
> + namespace: &BackupNamespace,
> + params: &mut PullParameters,
> ) -> Result<(StoreProgress, bool), Error> {
> - let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> - let args = if !source_ns.is_root() {
> - Some(json!({
> - "ns": source_ns,
> - }))
> - } else {
> - None
> - };
> -
> - let mut result = client
> - .get(&path, args)
> - .await
> - .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> - let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> + let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
>
> let total_count = list.len();
> list.sort_unstable_by(|a, b| {
> - let type_order = a.backup.ty.cmp(&b.backup.ty);
> + let type_order = a.ty.cmp(&b.ty);
> if type_order == std::cmp::Ordering::Equal {
> - a.backup.id.cmp(&b.backup.id)
> + a.id.cmp(&b.id)
> } else {
> type_order
> }
> });
>
> - let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
> + let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
> filters.iter().any(|filter| group.matches(filter))
> };
>
> - // Get groups with target NS set
> - let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
> let list = if let Some(ref group_filter) = &params.group_filter {
> let unfiltered_count = list.len();
> - let list: Vec<pbs_api_types::BackupGroup> = list
> + let list: Vec<BackupGroup> = list
> .into_iter()
> .filter(|group| apply_filters(group, group_filter))
> .collect();
> @@ -1110,13 +1265,14 @@ pub(crate) async fn pull_ns(
>
> let mut errors = false;
>
> - let mut new_groups = std::collections::HashSet::new();
> + let mut new_groups = HashSet::new();
> for group in list.iter() {
> new_groups.insert(group.clone());
> }
>
> let mut progress = StoreProgress::new(list.len() as u64);
>
> + let target_ns = params.get_target_ns()?;
> for (done, group) in list.into_iter().enumerate() {
> progress.done_groups = done as u64;
> progress.done_snapshots = 0;
> @@ -1124,6 +1280,7 @@ pub(crate) async fn pull_ns(
>
> let (owner, _lock_guard) =
> match params
> + .target
> .store
> .create_locked_backup_group(&target_ns, &group, &params.owner)
> {
> @@ -1135,7 +1292,9 @@ pub(crate) async fn pull_ns(
> &group,
> err
> );
> - errors = true; // do not stop here, instead continue
> + errors = true;
> + // do not stop here, instead continue
> + task_log!(worker, "create_locked_backup_group failed");
> continue;
> }
> };
> @@ -1151,15 +1310,7 @@ pub(crate) async fn pull_ns(
> owner
> );
> errors = true; // do not stop here, instead continue
> - } else if let Err(err) = pull_group(
> - worker,
> - client,
> - params,
> - &group,
> - source_ns.clone(),
> - &mut progress,
> - )
> - .await
> + } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
> {
> task_log!(worker, "sync group {} failed - {}", &group, err,);
> errors = true; // do not stop here, instead continue
> @@ -1168,13 +1319,13 @@ pub(crate) async fn pull_ns(
>
> if params.remove_vanished {
> let result: Result<(), Error> = proxmox_lang::try_block!({
> - for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> + for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
> let local_group = local_group?;
> let local_group = local_group.group();
> if new_groups.contains(local_group) {
> continue;
> }
> - let owner = params.store.get_owner(&target_ns, local_group)?;
> + let owner = params.target.store.get_owner(&target_ns, local_group)?;
> if check_backup_owner(&owner, &params.owner).is_err() {
> continue;
> }
> @@ -1184,7 +1335,11 @@ pub(crate) async fn pull_ns(
> }
> }
> task_log!(worker, "delete vanished group '{local_group}'",);
> - match params.store.remove_backup_group(&target_ns, local_group) {
> + match params
> + .target
> + .store
> + .remove_backup_group(&target_ns, local_group)
> + {
> Ok(true) => {}
> Ok(false) => {
> task_log!(
--
- Lukas
next prev parent reply other threads:[~2023-09-21 11:11 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-08-08 12:13 [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Hannes Laimer
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 1/6] api2: make Remote for SyncJob optional Hannes Laimer
2023-08-23 11:37 ` Wolfgang Bumiller
2023-09-21 11:06 ` Lukas Wagner
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 2/6] ui: add support for optional Remote in SyncJob Hannes Laimer
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 3/6] manager: add completion for opt. " Hannes Laimer
2023-08-24 9:24 ` Wolfgang Bumiller
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 4/6] accept a ref to a HttpClient Hannes Laimer
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 5/6] pull: refactor pulling from a datastore Hannes Laimer
2023-08-24 13:09 ` Wolfgang Bumiller
2023-09-21 11:10 ` Lukas Wagner [this message]
2023-08-08 12:13 ` [pbs-devel] [PATCH proxmox-backup v3 6/6] pull: add support for pulling from local datastore Hannes Laimer
2023-09-21 10:01 ` [pbs-devel] [PATCH proxmox-backup v3 0/6] local sync-jobs Lukas Wagner
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=eb3ae9b4-cd47-4be8-b3b3-880a63189f8c@proxmox.com \
--to=l.wagner@proxmox.com \
--cc=h.laimer@proxmox.com \
--cc=pbs-devel@lists.proxmox.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal