From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from firstgate.proxmox.com (firstgate.proxmox.com [212.224.123.68]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by lists.proxmox.com (Postfix) with ESMTPS id D1A5792831 for ; Mon, 13 Feb 2023 16:46:41 +0100 (CET) Received: from firstgate.proxmox.com (localhost [127.0.0.1]) by firstgate.proxmox.com (Proxmox) with ESMTP id 7D4B626161 for ; Mon, 13 Feb 2023 16:46:11 +0100 (CET) Received: from proxmox-new.maurer-it.com (proxmox-new.maurer-it.com [94.136.29.106]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits)) (No client certificate requested) by firstgate.proxmox.com (Proxmox) with ESMTPS for ; Mon, 13 Feb 2023 16:46:07 +0100 (CET) Received: from proxmox-new.maurer-it.com (localhost.localdomain [127.0.0.1]) by proxmox-new.maurer-it.com (Proxmox) with ESMTP id 9167246D91 for ; Mon, 13 Feb 2023 16:46:05 +0100 (CET) From: Hannes Laimer To: pbs-devel@lists.proxmox.com Date: Mon, 13 Feb 2023 16:45:53 +0100 Message-Id: <20230213154555.49610-3-h.laimer@proxmox.com> X-Mailer: git-send-email 2.30.2 In-Reply-To: <20230213154555.49610-1-h.laimer@proxmox.com> References: <20230213154555.49610-1-h.laimer@proxmox.com> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-SPAM-LEVEL: Spam detection results: 0 AWL 0.033 Adjusted score from AWL reputation of From: address BAYES_00 -1.9 Bayes spam probability is 0 to 1% KAM_DMARC_STATUS 0.01 Test Rule for DKIM or SPF Failure with Strict Alignment SPF_HELO_NONE 0.001 SPF: HELO does not publish an SPF Record SPF_PASS -0.001 SPF: sender matches SPF record Subject: [pbs-devel] [PATCH proxmox-backup 2/4] pull: add logic for local pull X-BeenThere: pbs-devel@lists.proxmox.com X-Mailman-Version: 2.1.29 Precedence: list List-Id: Proxmox Backup Server development discussion List-Unsubscribe: , 
List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Mon, 13 Feb 2023 15:46:41 -0000 ... since creating a HttpClient (which would be needed to reuse existing pull logic) without a remote was not possible. This also improves the speed for local sync-jobs. Signed-off-by: Hannes Laimer --- pbs-client/src/backup_reader.rs | 5 + src/api2/admin/datastore.rs | 10 + src/server/pull.rs | 499 ++++++++++++++++++++------------ 3 files changed, 335 insertions(+), 179 deletions(-) diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs index 2cd4dc27..9dacef74 100644 --- a/pbs-client/src/backup_reader.rs +++ b/pbs-client/src/backup_reader.rs @@ -20,6 +20,11 @@ use pbs_tools::sha::sha256; use super::{H2Client, HttpClient}; +pub enum BackupSource { + Remote(Arc), + Local(pbs_datastore::BackupDir), +} + /// Backup Reader pub struct BackupReader { h2: H2Client, diff --git a/src/api2/admin/datastore.rs b/src/api2/admin/datastore.rs index 8d3a6146..8ad78f29 100644 --- a/src/api2/admin/datastore.rs +++ b/src/api2/admin/datastore.rs @@ -445,6 +445,16 @@ pub async fn list_snapshots( _param: Value, _info: &ApiMethod, rpcenv: &mut dyn RpcEnvironment, +) -> Result, Error> { + do_list_snapshots(store, ns, backup_type, backup_id, rpcenv).await +} + +pub async fn do_list_snapshots( + store: String, + ns: Option, + backup_type: Option, + backup_id: Option, + rpcenv: &mut dyn RpcEnvironment, ) -> Result, Error> { let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; diff --git a/src/server/pull.rs b/src/server/pull.rs index 65eedf2c..81df00c3 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -1,7 +1,7 @@ //! 
Sync datastore from remote server use std::collections::{HashMap, HashSet}; -use std::io::{Seek, SeekFrom}; +use std::io::{Seek, SeekFrom, Write}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; @@ -9,6 +9,7 @@ use std::time::SystemTime; use anyhow::{bail, format_err, Error}; use http::StatusCode; use pbs_config::CachedUserInfo; +use pbs_datastore::read_chunk::AsyncReadChunk; use serde_json::json; use proxmox_router::HttpError; @@ -21,7 +22,7 @@ use pbs_api_types::{ }; use pbs_client::{ - BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader, + BackupReader, BackupRepository, BackupSource, HttpClient, HttpClientOptions, RemoteChunkReader, }; use pbs_datastore::data_blob::DataBlob; use pbs_datastore::dynamic_index::DynamicIndexReader; @@ -30,7 +31,7 @@ use pbs_datastore::index::IndexFile; use pbs_datastore::manifest::{ archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME, }; -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress}; +use pbs_datastore::{check_backup_owner, DataStore, LocalChunkReader, StoreProgress}; use pbs_tools::sha::sha256; use proxmox_rest_server::WorkerTask; @@ -40,7 +41,7 @@ use crate::tools::parallel_handler::ParallelHandler; /// Parameters for a pull operation. 
pub(crate) struct PullParameters { /// Remote that is pulled from - remote: Remote, + remote: Option, /// Full specification of remote datastore source: BackupRepository, /// Local store that is pulled into @@ -70,7 +71,7 @@ impl PullParameters { pub(crate) fn new( store: &str, ns: BackupNamespace, - remote: &str, + remote: Option<&str>, remote_store: &str, remote_ns: BackupNamespace, owner: Authid, @@ -86,18 +87,24 @@ impl PullParameters { remote_ns.check_max_depth(max_depth)?; } - let (remote_config, _digest) = pbs_config::remote::config()?; - let remote: Remote = remote_config.lookup("remote", remote)?; + let (remote, source): (Option, BackupRepository) = if let Some(remote_str) = remote + { + let (remote_config, _digest) = pbs_config::remote::config()?; + let remote = remote_config.lookup::("remote", remote_str)?; + let source = BackupRepository::new( + Some(remote.config.auth_id.clone()), + Some(remote.config.host.clone()), + remote.config.port, + remote_store.to_string(), + ); + (Some(remote), source) + } else { + let source = BackupRepository::new(None, None, None, remote_store.to_string()); + (None, source) + }; let remove_vanished = remove_vanished.unwrap_or(false); - let source = BackupRepository::new( - Some(remote.config.auth_id.clone()), - Some(remote.config.host.clone()), - remote.config.port, - remote_store.to_string(), - ); - Ok(Self { remote, remote_ns, @@ -114,13 +121,17 @@ impl PullParameters { /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from. pub async fn client(&self) -> Result { - crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await + if let Some(remote) = &self.remote { + crate::api2::config::remote::remote_client(remote, Some(self.limit.clone())).await + } else { + bail!("No remote specified. 
Do not use a HttpClient for a local sync.") + } } } -async fn pull_index_chunks( +async fn pull_index_chunks( worker: &WorkerTask, - chunk_reader: RemoteChunkReader, + chunk_reader: C, target: Arc, index: I, downloaded_chunks: Arc>>, @@ -256,10 +267,10 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err /// - Verify tmp file checksum /// - if archive is an index, pull referenced chunks /// - Rename tmp file into real path -async fn pull_single_archive( +async fn pull_single_archive( worker: &WorkerTask, - reader: &BackupReader, - chunk_reader: &mut RemoteChunkReader, + source: &BackupSource, + chunk_reader: C, snapshot: &pbs_datastore::BackupDir, archive_info: &FileInfo, downloaded_chunks: Arc>>, @@ -279,7 +290,15 @@ async fn pull_single_archive( .read(true) .open(&tmp_path)?; - reader.download(archive_name, &mut tmpfile).await?; + match source { + BackupSource::Remote(reader) => reader.download(archive_name, &mut tmpfile).await?, + BackupSource::Local(dir) => { + let mut source_path = dir.full_path(); + source_path.push(archive_name); + let data = std::fs::read(source_path)?; + tmpfile.write_all(&data)?; + } + }; match archive_type(archive_name)? 
{ ArchiveType::DynamicIndex => { @@ -289,14 +308,23 @@ async fn pull_single_archive( let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - pull_index_chunks( - worker, - chunk_reader.clone(), - snapshot.datastore().clone(), - index, - downloaded_chunks, - ) - .await?; + match source { + BackupSource::Local(ref dir) + if dir.datastore().name() == snapshot.datastore().name() => + { + task_log!(worker, "skipping chunk sync for same datatsore"); + } + _ => { + pull_index_chunks( + worker, + chunk_reader, + snapshot.datastore().clone(), + index, + downloaded_chunks, + ) + .await?; + } + }; } ArchiveType::FixedIndex => { let index = FixedIndexReader::new(tmpfile).map_err(|err| { @@ -304,15 +332,23 @@ async fn pull_single_archive( })?; let (csum, size) = index.compute_csum(); verify_archive(archive_info, &csum, size)?; - - pull_index_chunks( - worker, - chunk_reader.clone(), - snapshot.datastore().clone(), - index, - downloaded_chunks, - ) - .await?; + match source { + BackupSource::Local(ref dir) + if dir.datastore().name() == snapshot.datastore().name() => + { + task_log!(worker, "skipping chunk sync for same datatsore"); + } + _ => { + pull_index_chunks( + worker, + chunk_reader, + snapshot.datastore().clone(), + index, + downloaded_chunks, + ) + .await?; + } + }; } ArchiveType::Blob => { tmpfile.seek(SeekFrom::Start(0))?; @@ -321,6 +357,9 @@ async fn pull_single_archive( } } if let Err(err) = std::fs::rename(&tmp_path, &path) { + task_log!(worker, "sync archive {}", archive_name); + task_log!(worker, "tmpfile path {:?}", tmp_path.as_os_str()); + task_log!(worker, "path path {:?}", path.as_os_str()); bail!("Atomic rename file {:?} failed - {}", path, err); } Ok(()) @@ -364,7 +403,7 @@ async fn try_client_log_download( /// - Download log if not already existing async fn pull_snapshot( worker: &WorkerTask, - reader: Arc, + source: BackupSource, snapshot: &pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { @@ 
-377,31 +416,45 @@ async fn pull_snapshot( let mut tmp_manifest_name = manifest_name.clone(); tmp_manifest_name.set_extension("tmp"); - let download_res = download_manifest(&reader, &tmp_manifest_name).await; - let mut tmp_manifest_file = match download_res { - Ok(manifest_file) => manifest_file, - Err(err) => { - match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - task_log!( - worker, - "skipping snapshot {} - vanished since start of sync", - snapshot.dir(), - ); - return Ok(()); - } - _ => { - bail!("HTTP error {code} - {message}"); - } - }, - None => { - return Err(err); + let tmp_manifest_blob = match source { + BackupSource::Remote(ref reader) => { + let mut tmp_manifest_file = match download_manifest(reader, &tmp_manifest_name).await { + Ok(manifest_file) => manifest_file, + Err(err) => { + match err.downcast_ref::() { + Some(HttpError { code, message }) => match *code { + StatusCode::NOT_FOUND => { + task_log!( + worker, + "skipping snapshot {} - vanished since start of sync", + snapshot.dir(), + ); + return Ok(()); + } + _ => { + bail!("HTTP error {code} - {message}"); + } + }, + None => { + return Err(err); + } + }; } }; + DataBlob::load_from_reader(&mut tmp_manifest_file)? 
+ } + BackupSource::Local(ref dir) => { + let data = dir.load_blob(MANIFEST_BLOB_NAME)?; + let mut tmp_manifest_file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .read(true) + .open(&tmp_manifest_name)?; + tmp_manifest_file.write_all(data.raw_data())?; + data } }; - let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?; if manifest_name.exists() { let manifest_blob = proxmox_lang::try_block!({ @@ -417,13 +470,17 @@ async fn pull_snapshot( })?; if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() { - if !client_log_name.exists() { - try_client_log_download(worker, reader, &client_log_name).await?; + if let BackupSource::Remote(ref reader) = source { + if !client_log_name.exists() { + try_client_log_download(worker, reader.clone(), &client_log_name).await?; + }; + } + if tmp_manifest_name.exists() { + let _ = std::fs::remove_file(&tmp_manifest_name); } task_log!(worker, "no data changes"); - let _ = std::fs::remove_file(&tmp_manifest_name); return Ok(()); // nothing changed - } + }; } let manifest = BackupManifest::try_from(tmp_manifest_blob)?; @@ -467,32 +524,49 @@ async fn pull_snapshot( } } - let mut chunk_reader = RemoteChunkReader::new( - reader.clone(), - None, - item.chunk_crypt_mode(), - HashMap::new(), - ); - - pull_single_archive( - worker, - &reader, - &mut chunk_reader, - snapshot, - item, - downloaded_chunks.clone(), - ) - .await?; + match source { + BackupSource::Remote(ref reader) => { + let chunk_reader = RemoteChunkReader::new( + reader.clone(), + None, + item.chunk_crypt_mode(), + HashMap::new(), + ); + pull_single_archive( + worker, + &source, + chunk_reader, + snapshot, + item, + downloaded_chunks.clone(), + ) + .await? + } + BackupSource::Local(ref dir) => { + let chunk_reader = + LocalChunkReader::new(dir.datastore().clone(), None, item.chunk_crypt_mode()); + pull_single_archive( + worker, + &source, + chunk_reader, + snapshot, + item, + downloaded_chunks.clone(), + ) + .await? 
+ } + } } if let Err(err) = std::fs::rename(&tmp_manifest_name, &manifest_name) { bail!("Atomic rename file {:?} failed - {}", manifest_name, err); } - if !client_log_name.exists() { - try_client_log_download(worker, reader, &client_log_name).await?; + if let BackupSource::Remote(reader) = source { + if !client_log_name.exists() { + try_client_log_download(worker, reader, &client_log_name).await?; + }; } - snapshot .cleanup_unreferenced_files(&manifest) .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?; @@ -506,7 +580,7 @@ async fn pull_snapshot( /// pointing to the local datastore and target namespace. async fn pull_snapshot_from( worker: &WorkerTask, - reader: Arc, + source: BackupSource, snapshot: &pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { @@ -517,7 +591,7 @@ async fn pull_snapshot_from( if is_new { task_log!(worker, "sync snapshot {}", snapshot.dir()); - if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await { + if let Err(err) = pull_snapshot(worker, source, snapshot, downloaded_chunks).await { if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir( snapshot.backup_ns(), snapshot.as_ref(), @@ -530,7 +604,7 @@ async fn pull_snapshot_from( task_log!(worker, "sync snapshot {} done", snapshot.dir()); } else { task_log!(worker, "re-sync snapshot {}", snapshot.dir()); - pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?; + pull_snapshot(worker, source, snapshot, downloaded_chunks).await?; task_log!(worker, "re-sync snapshot {} done", snapshot.dir()); } @@ -600,36 +674,52 @@ impl std::fmt::Display for SkipInfo { /// - local group owner is already checked by pull_store async fn pull_group( worker: &WorkerTask, - client: &HttpClient, + client: Option<&HttpClient>, params: &PullParameters, group: &pbs_api_types::BackupGroup, remote_ns: BackupNamespace, progress: &mut StoreProgress, ) -> Result<(), Error> { - let path = format!( - 
"api2/json/admin/datastore/{}/snapshots", - params.source.store() - ); - - let mut args = json!({ - "backup-type": group.ty, - "backup-id": group.id, - }); - - if !remote_ns.is_root() { - args["ns"] = serde_json::to_value(&remote_ns)?; - } - let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?; + let mut list: Vec = if let Some(client) = client { + let path = format!( + "api2/json/admin/datastore/{}/snapshots", + params.source.store() + ); - let mut result = client.get(&path, Some(args)).await?; - let mut list: Vec = serde_json::from_value(result["data"].take())?; + let mut args = json!({ + "backup-type": group.ty, + "backup-id": group.id, + }); - list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time)); + if !remote_ns.is_root() { + args["ns"] = serde_json::to_value(&remote_ns)?; + } - client.login().await?; // make sure auth is complete + let mut result = client.get(&path, Some(args)).await?; + serde_json::from_value(result["data"].take())? + } else { + let mut rpcenv = proxmox_router::cli::CliEnvironment::new(); + proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam"))); + let source_ns = if remote_ns.is_root() { + None + } else { + Some(remote_ns.clone()) + }; + crate::api2::admin::datastore::do_list_snapshots( + params.source.store().to_string(), + source_ns, + Some(group.ty), + Some(group.id.clone()), + &mut rpcenv, + ) + .await? 
+ }; + list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time)); - let fingerprint = client.fingerprint(); + if let Some(client) = client { + client.login().await?; // make sure auth is complete + } let last_sync = params.store.last_successful_backup(&target_ns, group)?; @@ -646,6 +736,13 @@ async fn pull_group( count: 0, }; + let datastore: Option> = match client { + None => Some(DataStore::lookup_datastore( + params.source.store(), + Some(Operation::Read), + )?), + _ => None, + }; for (pos, item) in list.into_iter().enumerate() { let snapshot = item.backup; @@ -668,33 +765,47 @@ async fn pull_group( } } - // get updated auth_info (new tickets) - let auth_info = client.login().await?; - - let options = - HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone()) - .rate_limit(params.limit.clone()); + let backup_source = if let Some(client) = client { + // get updated auth_info (new tickets) + let auth_info = client.login().await?; + let fingerprint = client.fingerprint(); - let new_client = HttpClient::new( - params.source.host(), - params.source.port(), - params.source.auth_id(), - options, - )?; - - let reader = BackupReader::start( - new_client, - None, - params.source.store(), - &remote_ns, - &snapshot, - true, - ) - .await?; + let options = HttpClientOptions::new_non_interactive( + auth_info.ticket.clone(), + fingerprint.clone(), + ) + .rate_limit(params.limit.clone()); + + let new_client = HttpClient::new( + params.source.host(), + params.source.port(), + params.source.auth_id(), + options, + )?; + + BackupSource::Remote( + BackupReader::start( + new_client, + None, + params.source.store(), + &remote_ns, + &snapshot, + true, + ) + .await?, + ) + } else { + if let Some(datastore) = datastore.clone() { + BackupSource::Local(datastore.backup_dir(remote_ns.clone(), snapshot.clone())?) 
+ } else { + unreachable!("if there is no client and no datastore, then the ds lookup would have failed earlier") + } + }; let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?; - let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await; + let result = + pull_snapshot_from(worker, backup_source, &snapshot, downloaded_chunks.clone()).await; progress.done_snapshots = pos as u64 + 1; task_log!(worker, "percentage done: {}", progress); @@ -735,49 +846,64 @@ async fn pull_group( // will modify params if switching to backwards mode for lack of NS support on remote end async fn query_namespaces( worker: &WorkerTask, - client: &HttpClient, + client: Option<&HttpClient>, params: &mut PullParameters, ) -> Result, Error> { - let path = format!( - "api2/json/admin/datastore/{}/namespace", - params.source.store() - ); - let mut data = json!({}); - if let Some(max_depth) = params.max_depth { - data["max-depth"] = json!(max_depth); - } + let mut list: Vec = if let Some(client) = client { + let path = format!( + "api2/json/admin/datastore/{}/namespace", + params.source.store() + ); + let mut data = json!({}); + if let Some(max_depth) = params.max_depth { + data["max-depth"] = json!(max_depth); + } - if !params.remote_ns.is_root() { - data["parent"] = json!(params.remote_ns); - } + if !params.remote_ns.is_root() { + data["parent"] = json!(params.remote_ns); + } - let mut result = match client.get(&path, Some(data)).await { - Ok(res) => res, - Err(err) => match err.downcast_ref::() { - Some(HttpError { code, message }) => match *code { - StatusCode::NOT_FOUND => { - if params.remote_ns.is_root() && params.max_depth.is_none() { - task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); - task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); - params.max_depth = Some(0); - } else { - bail!("Remote namespace set/recursive 
sync requested, but remote does not support namespaces.") - } + let mut result = match client.get(&path, Some(data)).await { + Ok(res) => res, + Err(err) => match err.downcast_ref::() { + Some(HttpError { code, message }) => match *code { + StatusCode::NOT_FOUND => { + if params.remote_ns.is_root() && params.max_depth.is_none() { + task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode"); + task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system."); + params.max_depth = Some(0); + } else { + bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.") + } - return Ok(vec![params.remote_ns.clone()]); - } - _ => { - bail!("Querying namespaces failed - HTTP error {code} - {message}"); + return Ok(vec![params.remote_ns.clone()]); + } + _ => { + bail!("Querying namespaces failed - HTTP error {code} - {message}"); + } + }, + None => { + bail!("Querying namespaces failed - {err}"); } }, - None => { - bail!("Querying namespaces failed - {err}"); - } - }, - }; - - let mut list: Vec = serde_json::from_value(result["data"].take())?; + }; + serde_json::from_value(result["data"].take())? + } else { + let mut rpcenv = proxmox_router::cli::CliEnvironment::new(); + proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam"))); + let parent_ns = if params.remote_ns.is_root() { + None + } else { + Some(params.remote_ns.clone()) + }; + crate::api2::admin::namespace::list_namespaces( + params.source.store().to_string(), + parent_ns, + params.max_depth, + &mut rpcenv, + )? 
+ }; // parents first list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len())); @@ -897,7 +1023,7 @@ fn check_and_remove_vanished_ns( /// - access to sub-NS checked here pub(crate) async fn pull_store( worker: &WorkerTask, - client: &HttpClient, + client: Option<&HttpClient>, mut params: PullParameters, ) -> Result<(), Error> { // explicit create shared lock to prevent GC on newly created chunks @@ -1000,27 +1126,42 @@ pub(crate) async fn pull_store( /// - owner check for vanished groups done here pub(crate) async fn pull_ns( worker: &WorkerTask, - client: &HttpClient, + client: Option<&HttpClient>, params: &PullParameters, source_ns: BackupNamespace, target_ns: BackupNamespace, ) -> Result<(StoreProgress, bool), Error> { - let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); + let mut list: Vec = if let Some(client) = client { + let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); - let args = if !source_ns.is_root() { - Some(json!({ - "ns": source_ns, - })) - } else { - None - }; + let args = if !source_ns.is_root() { + Some(json!({ + "ns": source_ns, + })) + } else { + None + }; - let mut result = client - .get(&path, args) - .await - .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; + let mut result = client + .get(&path, args) + .await + .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; - let mut list: Vec = serde_json::from_value(result["data"].take())?; + serde_json::from_value(result["data"].take())? + } else { + let mut rpcenv = proxmox_router::cli::CliEnvironment::new(); + proxmox_router::RpcEnvironment::set_auth_id(&mut rpcenv, Some(String::from("root@pam"))); + let source_ns = if source_ns.is_root() { + None + } else { + Some(source_ns.clone()) + }; + crate::api2::admin::datastore::list_groups( + params.source.store().to_string(), + source_ns, + &mut rpcenv, + )? 
+ }; let total_count = list.len(); list.sort_unstable_by(|a, b| { -- 2.30.2