public inbox for pbs-devel@lists.proxmox.com
 help / color / mirror / Atom feed
* [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs
@ 2023-11-21 14:31 Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 1/6] accept a ref to a HttpClient Hannes Laimer
                   ` (8 more replies)
  0 siblings, 9 replies; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

Add support for local sync. SyncJobs without a remote are considered local, and
use a different logic for pulling. In the course of adding the extra pull logic,
the pull code was rewritten to basically be source independent. Also cli
completion and the UI was updated to allow Remotes in SyncJobs to be optional.

v6: thanks @Gabriel and @Thomas for the feedback
 * disallow sync to same datastore
 * ui: fix not checked checkbox
 * ui: don't allow same datastore as source as the target
 * ui: render empty remote-id's the same way namespaces are
 * reorder patches, do config/api changes last
    (so instead of an Option pretending to be a String, we have a String
    pretending to be an Option, avoiding problems in intermediate patches)

v5: thanks @Thomas for the feedback 
 * ui: add radio buttons to make local vs remote more clear
 * move ui patch to the end + api2 -> api

v4: thanks @Wolfgang and @Lukas for the feedback
 * ui: disable rate limit for local sync jobs
 * ui: rename `Source Remote` -> `Source PBS`
 * update SYNC_JOB_WORKER_ID_REGEX, use '-' as remote for local
 * fix problme with groups not being synced to the correct ns
 * add check for source == target
 * moved two changes from patch 3 to patch 1

v3: thanks @Fabian and @Wolfang for the feedback
 * remove enums for Local/Remote
 * use traits, pull logic now expects a `dyn PullSource`(or `dyn PullReader`)
 * add lock to dir for local sync
 * split refactoring of pull logic and implementation of
    local pulling into two commits

v2: thanks @Fabian for the feedback
 * make pull logic more source independent

Hannes Laimer (6):
  accept a ref to a HttpClient
  pull: refactor pulling from a datastore
  pull: add support for pulling from local datastore
  manager: add completion for opt. Remote in SyncJob
  api: make Remote for SyncJob optional
  ui: add support for optional Remote in SyncJob

 Cargo.toml                           |    2 +
 examples/download-speed.rs           |    2 +-
 pbs-api-types/src/jobs.rs            |    9 +-
 pbs-client/src/backup_reader.rs      |    2 +-
 pbs-datastore/src/read_chunk.rs      |    2 +-
 proxmox-backup-client/src/catalog.rs |    4 +-
 proxmox-backup-client/src/main.rs    |    2 +-
 proxmox-backup-client/src/mount.rs   |    2 +-
 proxmox-file-restore/src/main.rs     |    4 +-
 src/api2/config/remote.rs            |   16 +-
 src/api2/config/sync.rs              |   41 +-
 src/api2/node/tasks.rs               |    3 +-
 src/api2/pull.rs                     |   60 +-
 src/bin/proxmox-backup-manager.rs    |   67 +-
 src/bin/proxmox_backup_debug/diff.rs |    2 +-
 src/server/email_notifications.rs    |   18 +-
 src/server/pull.rs                   | 1073 ++++++++++++++++----------
 www/Utils.js                         |    5 +
 www/config/SyncView.js               |    1 +
 www/form/RemoteTargetSelector.js     |   36 +-
 www/window/SyncJobEdit.js            |   73 +-
 21 files changed, 934 insertions(+), 490 deletions(-)

-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v6 1/6] accept a ref to a HttpClient
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
@ 2023-11-21 14:31 ` Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 2/6] pull: refactor pulling from a datastore Hannes Laimer
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

... since the functions don't actually need to own the value.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 examples/download-speed.rs           | 2 +-
 pbs-client/src/backup_reader.rs      | 2 +-
 proxmox-backup-client/src/catalog.rs | 4 ++--
 proxmox-backup-client/src/main.rs    | 2 +-
 proxmox-backup-client/src/mount.rs   | 2 +-
 proxmox-file-restore/src/main.rs     | 4 ++--
 src/bin/proxmox_backup_debug/diff.rs | 2 +-
 src/server/pull.rs                   | 2 +-
 8 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/examples/download-speed.rs b/examples/download-speed.rs
index c487d704..fe700982 100644
--- a/examples/download-speed.rs
+++ b/examples/download-speed.rs
@@ -34,7 +34,7 @@ async fn run() -> Result<(), Error> {
     let backup_time = proxmox_time::parse_rfc3339("2019-06-28T10:49:48Z")?;
 
     let client = BackupReader::start(
-        client,
+        &client,
         None,
         "store2",
         &BackupNamespace::root(),
diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..36d8ebcf 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -44,7 +44,7 @@ impl BackupReader {
 
     /// Create a new instance by upgrading the connection at '/api2/json/reader'
     pub async fn start(
-        client: HttpClient,
+        client: &HttpClient,
         crypt_config: Option<Arc<CryptConfig>>,
         datastore: &str,
         ns: &BackupNamespace,
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index 8c8c1458..72b22e67 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -75,7 +75,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
     let client = connect(&repo)?;
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
@@ -187,7 +187,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 1a13291a..b66e3fdb 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1313,7 +1313,7 @@ async fn restore(
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &ns,
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 242556d0..4a2f8335 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -234,7 +234,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
diff --git a/proxmox-file-restore/src/main.rs b/proxmox-file-restore/src/main.rs
index 9c74a476..50875a63 100644
--- a/proxmox-file-restore/src/main.rs
+++ b/proxmox-file-restore/src/main.rs
@@ -107,7 +107,7 @@ async fn list_files(
 ) -> Result<Vec<ArchiveEntry>, Error> {
     let client = connect(&repo)?;
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &namespace,
@@ -430,7 +430,7 @@ async fn extract(
 
     let client = connect(&repo)?;
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &namespace,
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index 9924fb7b..1c64b27a 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -294,7 +294,7 @@ async fn create_backup_reader(
     };
     let client = connect(&params.repo)?;
     let backup_reader = BackupReader::start(
-        client,
+        &client,
         params.crypt_config.clone(),
         params.repo.store(),
         &params.namespace,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index a973a10e..e55452d1 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -738,7 +738,7 @@ async fn pull_group(
         )?;
 
         let reader = BackupReader::start(
-            new_client,
+            &new_client,
             None,
             params.source.store(),
             &remote_ns,
-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v6 2/6] pull: refactor pulling from a datastore
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 1/6] accept a ref to a HttpClient Hannes Laimer
@ 2023-11-21 14:31 ` Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 3/6] pull: add support for pulling from local datastore Hannes Laimer
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

... making the pull logic independent from the actual source
using two traits.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 Cargo.toml                      |   2 +
 pbs-datastore/src/read_chunk.rs |   2 +-
 src/api2/config/remote.rs       |  14 +-
 src/api2/pull.rs                |  10 +-
 src/server/pull.rs              | 936 ++++++++++++++++++--------------
 5 files changed, 562 insertions(+), 402 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index db18cd45..07283bb5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -102,6 +102,7 @@ proxmox-rrd = { path = "proxmox-rrd" }
 
 # regular crates
 anyhow = "1.0"
+async-trait = "0.1.56"
 apt-pkg-native = "0.3.2"
 base64 = "0.13"
 bitflags = "1.2.1"
@@ -153,6 +154,7 @@ zstd = { version = "0.12", features = [ "bindgen" ] }
 
 [dependencies]
 anyhow.workspace = true
+async-trait.workspace = true
 apt-pkg-native.workspace = true
 base64.workspace = true
 bitflags.workspace = true
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
 }
 
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
     /// Returns the encoded chunk data
     fn read_raw_chunk<'a>(
         &'a self,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 76dd3b89..31922b94 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -300,8 +300,8 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
     Ok(())
 }
 
-/// Helper to get client for remote.cfg entry
-pub async fn remote_client(
+/// Helper to get client for remote.cfg entry without login, just config
+pub fn remote_client_config(
     remote: &Remote,
     limit: Option<RateLimitConfig>,
 ) -> Result<HttpClient, Error> {
@@ -320,6 +320,16 @@ pub async fn remote_client(
         &remote.config.auth_id,
         options,
     )?;
+
+    Ok(client)
+}
+
+/// Helper to get client for remote.cfg entry
+pub async fn remote_client(
+    remote: &Remote,
+    limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+    let client = remote_client_config(remote, limit)?;
     let _auth_info = client
         .login() // make sure we can auth
         .await
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index daeba7cf..8cd28fa0 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -65,7 +65,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            &sync_job.remote,
+            Some(&sync_job.remote),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -114,7 +114,6 @@ pub fn do_sync_job(
 
             let worker_future = async move {
                 let pull_params = PullParameters::try_from(&sync_job)?;
-                let client = pull_params.client().await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -128,7 +127,7 @@ pub fn do_sync_job(
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, pull_params).await?;
+                pull_store(&worker, pull_params).await?;
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -256,7 +255,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        &remote,
+        Some(&remote),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -266,7 +265,6 @@ async fn pull(
         limit,
         transfer_last,
     )?;
-    let client = pull_params.client().await?;
 
     // fixme: set to_stdout to false?
     // FIXME: add namespace to worker id?
@@ -284,7 +282,7 @@ async fn pull(
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(&worker, pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index e55452d1..ff3e6d0a 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::Seek;
+use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
-use proxmox_sys::task_log;
+use proxmox_sys::{task_log, task_warn};
+use serde_json::json;
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
-    Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
+    print_store_and_ns, Authid, BackupDir, BackupGroup, BackupNamespace, CryptMode, GroupFilter,
+    GroupListItem, Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
-
-use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,327 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
+use pbs_datastore::read_chunk::AsyncReadChunk;
 use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
 use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
 
 use crate::backup::{check_ns_modification_privs, check_ns_privs};
 use crate::tools::parallel_handler::ParallelHandler;
 
-/// Parameters for a pull operation.
-pub(crate) struct PullParameters {
-    /// Remote that is pulled from
-    remote: Remote,
-    /// Full specification of remote datastore
-    source: BackupRepository,
-    /// Local store that is pulled into
+struct RemoteReader {
+    backup_reader: Arc<BackupReader>,
+    dir: BackupDir,
+}
+
+pub(crate) struct PullTarget {
     store: Arc<DataStore>,
-    /// Remote namespace
-    remote_ns: BackupNamespace,
-    /// Local namespace (anchor)
     ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+    repo: BackupRepository,
+    ns: BackupNamespace,
+    client: HttpClient,
+}
+
+#[async_trait::async_trait]
+/// `PullSource` is a trait that provides an interface for pulling data/information from a source.
+/// The trait includes methods for listing namespaces, groups, and backup directories,
+/// as well as retrieving a reader for reading data from the source
+trait PullSource: Send + Sync {
+    /// Lists namespaces from the source.
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error>;
+
+    /// Lists groups within a specific namespace from the source.
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error>;
+
+    /// Lists backup directories for a specific group within a specific namespace from the source.
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error>;
+    fn get_ns(&self) -> BackupNamespace;
+    fn print_store_and_ns(&self) -> String;
+
+    /// Returns a reader for reading data from a specific backup directory.
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error>;
+}
+
+#[async_trait::async_trait]
+impl PullSource for RemoteSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        if self.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+            return Ok(vec![self.ns.clone()]);
+        }
+
+        let path = format!("api2/json/admin/datastore/{}/namespace", self.repo.store());
+        let mut data = json!({});
+        if let Some(max_depth) = max_depth {
+            data["max-depth"] = json!(max_depth);
+        }
+
+        if !self.ns.is_root() {
+            data["parent"] = json!(self.ns);
+        }
+        self.client.login().await?;
+
+        let mut result = match self.client.get(&path, Some(data)).await {
+            Ok(res) => res,
+            Err(err) => match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match code {
+                    &StatusCode::NOT_FOUND => {
+                        if self.ns.is_root() && max_depth.is_none() {
+                            task_warn!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                            task_warn!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                            max_depth.replace(0);
+                        } else {
+                            bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                        }
+
+                        return Ok(vec![self.ns.clone()]);
+                    }
+                    _ => {
+                        bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    bail!("Querying namespaces failed - {err}");
+                }
+            },
+        };
+
+        let list: Vec<BackupNamespace> =
+            serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+                .into_iter()
+                .map(|list_item| list_item.ns)
+                .collect();
+
+        Ok(list)
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        _owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/groups", self.repo.store());
+
+        let args = if !namespace.is_root() {
+            Some(json!({ "ns": namespace.clone() }))
+        } else {
+            None
+        };
+
+        self.client.login().await?;
+        let mut result =
+            self.client.get(&path, args).await.map_err(|err| {
+                format_err!("Failed to retrieve backup groups from remote - {}", err)
+            })?;
+
+        Ok(
+            serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+                .map_err(Error::from)?
+                .into_iter()
+                .map(|item| item.backup)
+                .collect::<Vec<BackupGroup>>(),
+        )
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        _namespace: &BackupNamespace,
+        group: &BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        let path = format!("api2/json/admin/datastore/{}/snapshots", self.repo.store());
+
+        let mut args = json!({
+            "backup-type": group.ty,
+            "backup-id": group.id,
+        });
+
+        if !self.ns.is_root() {
+            args["ns"] = serde_json::to_value(&self.ns)?;
+        }
+
+        self.client.login().await?;
+
+        let mut result = self.client.get(&path, Some(args)).await?;
+        let snapshot_list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
+        Ok(snapshot_list
+            .into_iter()
+            .filter_map(|item: SnapshotListItem| {
+                let snapshot = item.backup;
+                // in-progress backups can't be synced
+                if item.size.is_none() {
+                    task_log!(
+                        worker,
+                        "skipping snapshot {} - in-progress backup",
+                        snapshot
+                    );
+                    return None;
+                }
+
+                Some(snapshot)
+            })
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.repo.store(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let backup_reader =
+            BackupReader::start(&self.client, None, self.repo.store(), ns, dir, true).await?;
+        Ok(Arc::new(RemoteReader {
+            backup_reader,
+            dir: dir.clone(),
+        }))
+    }
+}
+
+#[async_trait::async_trait]
+/// `PullReader` is a trait that provides an interface for reading data from a source.
+/// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
+trait PullReader: Send + Sync {
+    /// Returns a chunk reader with the specified encryption mode.
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk>;
+
+    /// Asynchronously loads a file from the source into a local file.
+    /// `filename` is the name of the file to load from the source.
+    /// `into` is the path of the local file to load the source file into.
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error>;
+
+    /// Tries to download the client log from the source and save it into a local file.
+    async fn try_download_client_log(
+        &self,
+        to_path: &Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error>;
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool;
+}
+
+#[async_trait::async_trait]
+impl PullReader for RemoteReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(RemoteChunkReader::new(
+            self.backup_reader.clone(),
+            None,
+            crypt_mode,
+            HashMap::new(),
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let download_result = self.backup_reader.download(filename, &mut tmp_file).await;
+        if let Err(err) = download_result {
+            match err.downcast_ref::<HttpError>() {
+                Some(HttpError { code, message }) => match *code {
+                    StatusCode::NOT_FOUND => {
+                        task_log!(
+                            worker,
+                            "skipping snapshot {} - vanished since start of sync",
+                            &self.dir,
+                        );
+                        return Ok(None);
+                    }
+                    _ => {
+                        bail!("HTTP error {code} - {message}");
+                    }
+                },
+                None => {
+                    return Err(err);
+                }
+            };
+        };
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        to_path: &Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        let mut tmp_path = to_path.to_owned();
+        tmp_path.set_extension("tmp");
+
+        let tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .read(true)
+            .open(&tmp_path)?;
+
+        // Note: be silent if there is no log - only log successful download
+        if let Ok(()) = self
+            .backup_reader
+            .download(CLIENT_LOG_BLOB_NAME, tmpfile)
+            .await
+        {
+            if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                bail!("Atomic rename file {:?} failed - {}", to_path, err);
+            }
+            task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+        }
+
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, _target_store_name: &str) -> bool {
+        false
+    }
+}
+
+/// Parameters for a pull operation.
+pub(crate) struct PullParameters {
+    /// Where data is pulled from
+    source: Arc<dyn PullSource>,
+    /// Where data should be pulled into
+    target: PullTarget,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -57,22 +357,16 @@ pub(crate) struct PullParameters {
     max_depth: Option<usize>,
     /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
-    /// Rate limits for all transfers from `remote`
-    limit: RateLimitConfig,
     /// How many snapshots should be transferred at most (taking the newest N snapshots)
     transfer_last: Option<usize>,
 }
 
 impl PullParameters {
     /// Creates a new instance of `PullParameters`.
-    ///
-    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
-    /// [BackupRepository] with `remote_store`.
-    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
@@ -82,49 +376,51 @@ impl PullParameters {
         limit: RateLimitConfig,
         transfer_last: Option<usize>,
     ) -> Result<Self, Error> {
-        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
             remote_ns.check_max_depth(max_depth)?;
-        }
-
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
-
+        };
         let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
+        let source: Arc<dyn PullSource> = if let Some(remote) = remote {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote: Remote = remote_config.lookup("remote", remote)?;
 
-        Ok(Self {
-            remote,
-            remote_ns,
+            let repo = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
+            let client = crate::api2::config::remote::remote_client_config(&remote, Some(limit))?;
+            Arc::new(RemoteSource {
+                repo,
+                ns: remote_ns,
+                client,
+            })
+        } else {
+            bail!("local sync not implemented yet")
+        };
+        let target = PullTarget {
+            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
             ns,
+        };
+
+        Ok(Self {
             source,
-            store,
+            target,
             owner,
             remove_vanished,
             max_depth,
             group_filter,
-            limit,
             transfer_last,
         })
     }
-
-    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
-    pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
-    }
 }
 
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: Arc<dyn AsyncReadChunk>,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -215,26 +511,6 @@ async fn pull_index_chunks<I: IndexFile>(
     Ok(())
 }
 
-async fn download_manifest(
-    reader: &BackupReader,
-    filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-    let mut tmp_manifest_file = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .read(true)
-        .open(filename)?;
-
-    reader
-        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
-        .await?;
-
-    tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
-    Ok(tmp_manifest_file)
-}
-
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
         bail!(
@@ -255,17 +531,16 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// Pulls a single file referenced by a manifest.
 ///
 /// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
-/// - Verify tmp file checksum
+/// - Load archive file into tmp file
+/// -- Load file into tmp file
+/// -- Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
-async fn pull_single_archive(
-    worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
-    snapshot: &pbs_datastore::BackupDir,
-    archive_info: &FileInfo,
+async fn pull_single_archive<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
+    archive_info: &'a FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
@@ -277,13 +552,11 @@ async fn pull_single_archive(
 
     task_log!(worker, "sync archive {}", archive_name);
 
-    let mut tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
+    reader
+        .load_file_into(archive_name, &tmp_path, worker)
+        .await?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -293,14 +566,18 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+                task_log!(worker, "skipping chunk sync for same datastore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    reader.chunk_reader(archive_info.crypt_mode),
+                    snapshot.datastore().clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -309,17 +586,21 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if reader.skip_chunk_sync(snapshot.datastore().name()) {
+                task_log!(worker, "skipping chunk sync for same datastore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    reader.chunk_reader(archive_info.crypt_mode),
+                    snapshot.datastore().clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::Blob => {
-            tmpfile.seek(SeekFrom::Start(0))?;
+            tmpfile.rewind()?;
             let (csum, size) = sha256(&mut tmpfile)?;
             verify_archive(archive_info, &csum, size)?;
         }
@@ -330,33 +611,6 @@ async fn pull_single_archive(
     Ok(())
 }
 
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    path: &std::path::Path,
-) -> Result<(), Error> {
-    let mut tmp_path = path.to_owned();
-    tmp_path.set_extension("tmp");
-
-    let tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
-
-    // Note: be silent if there is no log - only log successful download
-    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
-        if let Err(err) = std::fs::rename(&tmp_path, path) {
-            bail!("Atomic rename file {:?} failed - {}", path, err);
-        }
-        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
-    }
-
-    Ok(())
-}
-
 /// Actual implementation of pulling a snapshot.
 ///
 /// Pulling a snapshot consists of the following steps:
@@ -366,10 +620,10 @@ async fn try_client_log_download(
 /// -- if file already exists, verify contents
 /// -- if not, pull it from the remote
 /// - Download log if not already existing
-async fn pull_snapshot(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+async fn pull_snapshot<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let mut manifest_name = snapshot.full_path();
@@ -380,32 +634,15 @@ async fn pull_snapshot(
 
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
-
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
-                }
-            };
-        }
-    };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+    let tmp_manifest_blob;
+    if let Some(data) = reader
+        .load_file_into(MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
+        .await?
+    {
+        tmp_manifest_blob = data;
+    } else {
+        return Ok(());
+    }
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -422,8 +659,10 @@ async fn pull_snapshot(
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
-            }
+                reader
+                    .try_download_client_log(&client_log_name, worker)
+                    .await?;
+            };
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
@@ -471,17 +710,9 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
         pull_single_archive(
             worker,
-            &reader,
-            &mut chunk_reader,
+            reader.clone(),
             snapshot,
             item,
             downloaded_chunks.clone(),
@@ -494,9 +725,10 @@ async fn pull_snapshot(
     }
 
     if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
-    }
-
+        reader
+            .try_download_client_log(&client_log_name, worker)
+            .await?;
+    };
     snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
@@ -506,12 +738,12 @@ async fn pull_snapshot(
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
 ///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
-async fn pull_snapshot_from(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+/// The `reader` is configured to read from the source backup directory, while the
+/// `snapshot` is pointing to the local datastore and target namespace.
+async fn pull_snapshot_from<'a>(
+    worker: &'a WorkerTask,
+    reader: Arc<dyn PullReader + 'a>,
+    snapshot: &'a pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let (_path, is_new, _snap_lock) = snapshot
@@ -626,11 +858,10 @@ impl std::fmt::Display for SkipInfo {
 /// - Sort by snapshot time
 /// - Get last snapshot timestamp on local datastore
 /// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
 /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
 ///
-/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the
+/// Backwards-compat: if `source_namespace` is [None], only the group type and ID will be sent to the
 /// remote when querying snapshots. This allows us to interact with old remotes that don't have
 /// namespace support yet.
 ///
@@ -639,117 +870,79 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
     params: &PullParameters,
-    group: &pbs_api_types::BackupGroup,
-    remote_ns: BackupNamespace,
+    source_namespace: &BackupNamespace,
+    group: &BackupGroup,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    task_log!(worker, "sync group {}", group);
-
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
-    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
-
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
-    client.login().await?; // make sure auth is complete
-
-    let fingerprint = client.fingerprint();
-
-    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
-    let last_sync_time = last_sync.unwrap_or(i64::MIN);
-
-    let mut remote_snapshots = std::collections::HashSet::new();
-
-    // start with 65536 chunks (up to 256 GiB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
-    progress.group_snapshots = list.len() as u64;
-
     let mut already_synced_skip_info = SkipInfo::new(SkipReason::AlreadySynced);
     let mut transfer_last_skip_info = SkipInfo::new(SkipReason::TransferLast);
 
-    let total_amount = list.len();
+    let mut raw_list: Vec<BackupDir> = params
+        .source
+        .list_backup_dirs(source_namespace, group, worker)
+        .await?;
+    raw_list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
+
+    let total_amount = raw_list.len();
 
     let cutoff = params
         .transfer_last
         .map(|count| total_amount.saturating_sub(count))
         .unwrap_or_default();
 
-    for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = item.backup;
-
-        // in-progress backups can't be synced
-        if item.size.is_none() {
-            task_log!(
-                worker,
-                "skipping snapshot {} - in-progress backup",
-                snapshot
-            );
-            continue;
-        }
+    let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
 
-        remote_snapshots.insert(snapshot.time);
-
-        if last_sync_time > snapshot.time {
-            already_synced_skip_info.update(snapshot.time);
-            continue;
-        } else if already_synced_skip_info.count > 0 {
-            task_log!(worker, "{}", already_synced_skip_info);
-            already_synced_skip_info.reset();
-        }
-
-        if pos < cutoff && last_sync_time != snapshot.time {
-            transfer_last_skip_info.update(snapshot.time);
-            continue;
-        } else if transfer_last_skip_info.count > 0 {
-            task_log!(worker, "{}", transfer_last_skip_info);
-            transfer_last_skip_info.reset();
-        }
-
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
+    let mut source_snapshots = HashSet::new();
+    let last_sync_time = params
+        .target
+        .store
+        .last_successful_backup(&target_ns, group)?
+        .unwrap_or(i64::MIN);
+
+    let list: Vec<BackupDir> = raw_list
+        .into_iter()
+        .enumerate()
+        .filter(|&(pos, ref dir)| {
+            source_snapshots.insert(dir.time);
+            if last_sync_time > dir.time {
+                already_synced_skip_info.update(dir.time);
+                return false;
+            } else if already_synced_skip_info.count > 0 {
+                task_log!(worker, "{}", already_synced_skip_info);
+                already_synced_skip_info.reset();
+                return true;
+            }
 
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
+            if pos < cutoff && last_sync_time != dir.time {
+                transfer_last_skip_info.update(dir.time);
+                return false;
+            } else if transfer_last_skip_info.count > 0 {
+                task_log!(worker, "{}", transfer_last_skip_info);
+                transfer_last_skip_info.reset();
+            }
+            true
+        })
+        .map(|(_, dir)| dir)
+        .collect();
 
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
+    // start with 65536 chunks (up to 256 GiB)
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-        let reader = BackupReader::start(
-            &new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+    progress.group_snapshots = list.len() as u64;
 
-        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+    for (pos, from_snapshot) in list.into_iter().enumerate() {
+        let to_snapshot = params
+            .target
+            .store
+            .backup_dir(target_ns.clone(), from_snapshot.clone())?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let reader = params
+            .source
+            .reader(source_namespace, &from_snapshot)
+            .await?;
+        let result =
+            pull_snapshot_from(worker, reader, &to_snapshot, downloaded_chunks.clone()).await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -758,11 +951,14 @@ async fn pull_group(
     }
 
     if params.remove_vanished {
-        let group = params.store.backup_group(target_ns.clone(), group.clone());
+        let group = params
+            .target
+            .store
+            .backup_group(target_ns.clone(), group.clone());
         let local_list = group.list_backups()?;
         for info in local_list {
             let snapshot = info.backup_dir;
-            if remote_snapshots.contains(&snapshot.backup_time()) {
+            if source_snapshots.contains(&snapshot.backup_time()) {
                 continue;
             }
             if snapshot.is_protected() {
@@ -775,6 +971,7 @@ async fn pull_group(
             }
             task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
             params
+                .target
                 .store
                 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
         }
@@ -783,64 +980,12 @@ async fn pull_group(
     Ok(())
 }
 
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
-
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
-
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
-
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
-                }
-            },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
-    // parents first
-    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
-    Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
     let mut created = false;
-    let store_ns_str = print_store_and_ns(params.store.name(), ns);
+    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
 
-    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
-        check_ns_modification_privs(params.store.name(), ns, &params.owner)
+    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
             .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
 
         let name = match ns.components().last() {
@@ -850,14 +995,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
             }
         };
 
-        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
             bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
         }
         created = true;
     }
 
     check_ns_privs(
-        params.store.name(),
+        params.target.store.name(),
         ns,
         &params.owner,
         PRIV_DATASTORE_BACKUP,
@@ -868,10 +1013,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
 }
 
 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
-    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
         .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
 
-    params.store.remove_namespace_recursive(local_ns, true)
+    params
+        .target
+        .store
+        .remove_namespace_recursive(local_ns, true)
 }
 
 fn check_and_remove_vanished_ns(
@@ -885,14 +1033,15 @@ fn check_and_remove_vanished_ns(
     // clamp like remote does so that we don't list more than we can ever have synced.
     let max_depth = params
         .max_depth
-        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
 
     let mut local_ns_list: Vec<BackupNamespace> = params
+        .target
         .store
-        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
         .filter(|ns| {
             let user_privs =
-                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
             user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
         })
         .collect();
@@ -901,7 +1050,7 @@ fn check_and_remove_vanished_ns(
     local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
 
     for local_ns in local_ns_list {
-        if local_ns == params.ns {
+        if local_ns == params.target.ns {
             continue;
         }
 
@@ -948,29 +1097,49 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
 
     let old_max_depth = params.max_depth;
-    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
-        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
+    let mut namespaces = if params.source.get_ns().is_root() && old_max_depth == Some(0) {
+        vec![params.source.get_ns()] // backwards compat - don't query remote namespaces!
     } else {
-        query_namespaces(worker, client, &mut params).await?
+        params
+            .source
+            .list_namespaces(&mut params.max_depth, worker)
+            .await?
     };
+
+    let ns_layers_to_be_pulled = namespaces
+        .iter()
+        .map(BackupNamespace::depth)
+        .max()
+        .map_or(0, |v| v - params.source.get_ns().depth());
+    let target_depth = params.target.ns.depth();
+
+    if ns_layers_to_be_pulled + target_depth > MAX_NAMESPACE_DEPTH {
+        bail!(
+            "Syncing would exceed max allowed namespace depth. ({}+{} > {})",
+            ns_layers_to_be_pulled,
+            target_depth,
+            MAX_NAMESPACE_DEPTH
+        );
+    }
+
     errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+    namespaces.sort_unstable_by_key(|a| a.name_len());
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
 
     for namespace in namespaces {
-        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+        let source_store_ns_str = params.source.print_store_and_ns();
 
-        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
-        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
 
         task_log!(worker, "----");
         task_log!(
@@ -998,7 +1167,7 @@ pub(crate) async fn pull_store(
             }
         }
 
-        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+        match pull_ns(worker, &namespace, &mut params).await {
             Ok((ns_progress, ns_errors)) => {
                 errors |= ns_errors;
 
@@ -1019,7 +1188,7 @@ pub(crate) async fn pull_store(
                 task_log!(
                     worker,
                     "Encountered errors while syncing namespace {} - {}",
-                    namespace,
+                    &namespace,
                     err,
                 );
             }
@@ -1051,48 +1220,28 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
-    source_ns: BackupNamespace,
-    target_ns: BackupNamespace,
+    namespace: &BackupNamespace,
+    params: &mut PullParameters,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
-
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+    let mut list: Vec<BackupGroup> = params.source.list_groups(namespace, &params.owner).await?;
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup.ty.cmp(&b.backup.ty);
+        let type_order = a.ty.cmp(&b.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup.id.cmp(&b.backup.id)
+            a.id.cmp(&b.id)
         } else {
             type_order
         }
     });
 
-    let apply_filters = |group: &pbs_api_types::BackupGroup, filters: &[GroupFilter]| -> bool {
+    let apply_filters = |group: &BackupGroup, filters: &[GroupFilter]| -> bool {
         filters.iter().any(|filter| group.matches(filter))
     };
 
-    // Get groups with target NS set
-    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
-        let list: Vec<pbs_api_types::BackupGroup> = list
+        let list: Vec<BackupGroup> = list
             .into_iter()
             .filter(|group| apply_filters(group, group_filter))
             .collect();
@@ -1110,13 +1259,15 @@ pub(crate) async fn pull_ns(
 
     let mut errors = false;
 
-    let mut new_groups = std::collections::HashSet::new();
+    let mut new_groups = HashSet::new();
     for group in list.iter() {
         new_groups.insert(group.clone());
     }
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
+    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+
     for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
@@ -1124,6 +1275,7 @@ pub(crate) async fn pull_ns(
 
         let (owner, _lock_guard) =
             match params
+                .target
                 .store
                 .create_locked_backup_group(&target_ns, &group, &params.owner)
             {
@@ -1135,7 +1287,9 @@ pub(crate) async fn pull_ns(
                         &group,
                         err
                     );
-                    errors = true; // do not stop here, instead continue
+                    errors = true;
+                    // do not stop here, instead continue
+                    task_log!(worker, "create_locked_backup_group failed");
                     continue;
                 }
             };
@@ -1151,15 +1305,7 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            params,
-            &group,
-            source_ns.clone(),
-            &mut progress,
-        )
-        .await
+        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
         {
             task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
@@ -1168,13 +1314,13 @@ pub(crate) async fn pull_ns(
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                 let local_group = local_group?;
                 let local_group = local_group.group();
                 if new_groups.contains(local_group) {
                     continue;
                 }
-                let owner = params.store.get_owner(&target_ns, local_group)?;
+                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                 if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
@@ -1184,7 +1330,11 @@ pub(crate) async fn pull_ns(
                     }
                 }
                 task_log!(worker, "delete vanished group '{local_group}'",);
-                match params.store.remove_backup_group(&target_ns, local_group) {
+                match params
+                    .target
+                    .store
+                    .remove_backup_group(&target_ns, local_group)
+                {
                     Ok(true) => {}
                     Ok(false) => {
                         task_log!(
-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v6 3/6] pull: add support for pulling from local datastore
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 1/6] accept a ref to a HttpClient Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 2/6] pull: refactor pulling from a datastore Hannes Laimer
@ 2023-11-21 14:31 ` Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 4/6] manager: add completion for opt. Remote in SyncJob Hannes Laimer
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/server/pull.rs | 143 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 138 insertions(+), 5 deletions(-)

diff --git a/src/server/pull.rs b/src/server/pull.rs
index ff3e6d0a..1403c7a7 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,8 +1,8 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::Seek;
-use std::path::Path;
+use std::io::{Seek, Write};
+use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
@@ -29,10 +29,12 @@ use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
 use pbs_datastore::read_chunk::AsyncReadChunk;
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::{
+    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
 use pbs_tools::sha::sha256;
 
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
 struct RemoteReader {
@@ -40,6 +42,12 @@ struct RemoteReader {
     dir: BackupDir,
 }
 
+struct LocalReader {
+    _dir_lock: Arc<Mutex<proxmox_sys::fs::DirLockGuard>>,
+    path: PathBuf,
+    datastore: Arc<DataStore>,
+}
+
 pub(crate) struct PullTarget {
     store: Arc<DataStore>,
     ns: BackupNamespace,
@@ -51,6 +59,11 @@ pub(crate) struct RemoteSource {
     client: HttpClient,
 }
 
+pub(crate) struct LocalSource {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
 #[async_trait::async_trait]
 /// `PullSource` is a trait that provides an interface for pulling data/information from a source.
 /// The trait includes methods for listing namespaces, groups, and backup directories,
@@ -234,6 +247,81 @@ impl PullSource for RemoteSource {
     }
 }
 
+#[async_trait::async_trait]
+impl PullSource for LocalSource {
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        ListNamespacesRecursive::new_max_depth(
+            self.store.clone(),
+            self.ns.clone(),
+            max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+        )?
+        .collect()
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<BackupGroup>, Error> {
+        Ok(ListAccessibleBackupGroups::new_with_privs(
+            &self.store,
+            namespace.clone(),
+            0,
+            None,
+            None,
+            Some(owner),
+        )?
+        .filter_map(Result::ok)
+        .map(|backup_group| backup_group.group().clone())
+        .collect::<Vec<pbs_api_types::BackupGroup>>())
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &BackupGroup,
+        _worker: &WorkerTask,
+    ) -> Result<Vec<BackupDir>, Error> {
+        Ok(self
+            .store
+            .backup_group(namespace.clone(), group.clone())
+            .iter_snapshots()?
+            .filter_map(Result::ok)
+            .map(|snapshot| snapshot.dir().to_owned())
+            .collect::<Vec<BackupDir>>())
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        self.ns.clone()
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        print_store_and_ns(self.store.name(), &self.ns)
+    }
+
+    async fn reader(
+        &self,
+        ns: &BackupNamespace,
+        dir: &BackupDir,
+    ) -> Result<Arc<dyn PullReader>, Error> {
+        let dir = self.store.backup_dir(ns.clone(), dir.clone())?;
+        let dir_lock = proxmox_sys::fs::lock_dir_noblock_shared(
+            &dir.full_path(),
+            "snapshot",
+            "locked by another operation",
+        )?;
+        Ok(Arc::new(LocalReader {
+            _dir_lock: Arc::new(Mutex::new(dir_lock)),
+            path: dir.full_path(),
+            datastore: dir.datastore().clone(),
+        }))
+    }
+}
+
 #[async_trait::async_trait]
 /// `PullReader` is a trait that provides an interface for reading data from a source.
 /// The trait includes methods for getting a chunk reader, loading a file, downloading client log, and checking whether chunk sync should be skipped.
@@ -343,6 +431,48 @@ impl PullReader for RemoteReader {
     }
 }
 
+#[async_trait::async_trait]
+impl PullReader for LocalReader {
+    fn chunk_reader(&self, crypt_mode: CryptMode) -> Arc<dyn AsyncReadChunk> {
+        Arc::new(LocalChunkReader::new(
+            self.datastore.clone(),
+            None,
+            crypt_mode,
+        ))
+    }
+
+    async fn load_file_into(
+        &self,
+        filename: &str,
+        into: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        let mut from_path = self.path.clone();
+        from_path.push(filename);
+        tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+        tmp_file.rewind()?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    async fn try_download_client_log(
+        &self,
+        _to_path: &Path,
+        _worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        Ok(())
+    }
+
+    fn skip_chunk_sync(&self, target_store_name: &str) -> bool {
+        self.datastore.name() == target_store_name
+    }
+}
+
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
     /// Where data is pulled from
@@ -399,7 +529,10 @@ impl PullParameters {
                 client,
             })
         } else {
-            bail!("local sync not implemented yet")
+            Arc::new(LocalSource {
+                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+                ns: remote_ns,
+            })
         };
         let target = PullTarget {
             store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v6 4/6] manager: add completion for opt. Remote in SyncJob
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
                   ` (2 preceding siblings ...)
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 3/6] pull: add support for pulling from local datastore Hannes Laimer
@ 2023-11-21 14:31 ` Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 5/6] api: make Remote for SyncJob optional Hannes Laimer
                   ` (4 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/bin/proxmox-backup-manager.rs | 63 +++++++++++++++++++------------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b4cb6cb3..b4948e43 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -542,7 +542,7 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
     })
 }
 
-fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
+fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
     let mut job: Option<SyncJobConfig> = None;
 
     let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
@@ -555,15 +555,13 @@ fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)>
         None
     });
 
-    if let Some(remote) = remote {
-        let store = param
-            .get("remote-store")
-            .map(|r| r.to_owned())
-            .or_else(|| job.map(|job| job.remote_store));
+    let store = param
+        .get("remote-store")
+        .map(|r| r.to_owned())
+        .or_else(|| job.map(|job| job.remote_store));
 
-        if let Some(store) = store {
-            return Some((remote, store));
-        }
+    if let Some(store) = store {
+        return Some((remote, store));
     }
 
     None
@@ -584,7 +582,7 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
 }
 
 // shell completion helper
-pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
+pub fn complete_remote_datastore_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
     if let Some(remote) = get_remote(param) {
@@ -595,7 +593,9 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
                 list.push(item.store);
             }
         }
-    }
+    } else {
+        list = pbs_config::datastore::complete_datastore_name(arg, param);
+    };
 
     list
 }
@@ -607,17 +607,25 @@ pub fn complete_remote_datastore_namespace(
 ) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_namespaces(
                 remote.clone(),
                 remote_store.clone(),
             )
             .await
-        }) {
-            for item in data {
-                list.push(item.ns.name());
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::namespace::list_namespaces(source_store, None, None, &mut rpcenv)
+                .ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(item.ns.name());
         }
     }
 
@@ -662,19 +670,26 @@ pub fn complete_sync_local_datastore_namespace(
 pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        let ns = get_remote_ns(param);
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    let ns = get_remote_ns(param);
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_groups(
                 remote.clone(),
                 remote_store.clone(),
                 ns,
             )
             .await
-        }) {
-            for item in data {
-                list.push(format!("{}/{}", item.backup.ty, item.backup.id));
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::datastore::list_groups(source_store, ns, &mut rpcenv).ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(format!("{}/{}", item.backup.ty, item.backup.id));
         }
     }
 
-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v6 5/6] api: make Remote for SyncJob optional
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
                   ` (3 preceding siblings ...)
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 4/6] manager: add completion for opt. Remote in SyncJob Hannes Laimer
@ 2023-11-21 14:31 ` Hannes Laimer
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
                   ` (3 subsequent siblings)
  8 siblings, 0 replies; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 pbs-api-types/src/jobs.rs         |  9 ++++--
 src/api2/config/remote.rs         |  2 +-
 src/api2/config/sync.rs           | 41 +++++++++++++++--------
 src/api2/node/tasks.rs            |  3 +-
 src/api2/pull.rs                  | 54 ++++++++++++++++++++++---------
 src/bin/proxmox-backup-manager.rs |  4 +--
 src/server/email_notifications.rs | 18 ++++++-----
 7 files changed, 87 insertions(+), 44 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index 7b5a1f3d..b8640216 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -17,8 +17,8 @@ const_regex! {
 
     /// Regex for verification jobs 'DATASTORE:ACTUAL_JOB_ID'
     pub VERIFICATION_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):");
-    /// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
-    pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r")(?::(", BACKUP_NS_RE!(), r"))?:");
+    /// Regex for sync jobs '(REMOTE|\-):REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID'
+    pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"|\-):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r")(?::(", BACKUP_NS_RE!(), r"))?:");
 }
 
 pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.")
@@ -471,6 +471,7 @@ pub const TRANSFER_LAST_SCHEMA: Schema =
         },
         remote: {
             schema: REMOTE_ID_SCHEMA,
+            optional: true,
         },
         "remote-store": {
             schema: DATASTORE_SCHEMA,
@@ -519,7 +520,9 @@ pub struct SyncJobConfig {
     pub ns: Option<BackupNamespace>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub owner: Option<Authid>,
-    pub remote: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    /// None implies local sync.
+    pub remote: Option<String>,
     pub remote_store: String,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub remote_ns: Option<BackupNamespace>,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 31922b94..2511c5d5 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
 
     let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
     for job in job_list {
-        if job.remote == name {
+        if job.remote.map_or(false, |id| id == name) {
             param_bail!(
                 "name",
                 "remote '{}' is used by sync job '{}' (datastore '{}')",
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index 01e5f2ce..21634bd5 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
 
 use pbs_api_types::{
     Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
-    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
-    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
+    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
 };
 use pbs_config::sync;
 
@@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
-    remote_privs & PRIV_REMOTE_AUDIT != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
+        remote_privs & PRIV_REMOTE_AUDIT != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
+    }
 }
 
 /// checks whether user can run the corresponding pull job
@@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
-    remote_privs & PRIV_REMOTE_READ != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
+        remote_privs & PRIV_REMOTE_READ != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_READ != 0
+    }
 }
 
 #[api(
@@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
 #[serde(rename_all = "kebab-case")]
 /// Deletable property name
 pub enum DeletableProperty {
+    /// Delete the remote property(-> meaning local).
+    Remote,
     /// Delete the owner property.
     Owner,
     /// Delete the comment property.
@@ -275,6 +287,9 @@ pub fn update_sync_job(
     if let Some(delete) = delete {
         for delete_prop in delete {
             match delete_prop {
+                DeletableProperty::Remote => {
+                    data.remote = None;
+                }
                 DeletableProperty::Owner => {
                     data.owner = None;
                 }
@@ -334,7 +349,7 @@ pub fn update_sync_job(
         data.ns = Some(ns);
     }
     if let Some(remote) = update.remote {
-        data.remote = remote;
+        data.remote = Some(remote);
     }
     if let Some(remote_store) = update.remote_store {
         data.remote_store = remote_store;
@@ -503,7 +518,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
 
     let mut job = SyncJobConfig {
         id: "regular".to_string(),
-        remote: "remote0".to_string(),
+        remote: Some("remote0".to_string()),
         remote_store: "remotestore1".to_string(),
         remote_ns: None,
         store: "localstore0".to_string(),
@@ -538,11 +553,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
@@ -555,10 +570,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // writing without proper write permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // writing without proper write permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_modify_access(
         &user_info,
@@ -567,7 +582,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // reset remote to one where users have access
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // user with read permission can only read, but not modify/run
     assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index 866361c6..8f08d3af 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -78,11 +78,12 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
                 if let (Some(remote), Some(remote_store), Some(local_store)) =
                     (remote, remote_store, local_store)
                 {
+                    let remote_str = remote.as_str();
                     return check_pull_privs(
                         auth_id,
                         local_store.as_str(),
                         local_ns,
-                        remote.as_str(),
+                        (remote_str != "-").then_some(remote_str),
                         remote_store.as_str(),
                         false,
                     );
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index 8cd28fa0..eb9a2199 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -1,5 +1,5 @@
 //! Sync datastore from remote server
-use anyhow::{format_err, Error};
+use anyhow::{bail, format_err, Error};
 use futures::{future::FutureExt, select};
 
 use proxmox_router::{Permission, Router, RpcEnvironment};
@@ -22,7 +22,7 @@ pub fn check_pull_privs(
     auth_id: &Authid,
     store: &str,
     ns: Option<&str>,
-    remote: &str,
+    remote: Option<&str>,
     remote_store: &str,
     delete: bool,
 ) -> Result<(), Error> {
@@ -39,12 +39,22 @@ pub fn check_pull_privs(
         PRIV_DATASTORE_BACKUP,
         false,
     )?;
-    user_info.check_privs(
-        auth_id,
-        &["remote", remote, remote_store],
-        PRIV_REMOTE_READ,
-        false,
-    )?;
+
+    if let Some(remote) = remote {
+        user_info.check_privs(
+            auth_id,
+            &["remote", remote, remote_store],
+            PRIV_REMOTE_READ,
+            false,
+        )?;
+    } else {
+        user_info.check_privs(
+            auth_id,
+            &["datastore", remote_store],
+            PRIV_DATASTORE_BACKUP,
+            false,
+        )?;
+    }
 
     if delete {
         user_info.check_privs(
@@ -65,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            Some(&sync_job.remote),
+            sync_job.remote.as_deref(),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -91,7 +101,7 @@ pub fn do_sync_job(
 ) -> Result<String, Error> {
     let job_id = format!(
         "{}:{}:{}:{}:{}",
-        sync_job.remote,
+        sync_job.remote.as_deref().unwrap_or("-"),
         sync_job.remote_store,
         sync_job.store,
         sync_job.ns.clone().unwrap_or_default(),
@@ -99,6 +109,10 @@ pub fn do_sync_job(
     );
     let worker_type = job.jobtype().to_string();
 
+    if sync_job.remote.is_none() && sync_job.store == sync_job.remote_store {
+        bail!("can't sync to same datastore");
+    }
+
     let (email, notify) = crate::server::lookup_datastore_notify_settings(&sync_job.store);
 
     let upid_str = WorkerTask::spawn(
@@ -121,9 +135,12 @@ pub fn do_sync_job(
                 }
                 task_log!(
                     worker,
-                    "sync datastore '{}' from '{}/{}'",
+                    "sync datastore '{}' from '{}{}'",
                     sync_job.store,
-                    sync_job.remote,
+                    sync_job
+                        .remote
+                        .as_deref()
+                        .map_or(String::new(), |remote| format!("{remote}/")),
                     sync_job.remote_store,
                 );
 
@@ -179,6 +196,7 @@ pub fn do_sync_job(
             },
             remote: {
                 schema: REMOTE_ID_SCHEMA,
+                optional: true,
             },
             "remote-store": {
                 schema: DATASTORE_SCHEMA,
@@ -223,7 +241,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
 async fn pull(
     store: String,
     ns: Option<BackupNamespace>,
-    remote: String,
+    remote: Option<String>,
     remote_store: String,
     remote_ns: Option<BackupNamespace>,
     remove_vanished: Option<bool>,
@@ -236,6 +254,10 @@ async fn pull(
     let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?;
     let delete = remove_vanished.unwrap_or(false);
 
+    if remote.is_none() && store == remote_store {
+        bail!("can't sync to same datastore");
+    }
+
     let ns = ns.unwrap_or_default();
     let ns_str = if ns.is_root() {
         None
@@ -247,7 +269,7 @@ async fn pull(
         &auth_id,
         &store,
         ns_str.as_deref(),
-        &remote,
+        remote.as_deref(),
         &remote_store,
         delete,
     )?;
@@ -255,7 +277,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        Some(&remote),
+        remote.as_deref(),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -278,7 +300,7 @@ async fn pull(
                 worker,
                 "pull datastore '{}' from '{}/{}'",
                 store,
-                remote,
+                remote.as_deref().unwrap_or("-"),
                 remote_store,
             );
 
diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index b4948e43..eadfe547 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -535,7 +535,7 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
     param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             if let Ok(job) = get_sync_job(id) {
-                return Some(job.remote);
+                return job.remote;
             }
         }
         None
@@ -549,7 +549,7 @@ fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>,
         if let Some(id) = param.get("id") {
             job = get_sync_job(id).ok();
             if let Some(ref job) = job {
-                return Some(job.remote.clone());
+                return job.remote.clone();
             }
         }
         None
diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
index ea1476d7..18881782 100644
--- a/src/server/email_notifications.rs
+++ b/src/server/email_notifications.rs
@@ -484,15 +484,17 @@ pub fn send_sync_status(
         }
     };
 
+    let tmp_src_string;
+    let source_str = if let Some(remote) = &job.remote {
+        tmp_src_string = format!("Sync remote '{}'", remote);
+        &tmp_src_string
+    } else {
+        "Sync local"
+    };
+
     let subject = match result {
-        Ok(()) => format!(
-            "Sync remote '{}' datastore '{}' successful",
-            job.remote, job.remote_store,
-        ),
-        Err(_) => format!(
-            "Sync remote '{}' datastore '{}' failed",
-            job.remote, job.remote_store,
-        ),
+        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
+        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
     };
 
     send_job_status_mail(email, &subject, &text)?;
-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
                   ` (4 preceding siblings ...)
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 5/6] api: make Remote for SyncJob optional Hannes Laimer
@ 2023-11-21 14:31 ` Hannes Laimer
  2023-11-24 10:36   ` Lukas Wagner
  2023-11-22 10:14 ` [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Gabriel Goller
                   ` (2 subsequent siblings)
  8 siblings, 1 reply; 12+ messages in thread
From: Hannes Laimer @ 2023-11-21 14:31 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 www/Utils.js                     |  5 +++
 www/config/SyncView.js           |  1 +
 www/form/RemoteTargetSelector.js | 36 +++++++++++-----
 www/window/SyncJobEdit.js        | 73 +++++++++++++++++++++++++++++++-
 4 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/www/Utils.js b/www/Utils.js
index 2eca600e..d7f11cb6 100644
--- a/www/Utils.js
+++ b/www/Utils.js
@@ -711,6 +711,11 @@ Ext.define('PBS.Utils', {
 	return Ext.String.htmlEncode(value);
     },
 
+    render_optional_remote: function(value, metadata, record) {
+	if (!value) return `- (${gettext('Local')})`;
+	return Ext.String.htmlEncode(value);
+    },
+
     tuningOptions: {
 	'chunk-order': {
 	    '__default__': Proxmox.Utils.defaultText + ` (${gettext('Inode')})`,
diff --git a/www/config/SyncView.js b/www/config/SyncView.js
index bf9072cb..c6458a9e 100644
--- a/www/config/SyncView.js
+++ b/www/config/SyncView.js
@@ -208,6 +208,7 @@ Ext.define('PBS.config.SyncJobView', {
 	    dataIndex: 'remote',
 	    width: 120,
 	    sortable: true,
+	    renderer: PBS.Utils.render_optional_remote,
 	},
 	{
 	    header: gettext('Remote Store'),
diff --git a/www/form/RemoteTargetSelector.js b/www/form/RemoteTargetSelector.js
index 2a94c4d7..9ea802d1 100644
--- a/www/form/RemoteTargetSelector.js
+++ b/www/form/RemoteTargetSelector.js
@@ -44,20 +44,25 @@ Ext.define('PBS.form.RemoteStoreSelector', {
 
 	me.store.removeAll();
 
+	me.setDisabled(false);
+	if (!me.firstLoad) {
+	    me.clearValue();
+	}
 	if (me.remote) {
-	    me.setDisabled(false);
-	    if (!me.firstLoad) {
-		me.clearValue();
-	    }
-
 	    me.store.proxy.url = `/api2/json/config/remote/${encodeURIComponent(me.remote)}/scan`;
-	    me.store.load();
-
-	    me.firstLoad = false;
+	    me.store.removeFilter('storeFilter');
 	} else {
-	    me.setDisabled(true);
-	    me.clearValue();
+	    me.store.proxy.url = '/api2/json/admin/datastore';
+	    me.store.addFilter({
+		filterFn: function(item) {
+		    return item.get('store') !== me.datastore;
+		},
+		id: 'storeFilter',
+	    });
 	}
+	me.store.load();
+
+	me.firstLoad = false;
     },
 
     initComponent: function() {
@@ -175,6 +180,17 @@ Ext.define('PBS.form.RemoteNamespaceSelector', {
 	    me.store.proxy.url = `/api2/json/config/remote/${encodedRemote}/scan/${encodedStore}/namespaces`;
 	    me.store.load();
 
+	    me.firstLoad = false;
+	} else if (me.remoteStore) {
+	    me.setDisabled(false);
+	    if (!me.firstLoad) {
+		me.clearValue();
+	    }
+	    let encodedStore = encodeURIComponent(me.remoteStore);
+
+	    me.store.proxy.url = `/api2/json/admin/datastore/${encodedStore}/namespace`;
+	    me.store.load();
+
 	    me.firstLoad = false;
 	} else if (previousStore) {
 	    me.setDisabled(true);
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 48a0c7a9..282f16a3 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -47,6 +47,15 @@ Ext.define('PBS.window.SyncJobEdit', {
 	},
     },
 
+    setValues: function(values) {
+	let me = this;
+	if (values.id && !values.remote) {
+	    values.location = 'local';
+	} else {
+	    values.location = 'remote';
+	}
+	me.callParent([values]);
+    },
 
     items: {
 	xtype: 'tabpanel',
@@ -134,16 +143,76 @@ Ext.define('PBS.window.SyncJobEdit', {
 		],
 
 		column2: [
+		    {
+			xtype: 'radiogroup',
+			fieldLabel: gettext('Location'),
+			defaultType: 'radiofield',
+			items: [
+			    {
+				boxLabel: 'Local',
+				name: 'location',
+				inputValue: 'local',
+				submitValue: false,
+			    },
+			    {
+				boxLabel: 'Remote',
+				name: 'location',
+				inputValue: 'remote',
+				submitValue: false,
+				checked: true,
+			    },
+			],
+			listeners: {
+			    change: function(_group, radio) {
+				let me = this;
+				let form = me.up('pbsSyncJobEdit');
+				let nsField = form.down('field[name=remote-ns]');
+				let rateLimitField = form.down('field[name=rate-in]');
+				let remoteField = form.down('field[name=remote]');
+				let storeField = form.down('field[name=remote-store]');
+
+				if (!storeField.value) {
+				    nsField.clearValue();
+				    nsField.setDisabled(true);
+				}
+
+				let isLocalSync = radio.location === 'local';
+				remoteField.allowBlank = isLocalSync;
+				remoteField.setDisabled(isLocalSync);
+				storeField.setDisabled(!isLocalSync && !remoteField.value);
+				if (isLocalSync === !!remoteField.value) {
+				    storeField.clearValue();
+				    remoteField.clearValue();
+				}
+
+				if (isLocalSync) {
+				    storeField.setDisabled(false);
+				    rateLimitField.setValue(null);
+				} else {
+				    remoteField.validate();
+				}
+			    },
+			},
+		    },
 		    {
 			fieldLabel: gettext('Source Remote'),
 			xtype: 'pbsRemoteSelector',
 			allowBlank: false,
 			name: 'remote',
+			cbind: {
+			    deleteEmpty: '{!isCreate}',
+			},
+			skipEmptyText: true,
 			listeners: {
 			    change: function(f, value) {
 				let me = this;
 				let remoteStoreField = me.up('pbsSyncJobEdit').down('field[name=remote-store]');
 				remoteStoreField.setRemote(value);
+				let rateLimitField = me.up('pbsSyncJobEdit').down('field[name=rate-in]');
+				rateLimitField.setDisabled(!value);
+				if (!value) {
+				    rateLimitField.setValue(null);
+				}
 				let remoteNamespaceField = me.up('pbsSyncJobEdit').down('field[name=remote-ns]');
 				remoteNamespaceField.setRemote(value);
 			    },
@@ -155,7 +224,9 @@ Ext.define('PBS.window.SyncJobEdit', {
 			allowBlank: false,
 			autoSelect: false,
 			name: 'remote-store',
-			disabled: true,
+			cbind: {
+			    datastore: '{datastore}',
+			},
 			listeners: {
 			    change: function(field, value) {
 				let me = this;
-- 
2.39.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
                   ` (5 preceding siblings ...)
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
@ 2023-11-22 10:14 ` Gabriel Goller
  2023-11-24 10:38 ` Lukas Wagner
  2023-11-25 16:14 ` [pbs-devel] applied: " Thomas Lamprecht
  8 siblings, 0 replies; 12+ messages in thread
From: Gabriel Goller @ 2023-11-22 10:14 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

LGTM!
All the issues from the previous version were fixed as well, so:

Tested-by: Gabriel Goller <g.goller@proxmox.com>

One tiny (non-critical) issue though:
You are allowed to create a local sync to the same datastore when using
`proxmox-backup-manager` in the cli. You will not be able to run that sync
job (neither the ui, nor the cli), but it would still be nice if there was
some client-side error prohibiting you from creating the job.
Although I would open a separate bug so that we can get this series merged!





^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob
  2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
@ 2023-11-24 10:36   ` Lukas Wagner
  2023-11-25 16:16     ` Thomas Lamprecht
  0 siblings, 1 reply; 12+ messages in thread
From: Lukas Wagner @ 2023-11-24 10:36 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer



On 11/21/23 15:31, Hannes Laimer wrote:
> diff --git a/www/Utils.js b/www/Utils.js
> index 2eca600e..d7f11cb6 100644
> --- a/www/Utils.js
> +++ b/www/Utils.js
> @@ -711,6 +711,11 @@ Ext.define('PBS.Utils', {
>   	return Ext.String.htmlEncode(value);
>       },
>   
> +    render_optional_remote: function(value, metadata, record) {
Nit: New functions/variables should use camelCase, see
https://pve.proxmox.com/wiki/Javascript_Style_Guide#Casing

> +	if (!value) return `- (${gettext('Local')})`;
Nit: Don't use single-line if statements for new code, see
https://pve.proxmox.com/wiki/Javascript_Style_Guide#Single-Line_If-Statement

> +	return Ext.String.htmlEncode(value);
> +    },
> + >       tuningOptions: {
>   	'chunk-order': {
>   	    '__default__': Proxmox.Utils.defaultText + ` (${gettext('Inode')})`,
> diff --git a/www/config/SyncView.js b/www/config/SyncView.js
> index bf9072cb..c6458a9e 100644
> --- a/www/config/SyncView.js
> +++ b/www/config/SyncView.js
> @@ -208,6 +208,7 @@ Ext.define('PBS.config.SyncJobView', {
>   	    dataIndex: 'remote',
>   	    width: 120,
>   	    sortable: true,
> +	    renderer: PBS.Utils.render_optional_remote,
>   	},
>   	{
>   	    header: gettext('Remote Store'),

(...)

>   
> +    setValues: function(values) {
> +	let me = this;
> +	if (values.id && !values.remote) {
> +	    values.location = 'local';
> +	} else {
> +	    values.location = 'remote';
> +	}
> +	me.callParent([values]);
> +    },
>   
>       items: {
>   	xtype: 'tabpanel',
> @@ -134,16 +143,76 @@ Ext.define('PBS.window.SyncJobEdit', {
>   		],
>   
>   		column2: [
> +		    {
> +			xtype: 'radiogroup',
> +			fieldLabel: gettext('Location'),
> +			defaultType: 'radiofield',
> +			items: [
> +			    {
> +				boxLabel: 'Local',
> +				name: 'location',
> +				inputValue: 'local',
> +				submitValue: false,
> +			    },
> +			    {
> +				boxLabel: 'Remote',
> +				name: 'location',
> +				inputValue: 'remote',
> +				submitValue: false,
> +				checked: true,
> +			    },
> +			],
> +			listeners: {
> +			    change: function(_group, radio) {

Nit: This component already has a controller, including a 'control'
section for event routing - I guess it would be cleaner
if this function was part of the controller, triggered
by an entry for 'control'?

See:
https://docs-devel.sencha.com/extjs/7.0.0/classic/Ext.app.ViewController.html#cfg-control

However, it seems like other form elements here already use the same 
approach as you, so I guess it's fine.

> +				let me = this;
> +				let form = me.up('pbsSyncJobEdit');
> +				let nsField = form.down('field[name=remote-ns]');
> +				let rateLimitField = form.down('field[name=rate-in]');
> +				let remoteField = form.down('field[name=remote]');
> +				let storeField = form.down('field[name=remote-store]');
> +

(...)

-- 
- Lukas




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
                   ` (6 preceding siblings ...)
  2023-11-22 10:14 ` [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Gabriel Goller
@ 2023-11-24 10:38 ` Lukas Wagner
  2023-11-25 16:14 ` [pbs-devel] applied: " Thomas Lamprecht
  8 siblings, 0 replies; 12+ messages in thread
From: Lukas Wagner @ 2023-11-24 10:38 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

On 11/21/23 15:31, Hannes Laimer wrote:
> Add support for local sync. SyncJobs without a remote are considered local, and
> use a different logic for pulling. In the course of adding the extra pull logic,
> the pull code was rewritten to basically be source independent. Also cli
> completion and the UI was updated to allow Remotes in SyncJobs to be optional.
> 

Gave this another quick test on the latest master, LGTM.
Quickly glanced over the code, didn't see anything bothersome
there (three minor nits in the UI code, but no blockers)

Tested-by: Lukas Wagner <l.wagner@proxmox.com>
Reviewed-by: Lukas Wagner <l.wagner@proxmox.com>

-- 
- Lukas




^ permalink raw reply	[flat|nested] 12+ messages in thread

* [pbs-devel] applied: [PATCH proxmox-backup v6 0/6] local sync-jobs
  2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
                   ` (7 preceding siblings ...)
  2023-11-24 10:38 ` Lukas Wagner
@ 2023-11-25 16:14 ` Thomas Lamprecht
  8 siblings, 0 replies; 12+ messages in thread
From: Thomas Lamprecht @ 2023-11-25 16:14 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Hannes Laimer

Am 21/11/2023 um 15:31 schrieb Hannes Laimer:
> Add support for local sync. SyncJobs without a remote are considered local, and
> use a different logic for pulling. In the course of adding the extra pull logic,
> the pull code was rewritten to basically be source independent. Also cli
> completion and the UI was updated to allow Remotes in SyncJobs to be optional.

> Hannes Laimer (6):
>   accept a ref to a HttpClient
>   pull: refactor pulling from a datastore
>   pull: add support for pulling from local datastore
>   manager: add completion for opt. Remote in SyncJob
>   api: make Remote for SyncJob optional
>   ui: add support for optional Remote in SyncJob
> 
>  Cargo.toml                           |    2 +
>  examples/download-speed.rs           |    2 +-
>  pbs-api-types/src/jobs.rs            |    9 +-
>  pbs-client/src/backup_reader.rs      |    2 +-
>  pbs-datastore/src/read_chunk.rs      |    2 +-
>  proxmox-backup-client/src/catalog.rs |    4 +-
>  proxmox-backup-client/src/main.rs    |    2 +-
>  proxmox-backup-client/src/mount.rs   |    2 +-
>  proxmox-file-restore/src/main.rs     |    4 +-
>  src/api2/config/remote.rs            |   16 +-
>  src/api2/config/sync.rs              |   41 +-
>  src/api2/node/tasks.rs               |    3 +-
>  src/api2/pull.rs                     |   60 +-
>  src/bin/proxmox-backup-manager.rs    |   67 +-
>  src/bin/proxmox_backup_debug/diff.rs |    2 +-
>  src/server/email_notifications.rs    |   18 +-
>  src/server/pull.rs                   | 1073 ++++++++++++++++----------
>  www/Utils.js                         |    5 +
>  www/config/SyncView.js               |    1 +
>  www/form/RemoteTargetSelector.js     |   36 +-
>  www/window/SyncJobEdit.js            |   73 +-
>  21 files changed, 934 insertions(+), 490 deletions(-)
> 


applied series, thanks!

Can you please sent a follow-up for Gabriel comment:

Am 22/11/2023 um 11:14 schrieb Gabriel Goller:
> You are allowed to create a local sync to the same datastore when using
> `proxmox-backup-manager` in the cli. You will not be able to run that sync
> job (neither the ui, nor the cli), but it would still be nice if there was
> some client-side error prohibiting you from creating the job.

As I also think that this is not fully ideal and should be blocked to happen
for better UX.




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob
  2023-11-24 10:36   ` Lukas Wagner
@ 2023-11-25 16:16     ` Thomas Lamprecht
  0 siblings, 0 replies; 12+ messages in thread
From: Thomas Lamprecht @ 2023-11-25 16:16 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion, Lukas Wagner,
	Hannes Laimer

Am 24/11/2023 um 11:36 schrieb Lukas Wagner:
> 
> 
> On 11/21/23 15:31, Hannes Laimer wrote:
>> diff --git a/www/Utils.js b/www/Utils.js
>> index 2eca600e..d7f11cb6 100644
>> --- a/www/Utils.js
>> +++ b/www/Utils.js
>> @@ -711,6 +711,11 @@ Ext.define('PBS.Utils', {
>>   	return Ext.String.htmlEncode(value);
>>       },
>>   
>> +    render_optional_remote: function(value, metadata, record) {
> Nit: New functions/variables should use camelCase, see
> https://pve.proxmox.com/wiki/Javascript_Style_Guide#Casing

I agree, but kept this as is, because the function above this
is named render_optional_namespace, this probably should be fixed in a more
tree-wide manner

> 
>> +	if (!value) return `- (${gettext('Local')})`;
> Nit: Don't use single-line if statements for new code, see
> https://pve.proxmox.com/wiki/Javascript_Style_Guide#Single-Line_If-Statement

fixed in a follow-up, those are indeed not nice to read.

>> +			listeners: {
>> +			    change: function(_group, radio) {
> 
> Nit: This component already has a controller, including a 'control'
> section for event routing - I guess it would be cleaner
> if this function was part of the controller, triggered
> by an entry for 'control'?
> 
> See:
> https://docs-devel.sencha.com/extjs/7.0.0/classic/Ext.app.ViewController.html#cfg-control
> 
> However, it seems like other form elements here already use the same 
> approach as you, so I guess it's fine.


Yeah I agree, if there's a controller then all* view logic should use
that. But no biggie for now.

*there certainly are exceptions like to all rules/guidelines




^ permalink raw reply	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2023-11-25 16:16 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-11-21 14:31 [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Hannes Laimer
2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 1/6] accept a ref to a HttpClient Hannes Laimer
2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 2/6] pull: refactor pulling from a datastore Hannes Laimer
2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 3/6] pull: add support for pulling from local datastore Hannes Laimer
2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 4/6] manager: add completion for opt. Remote in SyncJob Hannes Laimer
2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 5/6] api: make Remote for SyncJob optional Hannes Laimer
2023-11-21 14:31 ` [pbs-devel] [PATCH proxmox-backup v6 6/6] ui: add support for optional Remote in SyncJob Hannes Laimer
2023-11-24 10:36   ` Lukas Wagner
2023-11-25 16:16     ` Thomas Lamprecht
2023-11-22 10:14 ` [pbs-devel] [PATCH proxmox-backup v6 0/6] local sync-jobs Gabriel Goller
2023-11-24 10:38 ` Lukas Wagner
2023-11-25 16:14 ` [pbs-devel] applied: " Thomas Lamprecht

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox
Service provided by Proxmox Server Solutions GmbH | Privacy | Legal