public inbox for pbs-devel@lists.proxmox.com
* [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs
@ 2023-02-23 12:55 Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional Hannes Laimer
From: Hannes Laimer @ 2023-02-23 12:55 UTC (permalink / raw)
  To: pbs-devel

Add support for local sync. SyncJobs without a remote are considered local
and use different logic for pulling. While adding that logic, the pull code
was rewritten to be largely source independent. CLI completion and the UI
were also updated to allow the Remote in a SyncJob to be optional.
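
To illustrate the idea (a simplified, standalone sketch - not the code from
this series; the actual implementation in src/server/pull.rs uses
RemoteSource/LocalSource structs with real datastore and client types): the
job's optional remote selects between a remote and a local pull source, and
the rest of the pull logic only works against that abstraction.

    // Standalone sketch, simplified/assumed types:
    enum PullSource {
        /// Pull over the API from a configured remote.
        Remote { remote: String, store: String },
        /// Pull directly from another datastore on this host.
        Local { store: String },
    }

    impl PullSource {
        /// A job without a remote is treated as a local sync.
        fn from_job(remote: Option<String>, remote_store: String) -> Self {
            match remote {
                Some(remote) => PullSource::Remote { remote, store: remote_store },
                None => PullSource::Local { store: remote_store },
            }
        }
    }

    fn main() {
        let sources = [
            PullSource::from_job(None, "store1".to_string()),
            PullSource::from_job(Some("remote0".to_string()), "store1".to_string()),
        ];
        // The pull logic dispatches on the variant instead of assuming a remote.
        for source in sources {
            match source {
                PullSource::Remote { remote, store } => {
                    println!("pull '{store}' from remote '{remote}'");
                }
                PullSource::Local { store } => {
                    println!("pull from local datastore '{store}'");
                }
            }
        }
    }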

Hannes Laimer (5):
  api2: make Remote for SyncJob optional
  ui: add support for optional Remote in SyncJob
  manager: add completion for opt. Remote in SyncJob
  pbs-client: accept a ref to an HttpClient in BackupReader::start
  pull: add support for local pulling

 pbs-api-types/src/datastore.rs       |    2 +-
 pbs-api-types/src/jobs.rs            |    4 +-
 pbs-client/src/backup_reader.rs      |    2 +-
 pbs-datastore/src/read_chunk.rs      |    2 +-
 proxmox-backup-client/src/catalog.rs |    4 +-
 proxmox-backup-client/src/main.rs    |    2 +-
 proxmox-backup-client/src/mount.rs   |    2 +-
 src/api2/config/remote.rs            |    2 +-
 src/api2/config/sync.rs              |   41 +-
 src/api2/node/tasks.rs               |    4 +-
 src/api2/pull.rs                     |   79 +-
 src/bin/proxmox-backup-manager.rs    |   67 +-
 src/bin/proxmox_backup_debug/diff.rs |    2 +-
 src/server/email_notifications.rs    |   16 +-
 src/server/pull.rs                   | 1023 ++++++++++++++++----------
 www/form/RemoteTargetSelector.js     |   29 +-
 www/window/SyncJobEdit.js            |    8 +-
 17 files changed, 812 insertions(+), 477 deletions(-)

-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional
  2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
@ 2023-02-23 12:55 ` Hannes Laimer
  2023-02-28  9:41   ` Wolfgang Bumiller
  2023-02-28 11:35   ` Fabian Grünbichler
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 2/5] ui: add support for optional Remote in SyncJob Hannes Laimer
From: Hannes Laimer @ 2023-02-23 12:55 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 pbs-api-types/src/jobs.rs         |  4 +-
 src/api2/config/remote.rs         |  2 +-
 src/api2/config/sync.rs           | 41 +++++++++++++------
 src/api2/node/tasks.rs            |  4 +-
 src/api2/pull.rs                  | 68 +++++++++++++++++++++++--------
 src/server/email_notifications.rs | 16 ++++----
 6 files changed, 93 insertions(+), 42 deletions(-)

diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
index cf7618c4..68db6cb8 100644
--- a/pbs-api-types/src/jobs.rs
+++ b/pbs-api-types/src/jobs.rs
@@ -462,6 +462,7 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema =
         },
         remote: {
             schema: REMOTE_ID_SCHEMA,
+            optional: true,
         },
         "remote-store": {
             schema: DATASTORE_SCHEMA,
@@ -506,7 +507,8 @@ pub struct SyncJobConfig {
     pub ns: Option<BackupNamespace>,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub owner: Option<Authid>,
-    pub remote: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub remote: Option<String>,
     pub remote_store: String,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub remote_ns: Option<BackupNamespace>,
diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
index 2f02d121..aa74bdc0 100644
--- a/src/api2/config/remote.rs
+++ b/src/api2/config/remote.rs
@@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
 
     let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
     for job in job_list {
-        if job.remote == name {
+        if job.remote.map_or(false, |id| id == name) {
             param_bail!(
                 "name",
                 "remote '{}' is used by sync job '{}' (datastore '{}')",
diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
index bd7373df..4c5d06e2 100644
--- a/src/api2/config/sync.rs
+++ b/src/api2/config/sync.rs
@@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
 
 use pbs_api_types::{
     Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
-    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
-    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
+    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
+    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
 };
 use pbs_config::sync;
 
@@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
-    remote_privs & PRIV_REMOTE_AUDIT != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
+        remote_privs & PRIV_REMOTE_AUDIT != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
+    }
 }
 
 /// checks whether user can run the corresponding pull job
@@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
         return false;
     }
 
-    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
-    remote_privs & PRIV_REMOTE_READ != 0
+    if let Some(remote) = &job.remote {
+        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
+        remote_privs & PRIV_REMOTE_READ != 0
+    } else {
+        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
+        source_ds_privs & PRIV_DATASTORE_READ != 0
+    }
 }
 
 #[api(
@@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
 #[serde(rename_all = "kebab-case")]
 /// Deletable property name
 pub enum DeletableProperty {
+    /// Delete the remote property (-> meaning local).
+    Remote,
     /// Delete the owner property.
     Owner,
     /// Delete the comment property.
@@ -273,6 +285,9 @@ pub fn update_sync_job(
     if let Some(delete) = delete {
         for delete_prop in delete {
             match delete_prop {
+                DeletableProperty::Remote => {
+                    data.remote = None;
+                }
                 DeletableProperty::Owner => {
                     data.owner = None;
                 }
@@ -329,7 +344,7 @@ pub fn update_sync_job(
         data.ns = Some(ns);
     }
     if let Some(remote) = update.remote {
-        data.remote = remote;
+        data.remote = Some(remote);
     }
     if let Some(remote_store) = update.remote_store {
         data.remote_store = remote_store;
@@ -495,7 +510,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
 
     let mut job = SyncJobConfig {
         id: "regular".to_string(),
-        remote: "remote0".to_string(),
+        remote: Some("remote0".to_string()),
         remote_store: "remotestore1".to_string(),
         remote_ns: None,
         store: "localstore0".to_string(),
@@ -529,11 +544,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
     // reading without proper read permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
 
@@ -546,10 +561,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // writing without proper write permissions on local end must fail
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // writing without proper write permissions on remote end must fail
-    job.remote = "remote0".to_string();
+    job.remote = Some("remote0".to_string());
     job.store = "localstore1".to_string();
     assert!(!check_sync_job_modify_access(
         &user_info,
@@ -558,7 +573,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
     ));
 
     // reset remote to one where users have access
-    job.remote = "remote1".to_string();
+    job.remote = Some("remote1".to_string());
 
     // user with read permission can only read, but not modify/run
     assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
index d386f805..780cb6d1 100644
--- a/src/api2/node/tasks.rs
+++ b/src/api2/node/tasks.rs
@@ -75,14 +75,14 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
                 let local_store = captures.get(3);
                 let local_ns = captures.get(4).map(|m| m.as_str());
 
-                if let (Some(remote), Some(remote_store), Some(local_store)) =
+                if let (remote, Some(remote_store), Some(local_store)) =
                     (remote, remote_store, local_store)
                 {
                     return check_pull_privs(
                         auth_id,
                         local_store.as_str(),
                         local_ns,
-                        remote.as_str(),
+                        remote.map(|remote| remote.as_str()),
                         remote_store.as_str(),
                         false,
                     );
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index b2473ec8..bb8f6fe1 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -9,7 +9,8 @@ use proxmox_sys::task_log;
 use pbs_api_types::{
     Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
     GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
-    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
+    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA,
+    REMOVE_VANISHED_BACKUPS_SCHEMA,
 };
 use pbs_config::CachedUserInfo;
 use proxmox_rest_server::WorkerTask;
@@ -21,7 +22,7 @@ pub fn check_pull_privs(
     auth_id: &Authid,
     store: &str,
     ns: Option<&str>,
-    remote: &str,
+    remote: Option<&str>,
     remote_store: &str,
     delete: bool,
 ) -> Result<(), Error> {
@@ -38,12 +39,22 @@ pub fn check_pull_privs(
         PRIV_DATASTORE_BACKUP,
         false,
     )?;
-    user_info.check_privs(
-        auth_id,
-        &["remote", remote, remote_store],
-        PRIV_REMOTE_READ,
-        false,
-    )?;
+
+    if let Some(remote) = remote {
+        user_info.check_privs(
+            auth_id,
+            &["remote", remote, remote_store],
+            PRIV_REMOTE_READ,
+            false,
+        )?;
+    } else {
+        user_info.check_privs(
+            auth_id,
+            &["datastore", remote_store],
+            PRIV_DATASTORE_BACKUP,
+            false,
+        )?;
+    }
 
     if delete {
         user_info.check_privs(
@@ -64,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
         PullParameters::new(
             &sync_job.store,
             sync_job.ns.clone().unwrap_or_default(),
-            &sync_job.remote,
+            sync_job.remote.clone().as_deref(),
             &sync_job.remote_store,
             sync_job.remote_ns.clone().unwrap_or_default(),
             sync_job
@@ -75,7 +86,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
             sync_job.remove_vanished,
             sync_job.max_depth,
             sync_job.group_filter.clone(),
-            sync_job.limit.clone(),
         )
     }
 }
@@ -89,7 +99,7 @@ pub fn do_sync_job(
 ) -> Result<String, Error> {
     let job_id = format!(
         "{}:{}:{}:{}:{}",
-        sync_job.remote,
+        sync_job.remote.clone().unwrap_or("localhost".to_string()),
         sync_job.remote_store,
         sync_job.store,
         sync_job.ns.clone().unwrap_or_default(),
@@ -122,11 +132,34 @@ pub fn do_sync_job(
                     worker,
                     "sync datastore '{}' from '{}/{}'",
                     sync_job.store,
-                    sync_job.remote,
+                    sync_job.remote.clone().unwrap_or("local".to_string()),
                     sync_job.remote_store,
                 );
 
-                pull_store(&worker, &client, pull_params).await?;
+                if sync_job.remote.is_some() {
+                    pull_store(&worker, &client, pull_params).await?;
+                } else {
+                    match (sync_job.ns, sync_job.remote_ns) {
+                        (Some(target_ns), Some(source_ns))
+                            if target_ns.path().starts_with(source_ns.path())
+                                && sync_job.store == sync_job.remote_store =>
+                        {
+                            task_log!(
+                                worker,
+                                "Can't sync namespace into one of its sub-namespaces, skipping"
+                            );
+                        }
+                        (_, None) if sync_job.store == sync_job.remote_store => {
+                            task_log!(
+                                worker,
+                                "Can't sync root namespace into same datastore, skipping"
+                            );
+                        }
+                        _ => {
+                            pull_store(&worker, pull_params).await?;
+                        }
+                    }
+                }
 
                 task_log!(worker, "sync job '{}' end", &job_id);
 
@@ -178,6 +211,7 @@ pub fn do_sync_job(
             },
             remote: {
                 schema: REMOTE_ID_SCHEMA,
+                optional: true,
             },
             "remote-store": {
                 schema: DATASTORE_SCHEMA,
@@ -218,7 +252,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
 async fn pull(
     store: String,
     ns: Option<BackupNamespace>,
-    remote: String,
+    remote: Option<String>,
     remote_store: String,
     remote_ns: Option<BackupNamespace>,
     remove_vanished: Option<bool>,
@@ -241,7 +275,7 @@ async fn pull(
         &auth_id,
         &store,
         ns_str.as_deref(),
-        &remote,
+        remote.as_deref(),
         &remote_store,
         delete,
     )?;
@@ -249,7 +283,7 @@ async fn pull(
     let pull_params = PullParameters::new(
         &store,
         ns,
-        &remote,
+        remote.as_deref(),
         &remote_store,
         remote_ns.unwrap_or_default(),
         auth_id.clone(),
@@ -272,7 +306,7 @@ async fn pull(
                 worker,
                 "pull datastore '{}' from '{}/{}'",
                 store,
-                remote,
+                remote.as_deref().unwrap_or("localhost"),
                 remote_store,
             );
 
diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
index b3298cf9..31a46b0f 100644
--- a/src/server/email_notifications.rs
+++ b/src/server/email_notifications.rs
@@ -486,15 +486,15 @@ pub fn send_sync_status(
         }
     };
 
+    let source_str = if let Some(remote) = job.remote.clone() {
+        format!("Sync remote '{}'", remote)
+    } else {
+        format!("Sync local")
+    };
+
     let subject = match result {
-        Ok(()) => format!(
-            "Sync remote '{}' datastore '{}' successful",
-            job.remote, job.remote_store,
-        ),
-        Err(_) => format!(
-            "Sync remote '{}' datastore '{}' failed",
-            job.remote, job.remote_store,
-        ),
+        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
+        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
     };
 
     send_job_status_mail(email, &subject, &text)?;
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup v2 2/5] ui: add support for optional Remote in SyncJob
  2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional Hannes Laimer
@ 2023-02-23 12:55 ` Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 3/5] manager: add completion for opt. " Hannes Laimer
From: Hannes Laimer @ 2023-02-23 12:55 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 www/form/RemoteTargetSelector.js | 29 +++++++++++++++++++----------
 www/window/SyncJobEdit.js        |  8 ++++++--
 2 files changed, 25 insertions(+), 12 deletions(-)

diff --git a/www/form/RemoteTargetSelector.js b/www/form/RemoteTargetSelector.js
index 2a94c4d7..e7b822d7 100644
--- a/www/form/RemoteTargetSelector.js
+++ b/www/form/RemoteTargetSelector.js
@@ -44,20 +44,18 @@ Ext.define('PBS.form.RemoteStoreSelector', {
 
 	me.store.removeAll();
 
+	me.setDisabled(false);
+	if (!me.firstLoad) {
+	    me.clearValue();
+	}
 	if (me.remote) {
-	    me.setDisabled(false);
-	    if (!me.firstLoad) {
-		me.clearValue();
-	    }
-
 	    me.store.proxy.url = `/api2/json/config/remote/${encodeURIComponent(me.remote)}/scan`;
-	    me.store.load();
-
-	    me.firstLoad = false;
 	} else {
-	    me.setDisabled(true);
-	    me.clearValue();
+	    me.store.proxy.url = '/api2/json/admin/datastore';
 	}
+	me.store.load();
+
+	me.firstLoad = false;
     },
 
     initComponent: function() {
@@ -175,6 +173,17 @@ Ext.define('PBS.form.RemoteNamespaceSelector', {
 	    me.store.proxy.url = `/api2/json/config/remote/${encodedRemote}/scan/${encodedStore}/namespaces`;
 	    me.store.load();
 
+	    me.firstLoad = false;
+	} else if (me.remoteStore) {
+	    me.setDisabled(false);
+	    if (!me.firstLoad) {
+		me.clearValue();
+	    }
+	    let encodedStore = encodeURIComponent(me.remoteStore);
+
+	    me.store.proxy.url = `/api2/json/admin/datastore/${encodedStore}/namespace`;
+	    me.store.load();
+
 	    me.firstLoad = false;
 	} else if (previousStore) {
 	    me.setDisabled(true);
diff --git a/www/window/SyncJobEdit.js b/www/window/SyncJobEdit.js
index 948ad5da..401a03b7 100644
--- a/www/window/SyncJobEdit.js
+++ b/www/window/SyncJobEdit.js
@@ -137,8 +137,13 @@ Ext.define('PBS.window.SyncJobEdit', {
 		    {
 			fieldLabel: gettext('Source Remote'),
 			xtype: 'pbsRemoteSelector',
-			allowBlank: false,
+			allowBlank: true,
+			emptyText: gettext('Local'),
 			name: 'remote',
+			cbind: {
+			    deleteEmpty: '{!isCreate}',
+			},
+			skipEmptyText: true,
 			listeners: {
 			    change: function(f, value) {
 				let me = this;
@@ -155,7 +160,6 @@ Ext.define('PBS.window.SyncJobEdit', {
 			allowBlank: false,
 			autoSelect: false,
 			name: 'remote-store',
-			disabled: true,
 			listeners: {
 			    change: function(field, value) {
 				let me = this;
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup v2 3/5] manager: add completion for opt. Remote in SyncJob
  2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 2/5] ui: add support for optional Remote in SyncJob Hannes Laimer
@ 2023-02-23 12:55 ` Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 4/5] pbs-client: accept a ref to an HttpClient in BackupReader::start Hannes Laimer
From: Hannes Laimer @ 2023-02-23 12:55 UTC (permalink / raw)
  To: pbs-devel

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
 src/bin/proxmox-backup-manager.rs | 67 +++++++++++++++++++------------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs
index 740fdc49..1944c468 100644
--- a/src/bin/proxmox-backup-manager.rs
+++ b/src/bin/proxmox-backup-manager.rs
@@ -526,35 +526,33 @@ fn get_remote(param: &HashMap<String, String>) -> Option<String> {
     param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             if let Ok(job) = get_sync_job(id) {
-                return Some(job.remote);
+                return job.remote;
             }
         }
         None
     })
 }
 
-fn get_remote_store(param: &HashMap<String, String>) -> Option<(String, String)> {
+fn get_remote_store(param: &HashMap<String, String>) -> Option<(Option<String>, String)> {
     let mut job: Option<SyncJobConfig> = None;
 
     let remote = param.get("remote").map(|r| r.to_owned()).or_else(|| {
         if let Some(id) = param.get("id") {
             job = get_sync_job(id).ok();
             if let Some(ref job) = job {
-                return Some(job.remote.clone());
+                return job.remote.clone();
             }
         }
         None
     });
 
-    if let Some(remote) = remote {
-        let store = param
-            .get("remote-store")
-            .map(|r| r.to_owned())
-            .or_else(|| job.map(|job| job.remote_store));
+    let store = param
+        .get("remote-store")
+        .map(|r| r.to_owned())
+        .or_else(|| job.map(|job| job.remote_store));
 
-        if let Some(store) = store {
-            return Some((remote, store));
-        }
+    if let Some(store) = store {
+        return Some((remote, store));
     }
 
     None
@@ -575,7 +573,7 @@ fn get_remote_ns(param: &HashMap<String, String>) -> Option<BackupNamespace> {
 }
 
 // shell completion helper
-pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
+pub fn complete_remote_datastore_name(arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
     if let Some(remote) = get_remote(param) {
@@ -586,7 +584,9 @@ pub fn complete_remote_datastore_name(_arg: &str, param: &HashMap<String, String
                 list.push(item.store);
             }
         }
-    }
+    } else {
+        list = pbs_config::datastore::complete_datastore_name(arg, param);
+    };
 
     list
 }
@@ -598,17 +598,25 @@ pub fn complete_remote_datastore_namespace(
 ) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_namespaces(
                 remote.clone(),
                 remote_store.clone(),
             )
             .await
-        }) {
-            for item in data {
-                list.push(item.ns.name());
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::namespace::list_namespaces(source_store, None, None, &mut rpcenv)
+                .ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(item.ns.name());
         }
     }
 
@@ -653,19 +661,26 @@ pub fn complete_sync_local_datastore_namespace(
 pub fn complete_remote_datastore_group(_arg: &str, param: &HashMap<String, String>) -> Vec<String> {
     let mut list = Vec::new();
 
-    if let Some((remote, remote_store)) = get_remote_store(param) {
-        let ns = get_remote_ns(param);
-        if let Ok(data) = proxmox_async::runtime::block_on(async move {
+    let ns = get_remote_ns(param);
+    if let Some(data) = match get_remote_store(param) {
+        Some((Some(remote), remote_store)) => proxmox_async::runtime::block_on(async move {
             crate::api2::config::remote::scan_remote_groups(
                 remote.clone(),
                 remote_store.clone(),
                 ns,
             )
             .await
-        }) {
-            for item in data {
-                list.push(format!("{}/{}", item.backup.ty, item.backup.id));
-            }
+            .ok()
+        }),
+        Some((None, source_store)) => {
+            let mut rpcenv = CliEnvironment::new();
+            rpcenv.set_auth_id(Some(String::from("root@pam")));
+            crate::api2::admin::datastore::list_groups(source_store, ns, &mut rpcenv).ok()
+        }
+        _ => None,
+    } {
+        for item in data {
+            list.push(format!("{}/{}", item.backup.ty, item.backup.id));
         }
     }
 
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup v2 4/5] pbs-client: accept a ref to an HttpClient in BackupReader::start
  2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 3/5] manager: add completion for opt. " Hannes Laimer
@ 2023-02-23 12:55 ` Hannes Laimer
  2023-02-28 11:35   ` Fabian Grünbichler
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling Hannes Laimer
  2023-02-28 11:35 ` [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Fabian Grünbichler
From: Hannes Laimer @ 2023-02-23 12:55 UTC (permalink / raw)
  To: pbs-devel

... since the function doesn't actually need the moved value.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---

needed for next patch
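
As a toy illustration of the ownership change (generic Rust, not PBS code):
a constructor that borrows the client instead of taking it by value lets the
caller keep the HttpClient around and hand it to several readers, which the
next patch relies on when one client serves multiple BackupReader instances.

    struct Client;
    struct Reader;

    impl Reader {
        // Borrowing (`&Client`) mirrors the change from `client: HttpClient`
        // to `client: &HttpClient` in BackupReader::start().
        fn start(_client: &Client) -> Reader {
            Reader
        }
    }

    fn main() {
        let client = Client;
        let _first = Reader::start(&client);
        // A second reader from the same client; this would not compile if
        // start() took the client by value, since it would already be moved.
        let _second = Reader::start(&client);
    }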

 pbs-client/src/backup_reader.rs      | 2 +-
 proxmox-backup-client/src/catalog.rs | 4 ++--
 proxmox-backup-client/src/main.rs    | 2 +-
 proxmox-backup-client/src/mount.rs   | 2 +-
 src/bin/proxmox_backup_debug/diff.rs | 2 +-
 5 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
index 2cd4dc27..36d8ebcf 100644
--- a/pbs-client/src/backup_reader.rs
+++ b/pbs-client/src/backup_reader.rs
@@ -44,7 +44,7 @@ impl BackupReader {
 
     /// Create a new instance by upgrading the connection at '/api2/json/reader'
     pub async fn start(
-        client: HttpClient,
+        client: &HttpClient,
         crypt_config: Option<Arc<CryptConfig>>,
         datastore: &str,
         ns: &BackupNamespace,
diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
index 8c8c1458..72b22e67 100644
--- a/proxmox-backup-client/src/catalog.rs
+++ b/proxmox-backup-client/src/catalog.rs
@@ -75,7 +75,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
     let client = connect(&repo)?;
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
@@ -187,7 +187,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
index 55198108..e7b5bde6 100644
--- a/proxmox-backup-client/src/main.rs
+++ b/proxmox-backup-client/src/main.rs
@@ -1286,7 +1286,7 @@ async fn restore(
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &ns,
diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
index 6810c19c..66bc56f7 100644
--- a/proxmox-backup-client/src/mount.rs
+++ b/proxmox-backup-client/src/mount.rs
@@ -234,7 +234,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
     };
 
     let client = BackupReader::start(
-        client,
+        &client,
         crypt_config.clone(),
         repo.store(),
         &backup_ns,
diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
index 288d35ce..bb68322b 100644
--- a/src/bin/proxmox_backup_debug/diff.rs
+++ b/src/bin/proxmox_backup_debug/diff.rs
@@ -293,7 +293,7 @@ async fn create_backup_reader(
     };
     let client = connect(&params.repo)?;
     let backup_reader = BackupReader::start(
-        client,
+        &client,
         params.crypt_config.clone(),
         params.repo.store(),
         &params.namespace,
-- 
2.30.2






* [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling
  2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 4/5] pbs-client: accept a ref to an HttpClient in BackupReader::start Hannes Laimer
@ 2023-02-23 12:55 ` Hannes Laimer
  2023-02-28 11:25   ` Wolfgang Bumiller
  2023-02-28 11:36   ` Fabian Grünbichler
  2023-02-28 11:35 ` [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Fabian Grünbichler
From: Hannes Laimer @ 2023-02-23 12:55 UTC (permalink / raw)
  To: pbs-devel

... and rewrite pull logic.

Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
---
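
One behavioural detail worth pointing out (simplified sketch with assumed
names below; the real check is PullParameters::skip_chunk_sync() in the
diff): when the local source and the target are the same datastore they
share a chunk store, so only indexes and blobs are copied and the chunk
sync is skipped.

    // Toy version of the decision, not the actual PullParameters type:
    enum Source {
        Remote(String), // remote name
        Local(String),  // local datastore name
    }

    fn skip_chunk_sync(source: &Source, target_store: &str) -> bool {
        match source {
            // Same local datastore: chunks already live in the shared chunk store.
            Source::Local(store) => store.as_str() == target_store,
            // Remote source (or a different local store): chunks must be pulled.
            Source::Remote(_) => false,
        }
    }

    fn main() {
        assert!(skip_chunk_sync(&Source::Local("store1".to_string()), "store1"));
        assert!(!skip_chunk_sync(&Source::Local("store2".to_string()), "store1"));
        assert!(!skip_chunk_sync(&Source::Remote("remote0".to_string()), "store1"));
    }
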
 pbs-api-types/src/datastore.rs  |    2 +-
 pbs-datastore/src/read_chunk.rs |    2 +-
 src/api2/pull.rs                |   13 +-
 src/server/pull.rs              | 1023 +++++++++++++++++++------------
 4 files changed, 648 insertions(+), 392 deletions(-)

diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
index 72e8d1ee..9a692b08 100644
--- a/pbs-api-types/src/datastore.rs
+++ b/pbs-api-types/src/datastore.rs
@@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
 /// Uniquely identify a Backup (relative to data store)
 ///
 /// We also call this a backup snaphost.
-#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
+#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
 #[serde(rename_all = "kebab-case")]
 pub struct BackupDir {
     /// Backup group.
diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
index c04a7431..29ee2d4c 100644
--- a/pbs-datastore/src/read_chunk.rs
+++ b/pbs-datastore/src/read_chunk.rs
@@ -14,7 +14,7 @@ pub trait ReadChunk {
     fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
 }
 
-pub trait AsyncReadChunk: Send {
+pub trait AsyncReadChunk: Send + Sync {
     /// Returns the encoded chunk data
     fn read_raw_chunk<'a>(
         &'a self,
diff --git a/src/api2/pull.rs b/src/api2/pull.rs
index bb8f6fe1..2966190c 100644
--- a/src/api2/pull.rs
+++ b/src/api2/pull.rs
@@ -121,8 +121,8 @@ pub fn do_sync_job(
             let sync_job2 = sync_job.clone();
 
             let worker_future = async move {
-                let pull_params = PullParameters::try_from(&sync_job)?;
-                let client = pull_params.client().await?;
+                let mut pull_params = PullParameters::try_from(&sync_job)?;
+                pull_params.init_source(sync_job.limit).await?;
 
                 task_log!(worker, "Starting datastore sync job '{}'", job_id);
                 if let Some(event_str) = schedule {
@@ -137,7 +137,7 @@ pub fn do_sync_job(
                 );
 
                 if sync_job.remote.is_some() {
-                    pull_store(&worker, &client, pull_params).await?;
+                    pull_store(&worker, pull_params).await?;
                 } else {
                     match (sync_job.ns, sync_job.remote_ns) {
                         (Some(target_ns), Some(source_ns))
@@ -280,7 +280,7 @@ async fn pull(
         delete,
     )?;
 
-    let pull_params = PullParameters::new(
+    let mut pull_params = PullParameters::new(
         &store,
         ns,
         remote.as_deref(),
@@ -290,9 +290,8 @@ async fn pull(
         remove_vanished,
         max_depth,
         group_filter,
-        limit,
     )?;
-    let client = pull_params.client().await?;
+    pull_params.init_source(limit).await?;
 
     // fixme: set to_stdout to false?
     // FIXME: add namespace to worker id?
@@ -310,7 +309,7 @@ async fn pull(
                 remote_store,
             );
 
-            let pull_future = pull_store(&worker, &client, pull_params);
+            let pull_future = pull_store(&worker, pull_params);
             (select! {
                 success = pull_future.fuse() => success,
                 abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
diff --git a/src/server/pull.rs b/src/server/pull.rs
index 65eedf2c..d3be39da 100644
--- a/src/server/pull.rs
+++ b/src/server/pull.rs
@@ -1,28 +1,26 @@
 //! Sync datastore from remote server
 
 use std::collections::{HashMap, HashSet};
-use std::io::{Seek, SeekFrom};
+use std::io::{Seek, SeekFrom, Write};
+use std::path::PathBuf;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::SystemTime;
 
 use anyhow::{bail, format_err, Error};
 use http::StatusCode;
-use pbs_config::CachedUserInfo;
-use serde_json::json;
-
+use proxmox_rest_server::WorkerTask;
 use proxmox_router::HttpError;
 use proxmox_sys::task_log;
+use serde_json::json;
 
 use pbs_api_types::{
-    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
+    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
     Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
     PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
 };
-
-use pbs_client::{
-    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
-};
+use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
+use pbs_config::CachedUserInfo;
 use pbs_datastore::data_blob::DataBlob;
 use pbs_datastore::dynamic_index::DynamicIndexReader;
 use pbs_datastore::fixed_index::FixedIndexReader;
@@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
 use pbs_datastore::manifest::{
     archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
 };
-use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
+use pbs_datastore::read_chunk::AsyncReadChunk;
+use pbs_datastore::{
+    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
+};
 use pbs_tools::sha::sha256;
-use proxmox_rest_server::WorkerTask;
 
-use crate::backup::{check_ns_modification_privs, check_ns_privs};
+use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
 use crate::tools::parallel_handler::ParallelHandler;
 
 /// Parameters for a pull operation.
 pub(crate) struct PullParameters {
-    /// Remote that is pulled from
-    remote: Remote,
-    /// Full specification of remote datastore
-    source: BackupRepository,
-    /// Local store that is pulled into
-    store: Arc<DataStore>,
-    /// Remote namespace
-    remote_ns: BackupNamespace,
-    /// Local namespace (anchor)
-    ns: BackupNamespace,
+    /// Where data is pulled from
+    source: PullSource,
+    /// Where data should be pulled into
+    target: PullTarget,
     /// Owner of synced groups (needs to match local owner of pre-existing groups)
     owner: Authid,
     /// Whether to remove groups which exist locally, but not on the remote end
@@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
     max_depth: Option<usize>,
     /// Filters for reducing the pull scope
     group_filter: Option<Vec<GroupFilter>>,
-    /// Rate limits for all transfers from `remote`
-    limit: RateLimitConfig,
+}
+
+pub(crate) enum PullSource {
+    Remote(RemoteSource),
+    Local(LocalSource),
+}
+
+pub(crate) struct PullTarget {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
+pub(crate) struct LocalSource {
+    store: Arc<DataStore>,
+    ns: BackupNamespace,
+}
+
+pub(crate) struct RemoteSource {
+    remote: Remote,
+    repo: BackupRepository,
+    ns: BackupNamespace,
+    client: Option<HttpClient>,
+    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,
+}
+
+impl PullSource {
+    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
+        match self {
+            PullSource::Remote(source) => {
+                source.client.replace(
+                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
+                );
+            }
+            PullSource::Local(_) => {}
+        };
+        Ok(())
+    }
+
+    async fn list_namespaces(
+        &self,
+        max_depth: &mut Option<usize>,
+        worker: &WorkerTask,
+    ) -> Result<Vec<BackupNamespace>, Error> {
+        match &self {
+            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,
+            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
+                source.store.clone(),
+                source.ns.clone(),
+                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
+            )?
+            .collect(),
+        }
+    }
+
+    async fn list_groups(
+        &self,
+        namespace: &BackupNamespace,
+        owner: &Authid,
+    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());
+
+                let args = if !namespace.is_root() {
+                    Some(json!({ "ns": namespace.clone() }))
+                } else {
+                    None
+                };
+
+                let client = source.get_client()?;
+                client.login().await?;
+                let mut result = client.get(&path, args).await.map_err(|err| {
+                    format_err!("Failed to retrieve backup groups from remote - {}", err)
+                })?;
+
+                Ok(
+                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
+                        .map_err(|e| Error::from(e))?
+                        .into_iter()
+                        .map(|item| item.backup)
+                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
+                )
+            }
+            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
+                &source.store,
+                namespace.clone(),
+                MAX_NAMESPACE_DEPTH,
+                None,
+                None,
+                Some(owner),
+            )?
+            .filter_map(Result::ok)
+            .map(|backup_group| backup_group.group().clone())
+            .collect::<Vec<pbs_api_types::BackupGroup>>()),
+        }
+    }
+
+    async fn list_backup_dirs(
+        &self,
+        namespace: &BackupNamespace,
+        group: &pbs_api_types::BackupGroup,
+        worker: &WorkerTask,
+    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let path = format!(
+                    "api2/json/admin/datastore/{}/snapshots",
+                    source.repo.store()
+                );
+
+                let mut args = json!({
+                    "backup-type": group.ty,
+                    "backup-id": group.id,
+                });
+
+                if !source.ns.is_root() {
+                    args["ns"] = serde_json::to_value(&source.ns)?;
+                }
+
+                let client = source.get_client()?;
+                client.login().await?;
+
+                let mut result = client.get(&path, Some(args)).await?;
+                let snapshot_list: Vec<SnapshotListItem> =
+                    serde_json::from_value(result["data"].take())?;
+                Ok(snapshot_list
+                    .into_iter()
+                    .filter_map(|item: SnapshotListItem| {
+                        let snapshot = item.backup;
+                        // in-progress backups can't be synced
+                        if item.size.is_none() {
+                            task_log!(
+                                worker,
+                                "skipping snapshot {} - in-progress backup",
+                                snapshot
+                            );
+                            return None;
+                        }
+
+                        Some(snapshot)
+                    })
+                    .collect::<Vec<BackupDir>>())
+            }
+            PullSource::Local(source) => Ok(source
+                .store
+                .backup_group(namespace.clone(), group.clone())
+                .iter_snapshots()?
+                .filter_map(Result::ok)
+                .map(|snapshot| snapshot.dir().to_owned())
+                .collect::<Vec<BackupDir>>()),
+        }
+    }
+
+    /// Load file from source namespace and BackupDir into file
+    async fn load_file_into(
+        &mut self,
+        namespace: &BackupNamespace,
+        snapshot: &pbs_api_types::BackupDir,
+        filename: &str,
+        into: &PathBuf,
+        worker: &WorkerTask,
+    ) -> Result<Option<DataBlob>, Error> {
+        let mut tmp_file = std::fs::OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .read(true)
+            .open(into)?;
+        match self {
+            PullSource::Remote(ref mut source) => {
+                let client = source.get_client()?;
+                client.login().await?;
+
+                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
+                    reader.clone()
+                } else {
+                    let backup_reader = BackupReader::start(
+                        client,
+                        None,
+                        source.repo.store(),
+                        namespace,
+                        snapshot,
+                        true,
+                    )
+                    .await?;
+                    source
+                        .backup_reader
+                        .insert(snapshot.clone(), backup_reader.clone());
+                    backup_reader
+                };
+
+                let download_result = reader.download(filename, &mut tmp_file).await;
+
+                if let Err(err) = download_result {
+                    match err.downcast_ref::<HttpError>() {
+                        Some(HttpError { code, message }) => match *code {
+                            StatusCode::NOT_FOUND => {
+                                task_log!(
+                                    worker,
+                                    "skipping snapshot {} - vanished since start of sync",
+                                    snapshot,
+                                );
+                                return Ok(None);
+                            }
+                            _ => {
+                                bail!("HTTP error {code} - {message}");
+                            }
+                        },
+                        None => {
+                            return Err(err);
+                        }
+                    };
+                };
+            }
+            PullSource::Local(source) => {
+                let dir = source
+                    .store
+                    .backup_dir(namespace.clone(), snapshot.clone())?;
+                let mut from_path = dir.full_path();
+                from_path.push(filename);
+                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
+            }
+        }
+
+        tmp_file.seek(SeekFrom::Start(0))?;
+        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())
+    }
+
+    // Note: The client.log.blob is uploaded after the backup, so it is
+    // not mentioned in the manifest.
+    async fn try_download_client_log(
+        &self,
+        from_snapshot: &pbs_api_types::BackupDir,
+        to_path: &std::path::Path,
+        worker: &WorkerTask,
+    ) -> Result<(), Error> {
+        match &self {
+            PullSource::Remote(source) => {
+                let reader = source
+                    .backup_reader
+                    .get(from_snapshot)
+                    .ok_or(format_err!("Can't download chunks without a BackupReader"))?;
+                let mut tmp_path = to_path.to_owned();
+                tmp_path.set_extension("tmp");
+
+                let tmpfile = std::fs::OpenOptions::new()
+                    .write(true)
+                    .create(true)
+                    .read(true)
+                    .open(&tmp_path)?;
+
+                // Note: be silent if there is no log - only log successful download
+                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
+                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
+                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
+                    }
+                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
+                }
+
+                Ok(())
+            }
+            PullSource::Local(_) => Ok(()),
+        }
+    }
+
+    fn get_chunk_reader(
+        &self,
+        snapshot: &pbs_api_types::BackupDir,
+        crypt_mode: CryptMode,
+    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
+        Ok(match &self {
+            PullSource::Remote(source) => {
+                if let Some(reader) = source.backup_reader.get(snapshot) {
+                    Arc::new(RemoteChunkReader::new(
+                        reader.clone(),
+                        None,
+                        crypt_mode,
+                        HashMap::new(),
+                    ))
+                } else {
+                    bail!("No initialized BackupReader!")
+                }
+            }
+            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
+                source.store.clone(),
+                None,
+                crypt_mode,
+            )),
+        })
+    }
+
+    fn get_ns(&self) -> BackupNamespace {
+        match &self {
+            PullSource::Remote(source) => source.ns.clone(),
+            PullSource::Local(source) => source.ns.clone(),
+        }
+    }
+
+    fn print_store_and_ns(&self) -> String {
+        match &self {
+            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
+            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
+        }
+    }
+}
+
+impl RemoteSource {
+    fn get_client(&self) -> Result<&HttpClient, Error> {
+        if let Some(client) = &self.client {
+            Ok(client)
+        } else {
+            bail!("RemoteSource not initialized")
+        }
+    }
 }
 
 impl PullParameters {
     /// Creates a new instance of `PullParameters`.
-    ///
-    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
-    /// [BackupRepository] with `remote_store`.
-    #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         store: &str,
         ns: BackupNamespace,
-        remote: &str,
+        remote: Option<&str>,
         remote_store: &str,
         remote_ns: BackupNamespace,
         owner: Authid,
         remove_vanished: Option<bool>,
         max_depth: Option<usize>,
         group_filter: Option<Vec<GroupFilter>>,
-        limit: RateLimitConfig,
     ) -> Result<Self, Error> {
-        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
-
         if let Some(max_depth) = max_depth {
             ns.check_max_depth(max_depth)?;
             remote_ns.check_max_depth(max_depth)?;
-        }
+        };
+        let remove_vanished = remove_vanished.unwrap_or(false);
 
-        let (remote_config, _digest) = pbs_config::remote::config()?;
-        let remote: Remote = remote_config.lookup("remote", remote)?;
+        let source: PullSource = if let Some(remote) = remote {
+            let (remote_config, _digest) = pbs_config::remote::config()?;
+            let remote: Remote = remote_config.lookup("remote", remote)?;
 
-        let remove_vanished = remove_vanished.unwrap_or(false);
+            let repo = BackupRepository::new(
+                Some(remote.config.auth_id.clone()),
+                Some(remote.config.host.clone()),
+                remote.config.port,
+                remote_store.to_string(),
+            );
 
-        let source = BackupRepository::new(
-            Some(remote.config.auth_id.clone()),
-            Some(remote.config.host.clone()),
-            remote.config.port,
-            remote_store.to_string(),
-        );
+            PullSource::Remote(RemoteSource {
+                remote,
+                repo,
+                ns: remote_ns.clone(),
+                client: None,
+                backup_reader: HashMap::new(),
+            })
+        } else {
+            PullSource::Local(LocalSource {
+                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
+                ns: remote_ns,
+            })
+        };
+        let target = PullTarget {
+            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
+            ns,
+        };
 
         Ok(Self {
-            remote,
-            remote_ns,
-            ns,
             source,
-            store,
+            target,
             owner,
             remove_vanished,
             max_depth,
             group_filter,
-            limit,
         })
     }
 
-    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
-    pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
+    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
+        self.source.init(limit).await
+    }
+
+    pub(crate) fn skip_chunk_sync(&self) -> bool {
+        match &self.source {
+            PullSource::Local(source) => source.store.name() == self.target.store.name(),
+            PullSource::Remote(_) => false,
+        }
+    }
+
+    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
+        let source_ns = self.source.get_ns();
+        source_ns.map_prefix(&source_ns, &self.target.ns)
     }
 }
 
+async fn list_remote_namespaces(
+    source: &RemoteSource,
+    max_depth: &mut Option<usize>,
+    worker: &WorkerTask,
+) -> Result<Vec<BackupNamespace>, Error> {
+    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
+        return Ok(vec![source.ns.clone()]);
+    }
+
+    let path = format!(
+        "api2/json/admin/datastore/{}/namespace",
+        source.repo.store()
+    );
+    let mut data = json!({});
+    if let Some(max_depth) = max_depth {
+        data["max-depth"] = json!(max_depth);
+    }
+
+    if !source.ns.is_root() {
+        data["parent"] = json!(source.ns);
+    }
+
+    let client = source.get_client()?;
+    client.login().await?;
+
+    let mut result = match client.get(&path, Some(data)).await {
+        Ok(res) => res,
+        Err(err) => match err.downcast_ref::<HttpError>() {
+            Some(HttpError { code, message }) => match code {
+                &StatusCode::NOT_FOUND => {
+                    if source.ns.is_root() && max_depth.is_none() {
+                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
+                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
+                        max_depth.replace(0);
+                    } else {
+                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
+                    }
+
+                    return Ok(vec![source.ns.clone()]);
+                }
+                _ => {
+                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
+                }
+            },
+            None => {
+                bail!("Querying namespaces failed - {err}");
+            }
+        },
+    };
+
+    let list: Vec<pbs_api_types::BackupNamespace> =
+        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
+            .iter()
+            .map(|list_item| list_item.ns.clone())
+            .collect();
+
+    Ok(list)
+}
+
 async fn pull_index_chunks<I: IndexFile>(
     worker: &WorkerTask,
-    chunk_reader: RemoteChunkReader,
+    chunk_reader: Arc<dyn AsyncReadChunk>,
     target: Arc<DataStore>,
     index: I,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
@@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
     Ok(())
 }
 
-async fn download_manifest(
-    reader: &BackupReader,
-    filename: &std::path::Path,
-) -> Result<std::fs::File, Error> {
-    let mut tmp_manifest_file = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .read(true)
-        .open(filename)?;
-
-    reader
-        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
-        .await?;
-
-    tmp_manifest_file.seek(SeekFrom::Start(0))?;
-
-    Ok(tmp_manifest_file)
-}
-
 fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
     if size != info.size {
         bail!(
@@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
 /// Pulls a single file referenced by a manifest.
 ///
 /// Pulling an archive consists of the following steps:
-/// - Create tmp file for archive
-/// - Download archive file into tmp file
+/// - Load archive file into tmp file
 /// - Verify tmp file checksum
 /// - if archive is an index, pull referenced chunks
 /// - Rename tmp file into real path
 async fn pull_single_archive(
     worker: &WorkerTask,
-    reader: &BackupReader,
-    chunk_reader: &mut RemoteChunkReader,
-    snapshot: &pbs_datastore::BackupDir,
+    params: &mut PullParameters,
+    from_namespace: &BackupNamespace,
+    from_snapshot: &pbs_api_types::BackupDir,
+    to_snapshot: &pbs_datastore::BackupDir,
     archive_info: &FileInfo,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
     let archive_name = &archive_info.filename;
-    let mut path = snapshot.full_path();
+    let mut path = to_snapshot.full_path();
     path.push(archive_name);
 
     let mut tmp_path = path.clone();
@@ -273,13 +636,18 @@ async fn pull_single_archive(
 
     task_log!(worker, "sync archive {}", archive_name);
 
-    let mut tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
+    params
+        .source
+        .load_file_into(
+            from_namespace,
+            from_snapshot,
+            archive_name,
+            &tmp_path,
+            worker,
+        )
+        .await?;
 
-    reader.download(archive_name, &mut tmpfile).await?;
+    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
 
     match archive_type(archive_name)? {
         ArchiveType::DynamicIndex => {
@@ -289,14 +657,20 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if params.skip_chunk_sync() {
+                task_log!(worker, "skipping chunk sync for same datastore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    params
+                        .source
+                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
+                    params.target.store.clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::FixedIndex => {
             let index = FixedIndexReader::new(tmpfile).map_err(|err| {
@@ -305,14 +679,20 @@ async fn pull_single_archive(
             let (csum, size) = index.compute_csum();
             verify_archive(archive_info, &csum, size)?;
 
-            pull_index_chunks(
-                worker,
-                chunk_reader.clone(),
-                snapshot.datastore().clone(),
-                index,
-                downloaded_chunks,
-            )
-            .await?;
+            if params.skip_chunk_sync() {
+                task_log!(worker, "skipping chunk sync for same datastore");
+            } else {
+                pull_index_chunks(
+                    worker,
+                    params
+                        .source
+                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
+                    params.target.store.clone(),
+                    index,
+                    downloaded_chunks,
+                )
+                .await?;
+            }
         }
         ArchiveType::Blob => {
             tmpfile.seek(SeekFrom::Start(0))?;
@@ -326,33 +706,6 @@ async fn pull_single_archive(
     Ok(())
 }
 
-// Note: The client.log.blob is uploaded after the backup, so it is
-// not mentioned in the manifest.
-async fn try_client_log_download(
-    worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    path: &std::path::Path,
-) -> Result<(), Error> {
-    let mut tmp_path = path.to_owned();
-    tmp_path.set_extension("tmp");
-
-    let tmpfile = std::fs::OpenOptions::new()
-        .write(true)
-        .create(true)
-        .read(true)
-        .open(&tmp_path)?;
-
-    // Note: be silent if there is no log - only log successful download
-    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
-        if let Err(err) = std::fs::rename(&tmp_path, path) {
-            bail!("Atomic rename file {:?} failed - {}", path, err);
-        }
-        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
-    }
-
-    Ok(())
-}
-
 /// Actual implementation of pulling a snapshot.
 ///
 /// Pulling a snapshot consists of the following steps:
@@ -364,44 +717,37 @@ async fn try_client_log_download(
 /// - Download log if not already existing
 async fn pull_snapshot(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+    params: &mut PullParameters,
+    namespace: &BackupNamespace,
+    from_snapshot: &pbs_api_types::BackupDir,
+    to_snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let mut manifest_name = snapshot.full_path();
+    let mut manifest_name = to_snapshot.full_path();
     manifest_name.push(MANIFEST_BLOB_NAME);
 
-    let mut client_log_name = snapshot.full_path();
+    let mut client_log_name = to_snapshot.full_path();
     client_log_name.push(CLIENT_LOG_BLOB_NAME);
 
     let mut tmp_manifest_name = manifest_name.clone();
     tmp_manifest_name.set_extension("tmp");
 
-    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
-    let mut tmp_manifest_file = match download_res {
-        Ok(manifest_file) => manifest_file,
-        Err(err) => {
-            match err.downcast_ref::<HttpError>() {
-                Some(HttpError { code, message }) => match *code {
-                    StatusCode::NOT_FOUND => {
-                        task_log!(
-                            worker,
-                            "skipping snapshot {} - vanished since start of sync",
-                            snapshot.dir(),
-                        );
-                        return Ok(());
-                    }
-                    _ => {
-                        bail!("HTTP error {code} - {message}");
-                    }
-                },
-                None => {
-                    return Err(err);
-                }
-            };
-        }
-    };
-    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
+    let tmp_manifest_blob;
+    if let Some(data) = params
+        .source
+        .load_file_into(
+            namespace,
+            from_snapshot,
+            MANIFEST_BLOB_NAME,
+            &tmp_manifest_name,
+            worker,
+        )
+        .await?
+    {
+        tmp_manifest_blob = data;
+    } else {
+        return Ok(());
+    }
 
     if manifest_name.exists() {
         let manifest_blob = proxmox_lang::try_block!({
@@ -418,8 +764,11 @@ async fn pull_snapshot(
 
         if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
             if !client_log_name.exists() {
-                try_client_log_download(worker, reader, &client_log_name).await?;
-            }
+                params
+                    .source
+                    .try_download_client_log(from_snapshot, &client_log_name, worker)
+                    .await?;
+            };
             task_log!(worker, "no data changes");
             let _ = std::fs::remove_file(&tmp_manifest_name);
             return Ok(()); // nothing changed
@@ -429,7 +778,7 @@ async fn pull_snapshot(
     let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
 
     for item in manifest.files() {
-        let mut path = snapshot.full_path();
+        let mut path = to_snapshot.full_path();
         path.push(&item.filename);
 
         if path.exists() {
@@ -467,18 +816,12 @@ async fn pull_snapshot(
             }
         }
 
-        let mut chunk_reader = RemoteChunkReader::new(
-            reader.clone(),
-            None,
-            item.chunk_crypt_mode(),
-            HashMap::new(),
-        );
-
         pull_single_archive(
             worker,
-            &reader,
-            &mut chunk_reader,
-            snapshot,
+            params,
+            namespace,
+            from_snapshot,
+            to_snapshot,
             item,
             downloaded_chunks.clone(),
         )
@@ -490,10 +833,12 @@ async fn pull_snapshot(
     }
 
     if !client_log_name.exists() {
-        try_client_log_download(worker, reader, &client_log_name).await?;
-    }
-
-    snapshot
+        params
+            .source
+            .try_download_client_log(from_snapshot, &client_log_name, worker)
+            .await?;
+    };
+    to_snapshot
         .cleanup_unreferenced_files(&manifest)
         .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
 
@@ -501,37 +846,53 @@ async fn pull_snapshot(
 }
 
 /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
-///
-/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
-/// pointing to the local datastore and target namespace.
 async fn pull_snapshot_from(
     worker: &WorkerTask,
-    reader: Arc<BackupReader>,
-    snapshot: &pbs_datastore::BackupDir,
+    params: &mut PullParameters,
+    namespace: &BackupNamespace,
+    from_snapshot: &pbs_api_types::BackupDir,
+    to_snapshot: &pbs_datastore::BackupDir,
     downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
 ) -> Result<(), Error> {
-    let (_path, is_new, _snap_lock) = snapshot
+    let (_path, is_new, _snap_lock) = to_snapshot
         .datastore()
-        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
+        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
 
     if is_new {
-        task_log!(worker, "sync snapshot {}", snapshot.dir());
+        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
 
-        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
-            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
-                snapshot.backup_ns(),
-                snapshot.as_ref(),
+        if let Err(err) = pull_snapshot(
+            worker,
+            params,
+            namespace,
+            from_snapshot,
+            to_snapshot,
+            downloaded_chunks,
+        )
+        .await
+        {
+            if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
+                to_snapshot.backup_ns(),
+                to_snapshot.as_ref(),
                 true,
             ) {
                 task_log!(worker, "cleanup error - {}", cleanup_err);
             }
             return Err(err);
         }
-        task_log!(worker, "sync snapshot {} done", snapshot.dir());
+        task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
     } else {
-        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
-        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
-        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
+        task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
+        pull_snapshot(
+            worker,
+            params,
+            namespace,
+            from_snapshot,
+            to_snapshot,
+            downloaded_chunks,
+        )
+        .await?;
+        task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
     }
 
     Ok(())
@@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
 /// - Sort by snapshot time
 /// - Get last snapshot timestamp on local datastore
 /// - Iterate over list of snapshots
-/// -- Recreate client/BackupReader
 /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
 /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
 ///
@@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
 /// - local group owner is already checked by pull_store
 async fn pull_group(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
+    params: &mut PullParameters,
+    source_namespace: &BackupNamespace,
     group: &pbs_api_types::BackupGroup,
-    remote_ns: BackupNamespace,
     progress: &mut StoreProgress,
 ) -> Result<(), Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/snapshots",
-        params.source.store()
-    );
-
-    let mut args = json!({
-        "backup-type": group.ty,
-        "backup-id": group.id,
-    });
-
-    if !remote_ns.is_root() {
-        args["ns"] = serde_json::to_value(&remote_ns)?;
-    }
-
-    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;
-
-    let mut result = client.get(&path, Some(args)).await?;
-    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
-
-    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
-
-    client.login().await?; // make sure auth is complete
-
-    let fingerprint = client.fingerprint();
-
-    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
-
-    let mut remote_snapshots = std::collections::HashSet::new();
-
-    // start with 65536 chunks (up to 256 GiB)
-    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
-
-    progress.group_snapshots = list.len() as u64;
+    let target_ns = params.get_target_ns()?;
 
+    let mut source_snapshots = HashSet::new();
+    let last_sync = params
+        .target
+        .store
+        .last_successful_backup(&target_ns, group)?;
     let mut skip_info = SkipInfo {
         oldest: i64::MAX,
         newest: i64::MIN,
         count: 0,
     };
 
-    for (pos, item) in list.into_iter().enumerate() {
-        let snapshot = item.backup;
-
-        // in-progress backups can't be synced
-        if item.size.is_none() {
-            task_log!(
-                worker,
-                "skipping snapshot {} - in-progress backup",
-                snapshot
-            );
-            continue;
-        }
+    let mut list: Vec<BackupDir> = params
+        .source
+        .list_backup_dirs(source_namespace, group, worker)
+        .await?
+        .into_iter()
+        .filter(|dir| {
+            source_snapshots.insert(dir.time);
+            if let Some(last_sync_time) = last_sync {
+                if last_sync_time > dir.time {
+                    skip_info.update(dir.time);
+                    return false;
+                }
+            }
+            true
+        })
+        .collect();
 
-        remote_snapshots.insert(snapshot.time);
+    list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
 
-        if let Some(last_sync_time) = last_sync {
-            if last_sync_time > snapshot.time {
-                skip_info.update(snapshot.time);
-                continue;
-            }
-        }
+    // start with 65536 chunks (up to 256 GiB)
+    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
 
-        // get updated auth_info (new tickets)
-        let auth_info = client.login().await?;
-
-        let options =
-            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
-                .rate_limit(params.limit.clone());
-
-        let new_client = HttpClient::new(
-            params.source.host(),
-            params.source.port(),
-            params.source.auth_id(),
-            options,
-        )?;
-
-        let reader = BackupReader::start(
-            new_client,
-            None,
-            params.source.store(),
-            &remote_ns,
-            &snapshot,
-            true,
-        )
-        .await?;
+    progress.group_snapshots = list.len() as u64;
 
-        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
+    for (pos, from_snapshot) in list.into_iter().enumerate() {
+        let to_snapshot = params
+            .target
+            .store
+            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
 
-        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
+        let result = pull_snapshot_from(
+            worker,
+            params,
+            source_namespace,
+            &from_snapshot,
+            &to_snapshot,
+            downloaded_chunks.clone(),
+        )
+        .await;
 
         progress.done_snapshots = pos as u64 + 1;
         task_log!(worker, "percentage done: {}", progress);
@@ -703,11 +1025,14 @@ async fn pull_group(
     }
 
     if params.remove_vanished {
-        let group = params.store.backup_group(target_ns.clone(), group.clone());
+        let group = params
+            .target
+            .store
+            .backup_group(target_ns.clone(), group.clone());
         let local_list = group.list_backups()?;
         for info in local_list {
             let snapshot = info.backup_dir;
-            if remote_snapshots.contains(&snapshot.backup_time()) {
+            if source_snapshots.contains(&snapshot.backup_time()) {
                 continue;
             }
             if snapshot.is_protected() {
@@ -720,6 +1045,7 @@ async fn pull_group(
             }
             task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
             params
+                .target
                 .store
                 .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
         }
@@ -732,64 +1058,12 @@ async fn pull_group(
     Ok(())
 }
 
-// will modify params if switching to backwards mode for lack of NS support on remote end
-async fn query_namespaces(
-    worker: &WorkerTask,
-    client: &HttpClient,
-    params: &mut PullParameters,
-) -> Result<Vec<BackupNamespace>, Error> {
-    let path = format!(
-        "api2/json/admin/datastore/{}/namespace",
-        params.source.store()
-    );
-    let mut data = json!({});
-    if let Some(max_depth) = params.max_depth {
-        data["max-depth"] = json!(max_depth);
-    }
-
-    if !params.remote_ns.is_root() {
-        data["parent"] = json!(params.remote_ns);
-    }
-
-    let mut result = match client.get(&path, Some(data)).await {
-        Ok(res) => res,
-        Err(err) => match err.downcast_ref::<HttpError>() {
-            Some(HttpError { code, message }) => match *code {
-                StatusCode::NOT_FOUND => {
-                    if params.remote_ns.is_root() && params.max_depth.is_none() {
-                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
-                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
-                        params.max_depth = Some(0);
-                    } else {
-                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
-                    }
-
-                    return Ok(vec![params.remote_ns.clone()]);
-                }
-                _ => {
-                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
-                }
-            },
-            None => {
-                bail!("Querying namespaces failed - {err}");
-            }
-        },
-    };
-
-    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
-
-    // parents first
-    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
-
-    Ok(list.iter().map(|item| item.ns.clone()).collect())
-}
-
 fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
     let mut created = false;
-    let store_ns_str = print_store_and_ns(params.store.name(), ns);
+    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
 
-    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
-        check_ns_modification_privs(params.store.name(), ns, &params.owner)
+    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
+        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
             .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
 
         let name = match ns.components().last() {
@@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
             }
         };
 
-        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
+        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
             bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
         }
         created = true;
     }
 
     check_ns_privs(
-        params.store.name(),
+        params.target.store.name(),
         ns,
         &params.owner,
         PRIV_DATASTORE_BACKUP,
@@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
 }
 
 fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
-    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
+    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
         .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
 
-    params.store.remove_namespace_recursive(local_ns, true)
+    params
+        .target
+        .store
+        .remove_namespace_recursive(local_ns, true)
 }
 
 fn check_and_remove_vanished_ns(
@@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
     // clamp like remote does so that we don't list more than we can ever have synced.
     let max_depth = params
         .max_depth
-        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
+        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
 
     let mut local_ns_list: Vec<BackupNamespace> = params
+        .target
         .store
-        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
+        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
         .filter(|ns| {
             let user_privs =
-                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
+                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
             user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
         })
         .collect();
@@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
     local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
 
     for local_ns in local_ns_list {
-        if local_ns == params.ns {
+        if local_ns == params.target.ns {
             continue;
         }
 
@@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
 /// - access to sub-NS checked here
 pub(crate) async fn pull_store(
     worker: &WorkerTask,
-    client: &HttpClient,
     mut params: PullParameters,
 ) -> Result<(), Error> {
     // explicit create shared lock to prevent GC on newly created chunks
-    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
+    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
     let mut errors = false;
 
     let old_max_depth = params.max_depth;
-    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
-        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
-    } else {
-        query_namespaces(worker, client, &mut params).await?
-    };
+    let mut namespaces = params
+        .source
+        .list_namespaces(&mut params.max_depth, worker)
+        .await?;
     errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
+    namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
 
     let (mut groups, mut snapshots) = (0, 0);
     let mut synced_ns = HashSet::with_capacity(namespaces.len());
 
     for namespace in namespaces {
-        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
+        let source_store_ns_str = params.source.print_store_and_ns();
 
-        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
-        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
+        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
+        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
 
         task_log!(worker, "----");
         task_log!(
@@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
             }
         }
 
-        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
+        match pull_ns(worker, &namespace, &mut params).await {
             Ok((ns_progress, ns_errors)) => {
                 errors |= ns_errors;
 
@@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
                 task_log!(
                     worker,
                     "Encountered errors while syncing namespace {} - {}",
-                    namespace,
+                    &namespace,
                     err,
                 );
             }
@@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
 /// - owner check for vanished groups done here
 pub(crate) async fn pull_ns(
     worker: &WorkerTask,
-    client: &HttpClient,
-    params: &PullParameters,
-    source_ns: BackupNamespace,
-    target_ns: BackupNamespace,
+    namespace: &BackupNamespace,
+    params: &mut PullParameters,
 ) -> Result<(StoreProgress, bool), Error> {
-    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
-
-    let args = if !source_ns.is_root() {
-        Some(json!({
-            "ns": source_ns,
-        }))
-    } else {
-        None
-    };
-
-    let mut result = client
-        .get(&path, args)
-        .await
-        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
-
-    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
+    let mut list: Vec<pbs_api_types::BackupGroup> =
+        params.source.list_groups(namespace, &params.owner).await?;
 
     let total_count = list.len();
     list.sort_unstable_by(|a, b| {
-        let type_order = a.backup.ty.cmp(&b.backup.ty);
+        let type_order = a.ty.cmp(&b.ty);
         if type_order == std::cmp::Ordering::Equal {
-            a.backup.id.cmp(&b.backup.id)
+            a.id.cmp(&b.id)
         } else {
             type_order
         }
@@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
         filters.iter().any(|filter| group.matches(filter))
     };
 
-    // Get groups with target NS set
-    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
-
     let list = if let Some(ref group_filter) = &params.group_filter {
         let unfiltered_count = list.len();
         let list: Vec<pbs_api_types::BackupGroup> = list
@@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
 
     let mut progress = StoreProgress::new(list.len() as u64);
 
+    let target_ns = params.get_target_ns()?;
     for (done, group) in list.into_iter().enumerate() {
         progress.done_groups = done as u64;
         progress.done_snapshots = 0;
@@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
 
         let (owner, _lock_guard) =
             match params
+                .target
                 .store
                 .create_locked_backup_group(&target_ns, &group, &params.owner)
             {
@@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
                         err
                     );
                     errors = true; // do not stop here, instead continue
+                    task_log!(worker, "create_locked_backup_group failed");
                     continue;
                 }
             };
@@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
                 owner
             );
             errors = true; // do not stop here, instead continue
-        } else if let Err(err) = pull_group(
-            worker,
-            client,
-            params,
-            &group,
-            source_ns.clone(),
-            &mut progress,
-        )
-        .await
+        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
         {
             task_log!(worker, "sync group {} failed - {}", &group, err,);
             errors = true; // do not stop here, instead continue
@@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
 
     if params.remove_vanished {
         let result: Result<(), Error> = proxmox_lang::try_block!({
-            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
+            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
                 let local_group = local_group?;
                 let local_group = local_group.group();
                 if new_groups.contains(local_group) {
                     continue;
                 }
-                let owner = params.store.get_owner(&target_ns, local_group)?;
+                let owner = params.target.store.get_owner(&target_ns, local_group)?;
                 if check_backup_owner(&owner, &params.owner).is_err() {
                     continue;
                 }
@@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
                     }
                 }
                 task_log!(worker, "delete vanished group '{local_group}'",);
-                match params.store.remove_backup_group(&target_ns, local_group) {
+                match params
+                    .target
+                    .store
+                    .remove_backup_group(&target_ns, local_group)
+                {
                     Ok(true) => {}
                     Ok(false) => {
                         task_log!(
-- 
2.30.2





^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional Hannes Laimer
@ 2023-02-28  9:41   ` Wolfgang Bumiller
  2023-02-28 11:35   ` Fabian Grünbichler
  1 sibling, 0 replies; 12+ messages in thread
From: Wolfgang Bumiller @ 2023-02-28  9:41 UTC (permalink / raw)
  To: Hannes Laimer; +Cc: pbs-devel

Just minor things.

On Thu, Feb 23, 2023 at 01:55:36PM +0100, Hannes Laimer wrote:
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  pbs-api-types/src/jobs.rs         |  4 +-
>  src/api2/config/remote.rs         |  2 +-
>  src/api2/config/sync.rs           | 41 +++++++++++++------
>  src/api2/node/tasks.rs            |  4 +-
>  src/api2/pull.rs                  | 68 +++++++++++++++++++++++--------
>  src/server/email_notifications.rs | 16 ++++----
>  6 files changed, 93 insertions(+), 42 deletions(-)
> 
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index b2473ec8..bb8f6fe1 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -64,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>          PullParameters::new(
>              &sync_job.store,
>              sync_job.ns.clone().unwrap_or_default(),
> -            &sync_job.remote,
> +            sync_job.remote.clone().as_deref(),

^ unnecessary .clone()

>              &sync_job.remote_store,
>              sync_job.remote_ns.clone().unwrap_or_default(),
>              sync_job
> @@ -75,7 +86,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>              sync_job.remove_vanished,
>              sync_job.max_depth,
>              sync_job.group_filter.clone(),
> -            sync_job.limit.clone(),
>          )
>      }
>  }
> diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
> index b3298cf9..31a46b0f 100644
> --- a/src/server/email_notifications.rs
> +++ b/src/server/email_notifications.rs
> @@ -486,15 +486,15 @@ pub fn send_sync_status(
>          }
>      };
>  

You can declare a binding up front here:
    let tmp_src_string;
> +    let source_str = if let Some(remote) = job.remote.clone() {

You can borrow here instead of cloning.

> +        format!("Sync remote '{}'", remote)

You can assign `tmp_src_string` here and return `&tmp_src_string` from
the block (and `remote` can go into the `{}` portion):
        tmp_src_string = format!("Sync remote '{remote}'");
        &tmp_src_string

> +    } else {
> +        format!("Sync local")

And drop the `format!()` here altogether and just use "Sync local".

With `tmp_src_string` being declared outside the `if` scope, the case
without a remote can just reference the `&'static str` instead of
allocating.
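
Putting it together, roughly what I have in mind (untested sketch):

    let tmp_src_string;
    let source_str: &str = if let Some(remote) = &job.remote {
        tmp_src_string = format!("Sync remote '{remote}'");
        &tmp_src_string
    } else {
        "Sync local"
    };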

> +    };
> +
>      let subject = match result {
> -        Ok(()) => format!(
> -            "Sync remote '{}' datastore '{}' successful",
> -            job.remote, job.remote_store,
> -        ),
> -        Err(_) => format!(
> -            "Sync remote '{}' datastore '{}' failed",
> -            job.remote, job.remote_store,
> -        ),
> +        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
> +        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
>      };
>  
>      send_job_status_mail(email, &subject, &text)?;
> -- 
> 2.30.2




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling Hannes Laimer
@ 2023-02-28 11:25   ` Wolfgang Bumiller
  2023-02-28 11:36   ` Fabian Grünbichler
  1 sibling, 0 replies; 12+ messages in thread
From: Wolfgang Bumiller @ 2023-02-28 11:25 UTC (permalink / raw)
  To: Hannes Laimer; +Cc: pbs-devel

On Thu, Feb 23, 2023 at 01:55:40PM +0100, Hannes Laimer wrote:
> ... and rewrite pull logic.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  pbs-api-types/src/datastore.rs  |    2 +-
>  pbs-datastore/src/read_chunk.rs |    2 +-
>  src/api2/pull.rs                |   13 +-
>  src/server/pull.rs              | 1023 +++++++++++++++++++------------
>  4 files changed, 648 insertions(+), 392 deletions(-)
> 
> diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
> index 72e8d1ee..9a692b08 100644
> --- a/pbs-api-types/src/datastore.rs
> +++ b/pbs-api-types/src/datastore.rs
> @@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
>  /// Uniquely identify a Backup (relative to data store)
>  ///
>  /// We also call this a backup snapshot.
> -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
> +#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
>  #[serde(rename_all = "kebab-case")]
>  pub struct BackupDir {
>      /// Backup group.
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>      fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>  }
>  
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {
>      /// Returns the encoded chunk data
>      fn read_raw_chunk<'a>(
>          &'a self,
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index bb8f6fe1..2966190c 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -121,8 +121,8 @@ pub fn do_sync_job(
>              let sync_job2 = sync_job.clone();
>  
>              let worker_future = async move {
> -                let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
> +                let mut pull_params = PullParameters::try_from(&sync_job)?;
> +                pull_params.init_source(sync_job.limit).await?;
>  
>                  task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                  if let Some(event_str) = schedule {
> @@ -137,7 +137,7 @@ pub fn do_sync_job(
>                  );
>  
>                  if sync_job.remote.is_some() {
> -                    pull_store(&worker, &client, pull_params).await?;
> +                    pull_store(&worker, pull_params).await?;
>                  } else {
>                      match (sync_job.ns, sync_job.remote_ns) {
>                          (Some(target_ns), Some(source_ns))
> @@ -280,7 +280,7 @@ async fn pull(
>          delete,
>      )?;
>  
> -    let pull_params = PullParameters::new(
> +    let mut pull_params = PullParameters::new(
>          &store,
>          ns,
>          remote.as_deref(),
> @@ -290,9 +290,8 @@ async fn pull(
>          remove_vanished,
>          max_depth,
>          group_filter,
> -        limit,
>      )?;
> -    let client = pull_params.client().await?;
> +    pull_params.init_source(limit).await?;
>  
>      // fixme: set to_stdout to false?
>      // FIXME: add namespace to worker id?
> @@ -310,7 +309,7 @@ async fn pull(
>                  remote_store,
>              );
>  
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, pull_params);
>              (select! {
>                  success = pull_future.fuse() => success,
>                  abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..d3be39da 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
> +use std::path::PathBuf;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>  use proxmox_sys::task_log;
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> +    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
>      Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> +use pbs_datastore::{
> +    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
> +};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
> -use crate::backup::{check_ns_modification_privs, check_ns_privs};
> +use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> -    store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
> -    ns: BackupNamespace,
> +    /// Where data is pulled from
> +    source: PullSource,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
> +}
> +
> +pub(crate) enum PullSource {
> +    Remote(RemoteSource),
> +    Local(LocalSource),
> +}
> +
> +pub(crate) struct PullTarget {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct LocalSource {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    remote: Remote,
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: Option<HttpClient>,
> +    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,
> +}
> +
> +impl PullSource {
> +    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        match self {
> +            PullSource::Remote(source) => {
> +                source.client.replace(
> +                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
> +                );
> +            }
> +            PullSource::Local(_) => {}
> +        };
> +        Ok(())
> +    }
> +
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {

I'd very much prefer if all these functions that mainly `match` on
`self` were factored into impl blocks for `RemoteSource` and
`LocalSource` and just forwarded the call.

Including this one for consistency, even if size-wise it seems
excessive.
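
Very rough sketch of the forwarding I have in mind (the per-source
method names are made up):

    impl PullSource {
        async fn list_namespaces(
            &self,
            max_depth: &mut Option<usize>,
            worker: &WorkerTask,
        ) -> Result<Vec<BackupNamespace>, Error> {
            match self {
                PullSource::Remote(source) => source.list_namespaces(max_depth, worker).await,
                PullSource::Local(source) => source.list_namespaces(max_depth, worker).await,
            }
        }
    }

with the actual implementations living in `impl RemoteSource` and
`impl LocalSource`.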

> +        match &self {
> +            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,
> +            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
> +                source.store.clone(),
> +                source.ns.clone(),
> +                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
> +            )?
> +            .collect(),
> +        }
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {

But here and below the indentation and code start to get bigger and
it'll just be easier to maintain these separately.
This is basically like a common API surface for "sources".

> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());

(Note for future work: we should really factor calls like this into a
`PbsClient` within the pbs-client crate. A mere "HttpClient" really
isn't something I'd look for in a `pbs-client` crate and this (and most
code in `proxmox-backup-client`) should actually just be a call to
`client.list_groups(store, namespace).await` (or equivalent) in the
future)

> +
> +                let args = if !namespace.is_root() {
> +                    Some(json!({ "ns": namespace.clone() }))
> +                } else {
> +                    None
> +                };
> +
> +                let client = source.get_client()?;
> +                client.login().await?;
> +                let mut result = client.get(&path, args).await.map_err(|err| {
> +                    format_err!("Failed to retrieve backup groups from remote - {}", err)
> +                })?;
> +
> +                Ok(
> +                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                        .map_err(|e| Error::from(e))?
> +                        .into_iter()
> +                        .map(|item| item.backup)
> +                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
> +                )
> +            }
> +            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
> +                &source.store,
> +                namespace.clone(),
> +                MAX_NAMESPACE_DEPTH,
> +                None,
> +                None,
> +                Some(owner),
> +            )?
> +            .filter_map(Result::ok)

^ Should we not log errors (or fail)?

> +            .map(|backup_group| backup_group.group().clone())
> +            .collect::<Vec<pbs_api_types::BackupGroup>>()),
> +        }
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &pbs_api_types::BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!(
> +                    "api2/json/admin/datastore/{}/snapshots",
> +                    source.repo.store()
> +                );
> +
> +                let mut args = json!({
> +                    "backup-type": group.ty,
> +                    "backup-id": group.id,
> +                });
> +
> +                if !source.ns.is_root() {
> +                    args["ns"] = serde_json::to_value(&source.ns)?;
> +                }
> +
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let mut result = client.get(&path, Some(args)).await?;
> +                let snapshot_list: Vec<SnapshotListItem> =
> +                    serde_json::from_value(result["data"].take())?;
> +                Ok(snapshot_list
> +                    .into_iter()
> +                    .filter_map(|item: SnapshotListItem| {
> +                        let snapshot = item.backup;
> +                        // in-progress backups can't be synced
> +                        if item.size.is_none() {
> +                            task_log!(
> +                                worker,
> +                                "skipping snapshot {} - in-progress backup",
> +                                snapshot
> +                            );
> +                            return None;
> +                        }
> +
> +                        Some(snapshot)
> +                    })
> +                    .collect::<Vec<BackupDir>>())
> +            }
> +            PullSource::Local(source) => Ok(source
> +                .store
> +                .backup_group(namespace.clone(), group.clone())
> +                .iter_snapshots()?
> +                .filter_map(Result::ok)

^ Should we not log errors (or fail)?

> +                .map(|snapshot| snapshot.dir().to_owned())
> +                .collect::<Vec<BackupDir>>()),
> +        }
> +    }
> +
> +    /// Load file from source namespace and BackupDir into file
> +    async fn load_file_into(
> +        &mut self,
> +        namespace: &BackupNamespace,
> +        snapshot: &pbs_api_types::BackupDir,
> +        filename: &str,
> +        into: &PathBuf,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        match self {
> +            PullSource::Remote(ref mut source) => {
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    reader.clone()
> +                } else {
> +                    let backup_reader = BackupReader::start(
> +                        client,
> +                        None,
> +                        source.repo.store(),
> +                        namespace,
> +                        snapshot,
> +                        true,
> +                    )
> +                    .await?;
> +                    source
> +                        .backup_reader
> +                        .insert(snapshot.clone(), backup_reader.clone());
> +                    backup_reader
> +                };
> +
> +                let download_result = reader.download(filename, &mut tmp_file).await;
> +
> +                if let Err(err) = download_result {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",
> +                                    snapshot,
> +                                );
> +                                return Ok(None);
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
> +                };
> +            }
> +            PullSource::Local(source) => {
> +                let dir = source
> +                    .store
> +                    .backup_dir(namespace.clone(), snapshot.clone())?;
> +                let mut from_path = dir.full_path();
> +                from_path.push(filename);
> +                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;

Opening the source file and using `std::io::copy()` might be nicer.
After all, if most of this code is about loading a file, why not just
have it return a `Vec<u8>`? In fact, maybe that would be the nicer
interface altogether, given that we're creating unwritten tmpfiles and
leaving them empty on error, making it the caller's responsibility to
clean them up, but not to create them. (Not too bad for internal code,
but IMO still nicer to avoid this...)
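
E.g. for the local branch, something as simple as this (untested)
should do:

    let mut from = std::fs::File::open(&from_path)?;
    std::io::copy(&mut from, &mut tmp_file)?;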

(Another possibility would be to just make a hardlink here and skip the
creation on top altogether.)

> +            }
> +        }
> +
> +        tmp_file.seek(SeekFrom::Start(0))?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())

You're ignoring an error here. I suppose that's because you now use this
twice, and one call site doesn't actually use or care about the result
of this function and didn't call `DataBlob::load_from_reader` on the
contents either.
I think it makes more sense to return the `tmp_file` and have the caller
decide whether to use `DataBlob::load_from_reader` on it and then not
discard the error there.

> +    }
> +
> +    // Note: The client.log.blob is uploaded after the backup, so it is
> +    // not mentioned in the manifest.
> +    async fn try_download_client_log(
> +        &self,
> +        from_snapshot: &pbs_api_types::BackupDir,
> +        to_path: &std::path::Path,
> +        worker: &WorkerTask,
> +    ) -> Result<(), Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let reader = source
> +                    .backup_reader
> +                    .get(from_snapshot)
> +                    .ok_or(format_err!("Can't download chunks without a BackupReader"))?;
> +                let mut tmp_path = to_path.to_owned();
> +                tmp_path.set_extension("tmp");
> +
> +                let tmpfile = std::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .read(true)
> +                    .open(&tmp_path)?;
> +
> +                // Note: be silent if there is no log - only log successful download
> +                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> +                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +                    }
> +                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +                }
> +
> +                Ok(())
> +            }
> +            PullSource::Local(_) => Ok(()),
> +        }
> +    }
> +
> +    fn get_chunk_reader(
> +        &self,
> +        snapshot: &pbs_api_types::BackupDir,
> +        crypt_mode: CryptMode,
> +    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
> +        Ok(match &self {
> +            PullSource::Remote(source) => {
> +                if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    Arc::new(RemoteChunkReader::new(
> +                        reader.clone(),
> +                        None,
> +                        crypt_mode,
> +                        HashMap::new(),
> +                    ))
> +                } else {
> +                    bail!("No initialized BackupReader!")
> +                }
> +            }
> +            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
> +                source.store.clone(),
> +                None,
> +                crypt_mode,
> +            )),
> +        })
> +    }
> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        match &self {
> +            PullSource::Remote(source) => source.ns.clone(),
> +            PullSource::Local(source) => source.ns.clone(),
> +        }
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        match &self {
> +            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
> +            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
> +        }
> +    }
> +}
> +
> +impl RemoteSource {
> +    fn get_client(&self) -> Result<&HttpClient, Error> {
> +        if let Some(client) = &self.client {
> +            Ok(client)
> +        } else {
> +            bail!("RemoteSource not initialized")
> +        }
> +    }
>  }
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
>          remove_vanished: Option<bool>,
>          max_depth: Option<usize>,
>          group_filter: Option<Vec<GroupFilter>>,
> -        limit: RateLimitConfig,
>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> +        };
> +        let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let source: PullSource = if let Some(remote) = remote {

Since I'm already suggesting moving some code into dedicated impl
blocks, it would make sense here as well, so this becomes:

    let source = match remote {
        Some(remote) => PullSource::Remote(RemoteSource::new(remote)),
        None => PullSource::Local(LocalSource::new(remote_store, remote_ns)),
    };


> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        let remove_vanished = remove_vanished.unwrap_or(false);
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +            PullSource::Remote(RemoteSource {
> +                remote,
> +                repo,
> +                ns: remote_ns.clone(),
> +                client: None,
> +                backup_reader: HashMap::new(),
> +            })
> +        } else {
> +            PullSource::Local(LocalSource {
> +                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
> +                ns: remote_ns,
> +            })
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> +            ns,
> +        };
>  
>          Ok(Self {
> -            remote,
> -            remote_ns,
> -            ns,
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        self.source.init(limit).await
> +    }
> +
> +    pub(crate) fn skip_chunk_sync(&self) -> bool {

^ Minor nit: sounds a bit like an action, maybe `should_..` or just
`is_same_datastore()`?

> +        match &self.source {
> +            PullSource::Local(source) => source.store.name() == self.target.store.name(),
> +            PullSource::Remote(_) => false,
> +        }
> +    }
> +
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();

Taking `/some/source`

> +        source_ns.map_prefix(&source_ns, &self.target.ns)

and replacing its `/some/source` by `/other/thing` should always yield
`/other/thing`, no?
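
I.e., unless I'm missing something, this is equivalent to just

    Ok(self.target.ns.clone())

so the `map_prefix` call could be dropped here.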

>      }
>  }
>  
> +async fn list_remote_namespaces(
> +    source: &RemoteSource,
> +    max_depth: &mut Option<usize>,
> +    worker: &WorkerTask,
> +) -> Result<Vec<BackupNamespace>, Error> {
> +    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +        vec![source.ns.clone()];

^ Missing a `return` here maybe?
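
i.e. presumably

    return Ok(vec![source.ns.clone()]);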

> +    }
> +
> +    let path = format!(
> +        "api2/json/admin/datastore/{}/namespace",
> +        source.repo.store()
> +    );
> +    let mut data = json!({});
> +    if let Some(max_depth) = max_depth {
> +        data["max-depth"] = json!(max_depth);
> +    }
> +
> +    if !source.ns.is_root() {
> +        data["parent"] = json!(source.ns);
> +    }
> +
> +    let client = source.get_client()?;
> +    client.login().await?;
> +
> +    let mut result = match client.get(&path, Some(data)).await {
> +        Ok(res) => res,
> +        Err(err) => match err.downcast_ref::<HttpError>() {
> +            Some(HttpError { code, message }) => match code {
> +                &StatusCode::NOT_FOUND => {
> +                    if source.ns.is_root() && max_depth.is_none() {
> +                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> +                        max_depth.replace(0);
> +                    } else {
> +                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                    }
> +
> +                    return Ok(vec![source.ns.clone()]);
> +                }
> +                _ => {
> +                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                }
> +            },
> +            None => {
> +                bail!("Querying namespaces failed - {err}");
> +            }
> +        },
> +    };
> +
> +    let list: Vec<pbs_api_types::BackupNamespace> =

^ BackupNamespace is in scope.

> +        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +            .iter()
> +            .map(|list_item| list_item.ns.clone())
> +            .collect();
> +
> +    Ok(list)
> +}
> +
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> +/// - Load archive file into tmp file
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
>  async fn pull_single_archive(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    from_namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> -    let mut path = snapshot.full_path();
> +    let mut path = to_snapshot.full_path();
>      path.push(archive_name);
>  
>      let mut tmp_path = path.clone();
> @@ -273,13 +636,18 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    params
> +        .source
> +        .load_file_into(
> +            from_namespace,
> +            from_snapshot,
> +            archive_name,
> +            &tmp_path,
> +            worker,
> +        )
> +        .await?;
>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;
>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +657,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

^ typo: data{ts}ore

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -305,14 +679,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

^ typo: data{ts}ore

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;
> @@ -326,33 +706,6 @@ async fn pull_single_archive(
>      Ok(())
>  }
>  
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>  /// Actual implementation of pulling a snapshot.
>  ///
>  /// Pulling a snapshot consists of the following steps:
> @@ -364,44 +717,37 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let mut manifest_name = snapshot.full_path();
> +    let mut manifest_name = to_snapshot.full_path();
>      manifest_name.push(MANIFEST_BLOB_NAME);
>  
> -    let mut client_log_name = snapshot.full_path();
> +    let mut client_log_name = to_snapshot.full_path();
>      client_log_name.push(CLIENT_LOG_BLOB_NAME);
>  
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");
>  
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = params
> +        .source
> +        .load_file_into(
> +            namespace,
> +            from_snapshot,
> +            MANIFEST_BLOB_NAME,
> +            &tmp_manifest_name,
> +            worker,
> +        )
> +        .await?
> +    {
> +        tmp_manifest_blob = data;
> +    } else {
> +        return Ok(());
> +    }
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -418,8 +764,11 @@ async fn pull_snapshot(
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>              if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                params
> +                    .source
> +                    .try_download_client_log(from_snapshot, &client_log_name, worker)
> +                    .await?;
> +            };
>              task_log!(worker, "no data changes");
>              let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> @@ -429,7 +778,7 @@ async fn pull_snapshot(
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>  
>      for item in manifest.files() {
> -        let mut path = snapshot.full_path();
> +        let mut path = to_snapshot.full_path();
>          path.push(&item.filename);
>  
>          if path.exists() {
> @@ -467,18 +816,12 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>          pull_single_archive(
>              worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
>              item,
>              downloaded_chunks.clone(),
>          )
> @@ -490,10 +833,12 @@ async fn pull_snapshot(
>      }
>  
>      if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> -    snapshot
> +        params
> +            .source
> +            .try_download_client_log(from_snapshot, &client_log_name, worker)
> +            .await?;
> +    };
> +    to_snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
>  
> @@ -501,37 +846,53 @@ async fn pull_snapshot(
>  }
>  
>  /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> -///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.
>  async fn pull_snapshot_from(

Maybe add a `do_` prefix to the other one and drop the `_from` here;
since we have both from & to now, this name doesn't make sense
anymore.

>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let (_path, is_new, _snap_lock) = snapshot
> +    let (_path, is_new, _snap_lock) = to_snapshot
>          .datastore()
> -        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
> +        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
>  
>      if is_new {
> -        task_log!(worker, "sync snapshot {}", snapshot.dir());
> +        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> -            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
> -                snapshot.backup_ns(),
> -                snapshot.as_ref(),
> +        if let Err(err) = pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await
> +        {
> +            if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
> +                to_snapshot.backup_ns(),
> +                to_snapshot.as_ref(),
>                  true,
>              ) {
>                  task_log!(worker, "cleanup error - {}", cleanup_err);
>              }
>              return Err(err);
>          }
> -        task_log!(worker, "sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
>      } else {
> -        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> -        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> -        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
> +        pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await?;
> +        task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
>      }
>  
>      Ok(())
> @@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
>  /// - Sort by snapshot time
>  /// - Get last snapshot timestamp on local datastore
>  /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader
>  /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
>  /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
>  ///
> @@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
>  /// - local group owner is already checked by pull_store
>  async fn pull_group(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> +    params: &mut PullParameters,
> +    source_namespace: &BackupNamespace,
>      group: &pbs_api_types::BackupGroup,
> -    remote_ns: BackupNamespace,
>      progress: &mut StoreProgress,
>  ) -> Result<(), Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
> -    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

So previously we took the passed *parameter* remote_ns and replaced its
*params* remote_ns prefix with the *params* local ns.

> -
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> -    client.login().await?; // make sure auth is complete
> -
> -    let fingerprint = client.fingerprint();
> -
> -    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -
> -    let mut remote_snapshots = std::collections::HashSet::new();
> -
> -    // start with 65536 chunks (up to 256 GiB)
> -    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> -    progress.group_snapshots = list.len() as u64;
> +    let target_ns = params.get_target_ns()?;

While this will just give us the *params* target namespace.

Aren't we syncing to different namespaces now?
It looks to me as if multiple namespaces would get merged into one now.

I find the use of `target_ns` and `params.target.ns` a bit confusing
below, too.
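
I'd have expected the per-namespace mapping to stay roughly like this
(untested):

    let target_ns =
        source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;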

>  
> +    let mut source_snapshots = HashSet::new();
> +    let last_sync = params
> +        .target
> +        .store
> +        .last_successful_backup(&target_ns, group)?;
>      let mut skip_info = SkipInfo {
>          oldest: i64::MAX,
>          newest: i64::MIN,
>          count: 0,
>      };
>  
> -    for (pos, item) in list.into_iter().enumerate() {
> -        let snapshot = item.backup;
> -
> -        // in-progress backups can't be synced
> -        if item.size.is_none() {
> -            task_log!(
> -                worker,
> -                "skipping snapshot {} - in-progress backup",
> -                snapshot
> -            );
> -            continue;
> -        }
> +    let mut list: Vec<BackupDir> = params
> +        .source
> +        .list_backup_dirs(source_namespace, group, worker)
> +        .await?
> +        .into_iter()
> +        .filter(|dir| {
> +            source_snapshots.insert(dir.time);
> +            if let Some(last_sync_time) = last_sync {
> +                if last_sync_time > dir.time {
> +                    skip_info.update(dir.time);
> +                    return false;
> +                }
> +            }
> +            true
> +        })
> +        .collect();
>  
> -        remote_snapshots.insert(snapshot.time);
> +    list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
>  
> -        if let Some(last_sync_time) = last_sync {
> -            if last_sync_time > snapshot.time {
> -                skip_info.update(snapshot.time);
> -                continue;
> -            }
> -        }
> +    // start with 65536 chunks (up to 256 GiB)
> +    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>  
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> -
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> -
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> -
> -        let reader = BackupReader::start(
> -            new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +    progress.group_snapshots = list.len() as u64;
>  
> -        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> +    for (pos, from_snapshot) in list.into_iter().enumerate() {
> +        let to_snapshot = params
> +            .target
> +            .store
> +            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;

^ should this not be target_ns?
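
i.e. presumably

    .backup_dir(target_ns.clone(), from_snapshot.clone())?;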

>  
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let result = pull_snapshot_from(
> +            worker,
> +            params,
> +            source_namespace,
> +            &from_snapshot,
> +            &to_snapshot,
> +            downloaded_chunks.clone(),
> +        )
> +        .await;
>  
>          progress.done_snapshots = pos as u64 + 1;
>          task_log!(worker, "percentage done: {}", progress);
> @@ -703,11 +1025,14 @@ async fn pull_group(
>      }
>  
>      if params.remove_vanished {
> -        let group = params.store.backup_group(target_ns.clone(), group.clone());
> +        let group = params
> +            .target
> +            .store
> +            .backup_group(target_ns.clone(), group.clone());
>          let local_list = group.list_backups()?;
>          for info in local_list {
>              let snapshot = info.backup_dir;
> -            if remote_snapshots.contains(&snapshot.backup_time()) {
> +            if source_snapshots.contains(&snapshot.backup_time()) {
>                  continue;
>              }
>              if snapshot.is_protected() {
> @@ -720,6 +1045,7 @@ async fn pull_group(
>              }
>              task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
>              params
> +                .target
>                  .store
>                  .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
>          }
> @@ -732,64 +1058,12 @@ async fn pull_group(
>      Ok(())
>  }
>  
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> -    worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> -
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> -
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> -
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> -                }
> -            },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    // parents first
> -    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> -    Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
>  fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
>      let mut created = false;
> -    let store_ns_str = print_store_and_ns(params.store.name(), ns);
> +    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>  
> -    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> -        check_ns_modification_privs(params.store.name(), ns, &params.owner)
> +    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> +        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
>              .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>  
>          let name = match ns.components().last() {
> @@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>              }
>          };
>  
> -        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> +        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
>              bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
>          }
>          created = true;
>      }
>  
>      check_ns_privs(
> -        params.store.name(),
> +        params.target.store.name(),
>          ns,
>          &params.owner,
>          PRIV_DATASTORE_BACKUP,
> @@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>  }
>  
>  fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> -    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> +    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
>          .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>  
> -    params.store.remove_namespace_recursive(local_ns, true)
> +    params
> +        .target
> +        .store
> +        .remove_namespace_recursive(local_ns, true)
>  }
>  
>  fn check_and_remove_vanished_ns(
> @@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
>      // clamp like remote does so that we don't list more than we can ever have synced.
>      let max_depth = params
>          .max_depth
> -        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> +        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>  
>      let mut local_ns_list: Vec<BackupNamespace> = params
> +        .target
>          .store
> -        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> +        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
>          .filter(|ns| {
>              let user_privs =
> -                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> +                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
>              user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
>          })
>          .collect();
> @@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
>      local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>  
>      for local_ns in local_ns_list {
> -        if local_ns == params.ns {
> +        if local_ns == params.target.ns {
>              continue;
>          }
>  
> @@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
>  /// - access to sub-NS checked here
>  pub(crate) async fn pull_store(
>      worker: &WorkerTask,
> -    client: &HttpClient,
>      mut params: PullParameters,
>  ) -> Result<(), Error> {
>      // explicit create shared lock to prevent GC on newly created chunks
> -    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> +    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
>      let mut errors = false;
>  
>      let old_max_depth = params.max_depth;
> -    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> -        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> -    } else {
> -        query_namespaces(worker, client, &mut params).await?
> -    };
> +    let mut namespaces = params
> +        .source
> +        .list_namespaces(&mut params.max_depth, worker)
> +        .await?;
>      errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> +    namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
>  
>      let (mut groups, mut snapshots) = (0, 0);
>      let mut synced_ns = HashSet::with_capacity(namespaces.len());
>  
>      for namespace in namespaces {
> -        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> +        let source_store_ns_str = params.source.print_store_and_ns();
>  
> -        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> -        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> +        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> +        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>  
>          task_log!(worker, "----");
>          task_log!(
> @@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
>              }
>          }
>  
> -        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> +        match pull_ns(worker, &namespace, &mut params).await {
>              Ok((ns_progress, ns_errors)) => {
>                  errors |= ns_errors;
>  
> @@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
>                  task_log!(
>                      worker,
>                      "Encountered errors while syncing namespace {} - {}",
> -                    namespace,
> +                    &namespace,
>                      err,
>                  );
>              }
> @@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
>  /// - owner check for vanished groups done here
>  pub(crate) async fn pull_ns(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> -    source_ns: BackupNamespace,
> -    target_ns: BackupNamespace,
> +    namespace: &BackupNamespace,

^ It's not clear to me which namespace this is supposed to be now?
And don't we need both?
`pull_store` creates the `target_ns`, which is a source-to-target
prefix-mapped namespace, but further down we just use `get_target_ns()`
again, which always returns the same namespace.
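
Keeping both explicit would avoid that, something like (untested sketch):

    pub(crate) async fn pull_ns(
        worker: &WorkerTask,
        source_ns: &BackupNamespace,
        target_ns: &BackupNamespace,
        params: &mut PullParameters,
    ) -> Result<(StoreProgress, bool), Error>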

> +    params: &mut PullParameters,
>  ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> -
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut list: Vec<pbs_api_types::BackupGroup> =
> +        params.source.list_groups(namespace, &params.owner).await?;
>  
>      let total_count = list.len();
>      list.sort_unstable_by(|a, b| {
> -        let type_order = a.backup.ty.cmp(&b.backup.ty);
> +        let type_order = a.ty.cmp(&b.ty);
>          if type_order == std::cmp::Ordering::Equal {
> -            a.backup.id.cmp(&b.backup.id)
> +            a.id.cmp(&b.id)
>          } else {
>              type_order
>          }
> @@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
>          filters.iter().any(|filter| group.matches(filter))
>      };
>  
> -    // Get groups with target NS set
> -    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
>      let list = if let Some(ref group_filter) = &params.group_filter {
>          let unfiltered_count = list.len();
>          let list: Vec<pbs_api_types::BackupGroup> = list
> @@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
>  
>      let mut progress = StoreProgress::new(list.len() as u64);
>  
> +    let target_ns = params.get_target_ns()?;

^ here

>      for (done, group) in list.into_iter().enumerate() {
>          progress.done_groups = done as u64;
>          progress.done_snapshots = 0;
> @@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
>  
>          let (owner, _lock_guard) =
>              match params
> +                .target
>                  .store
>                  .create_locked_backup_group(&target_ns, &group, &params.owner)
>              {
> @@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
>                          err
>                      );
>                      errors = true; // do not stop here, instead continue
> +                    task_log!(worker, "create_locked_backup_group failed");
>                      continue;
>                  }
>              };
> @@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
>                  owner
>              );
>              errors = true; // do not stop here, instead continue
> -        } else if let Err(err) = pull_group(
> -            worker,
> -            client,
> -            params,
> -            &group,
> -            source_ns.clone(),
> -            &mut progress,
> -        )
> -        .await
> +        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
>          {
>              task_log!(worker, "sync group {} failed - {}", &group, err,);
>              errors = true; // do not stop here, instead continue
> @@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
>  
>      if params.remove_vanished {
>          let result: Result<(), Error> = proxmox_lang::try_block!({
> -            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> +            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
>                  let local_group = local_group?;
>                  let local_group = local_group.group();
>                  if new_groups.contains(local_group) {
>                      continue;
>                  }
> -                let owner = params.store.get_owner(&target_ns, local_group)?;
> +                let owner = params.target.store.get_owner(&target_ns, local_group)?;
>                  if check_backup_owner(&owner, &params.owner).is_err() {
>                      continue;
>                  }
> @@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
>                      }
>                  }
>                  task_log!(worker, "delete vanished group '{local_group}'",);
> -                match params.store.remove_backup_group(&target_ns, local_group) {
> +                match params
> +                    .target
> +                    .store
> +                    .remove_backup_group(&target_ns, local_group)
> +                {
>                      Ok(true) => {}
>                      Ok(false) => {
>                          task_log!(
> -- 
> 2.30.2




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs
  2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
                   ` (4 preceding siblings ...)
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling Hannes Laimer
@ 2023-02-28 11:35 ` Fabian Grünbichler
  5 siblings, 0 replies; 12+ messages in thread
From: Fabian Grünbichler @ 2023-02-28 11:35 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On February 23, 2023 1:55 pm, Hannes Laimer wrote:
> Add support for local sync. SyncJobs without a remote are considered local, and
> use a different logic for pulling. In the course of adding the extra pull logic,
> the pull code was rewritten to basically be source independent. Also cli
> completion and the UI was updated to allow Remotes in SyncJobs to be optional.

some sort of changelog between v1 and v2 would be nice both here and for the
patches themselves..

> Hannes Laimer (5):
>   api2: make Remote for SyncJob optional
>   ui: add support for optional Remote in SyncJob
>   manager: add completion for opt. Remote in SyncJob
>   pbs-client: accept a ref to a HttpClient in BackupReader::starting
>   pull: add support for local pulling
> 
>  pbs-api-types/src/datastore.rs       |    2 +-
>  pbs-api-types/src/jobs.rs            |    4 +-
>  pbs-client/src/backup_reader.rs      |    2 +-
>  pbs-datastore/src/read_chunk.rs      |    2 +-
>  proxmox-backup-client/src/catalog.rs |    4 +-
>  proxmox-backup-client/src/main.rs    |    2 +-
>  proxmox-backup-client/src/mount.rs   |    2 +-
>  src/api2/config/remote.rs            |    2 +-
>  src/api2/config/sync.rs              |   41 +-
>  src/api2/node/tasks.rs               |    4 +-
>  src/api2/pull.rs                     |   79 +-
>  src/bin/proxmox-backup-manager.rs    |   67 +-
>  src/bin/proxmox_backup_debug/diff.rs |    2 +-
>  src/server/email_notifications.rs    |   16 +-
>  src/server/pull.rs                   | 1023 ++++++++++++++++----------
>  www/form/RemoteTargetSelector.js     |   29 +-
>  www/window/SyncJobEdit.js            |    8 +-
>  17 files changed, 812 insertions(+), 477 deletions(-)
> 
> -- 
> 2.30.2
> 
> 
> 
> 
> 
> 




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2 4/5] pbs-client: accept a ref to a HttpClient in BackupReader::starting
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 4/5] pbs-client: accept a ref to a HttpClient in BackupReader::starting Hannes Laimer
@ 2023-02-28 11:35   ` Fabian Grünbichler
  0 siblings, 0 replies; 12+ messages in thread
From: Fabian Grünbichler @ 2023-02-28 11:35 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On February 23, 2023 1:55 pm, Hannes Laimer wrote:
> ... since the function doesn't actually need the moved value.
> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
> 
> needed for next patch

but it's incomplete and breaks the build of proxmox-file-restore.. you can
order this up front if you still need it in the next series, since it can be
applied independently..

error[E0308]: mismatched types
   --> proxmox-file-restore/src/main.rs:110:9
    |
109 |     let client = BackupReader::start(
    |                  ------------------- arguments to this function are incorrect
110 |         client,
    |         ^^^^^^
    |         |
    |         expected `&HttpClient`, found struct `HttpClient`
    |         help: consider borrowing here: `&client`
    |
note: associated function defined here

error[E0308]: mismatched types
   --> proxmox-file-restore/src/main.rs:433:9
    |
432 |     let client = BackupReader::start(
    |                  ------------------- arguments to this function are incorrect
433 |         client,
    |         ^^^^^^
    |         |
    |         expected `&HttpClient`, found struct `HttpClient`
    |         help: consider borrowing here: `&client`
    |
note: associated function defined here

For more information about this error, try `rustc --explain E0308`.
error: could not compile `proxmox-file-restore` due to 2 previous errors
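
the missing hunks would presumably just mirror the ones below, i.e. at the
two call sites from the compiler output in proxmox-file-restore/src/main.rs:

    -        client,
    +        &client,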


>  pbs-client/src/backup_reader.rs      | 2 +-
>  proxmox-backup-client/src/catalog.rs | 4 ++--
>  proxmox-backup-client/src/main.rs    | 2 +-
>  proxmox-backup-client/src/mount.rs   | 2 +-
>  src/bin/proxmox_backup_debug/diff.rs | 2 +-
>  5 files changed, 6 insertions(+), 6 deletions(-)
> 
> diff --git a/pbs-client/src/backup_reader.rs b/pbs-client/src/backup_reader.rs
> index 2cd4dc27..36d8ebcf 100644
> --- a/pbs-client/src/backup_reader.rs
> +++ b/pbs-client/src/backup_reader.rs
> @@ -44,7 +44,7 @@ impl BackupReader {
>  
>      /// Create a new instance by upgrading the connection at '/api2/json/reader'
>      pub async fn start(
> -        client: HttpClient,
> +        client: &HttpClient,
>          crypt_config: Option<Arc<CryptConfig>>,
>          datastore: &str,
>          ns: &BackupNamespace,
> diff --git a/proxmox-backup-client/src/catalog.rs b/proxmox-backup-client/src/catalog.rs
> index 8c8c1458..72b22e67 100644
> --- a/proxmox-backup-client/src/catalog.rs
> +++ b/proxmox-backup-client/src/catalog.rs
> @@ -75,7 +75,7 @@ async fn dump_catalog(param: Value) -> Result<Value, Error> {
>      let client = connect(&repo)?;
>  
>      let client = BackupReader::start(
> -        client,
> +        &client,
>          crypt_config.clone(),
>          repo.store(),
>          &backup_ns,
> @@ -187,7 +187,7 @@ async fn catalog_shell(param: Value) -> Result<(), Error> {
>      };
>  
>      let client = BackupReader::start(
> -        client,
> +        &client,
>          crypt_config.clone(),
>          repo.store(),
>          &backup_ns,
> diff --git a/proxmox-backup-client/src/main.rs b/proxmox-backup-client/src/main.rs
> index 55198108..e7b5bde6 100644
> --- a/proxmox-backup-client/src/main.rs
> +++ b/proxmox-backup-client/src/main.rs
> @@ -1286,7 +1286,7 @@ async fn restore(
>      };
>  
>      let client = BackupReader::start(
> -        client,
> +        &client,
>          crypt_config.clone(),
>          repo.store(),
>          &ns,
> diff --git a/proxmox-backup-client/src/mount.rs b/proxmox-backup-client/src/mount.rs
> index 6810c19c..66bc56f7 100644
> --- a/proxmox-backup-client/src/mount.rs
> +++ b/proxmox-backup-client/src/mount.rs
> @@ -234,7 +234,7 @@ async fn mount_do(param: Value, pipe: Option<OwnedFd>) -> Result<Value, Error> {
>      };
>  
>      let client = BackupReader::start(
> -        client,
> +        &client,
>          crypt_config.clone(),
>          repo.store(),
>          &backup_ns,
> diff --git a/src/bin/proxmox_backup_debug/diff.rs b/src/bin/proxmox_backup_debug/diff.rs
> index 288d35ce..bb68322b 100644
> --- a/src/bin/proxmox_backup_debug/diff.rs
> +++ b/src/bin/proxmox_backup_debug/diff.rs
> @@ -293,7 +293,7 @@ async fn create_backup_reader(
>      };
>      let client = connect(&params.repo)?;
>      let backup_reader = BackupReader::start(
> -        client,
> +        &client,
>          params.crypt_config.clone(),
>          params.repo.store(),
>          &params.namespace,
> -- 
> 2.30.2
> 
> 
> 
> 
> 
> 




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional Hannes Laimer
  2023-02-28  9:41   ` Wolfgang Bumiller
@ 2023-02-28 11:35   ` Fabian Grünbichler
  1 sibling, 0 replies; 12+ messages in thread
From: Fabian Grünbichler @ 2023-02-28 11:35 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On February 23, 2023 1:55 pm, Hannes Laimer wrote:
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  pbs-api-types/src/jobs.rs         |  4 +-
>  src/api2/config/remote.rs         |  2 +-
>  src/api2/config/sync.rs           | 41 +++++++++++++------
>  src/api2/node/tasks.rs            |  4 +-
>  src/api2/pull.rs                  | 68 +++++++++++++++++++++++--------
>  src/server/email_notifications.rs | 16 ++++----
>  6 files changed, 93 insertions(+), 42 deletions(-)
> 
> diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs
> index cf7618c4..68db6cb8 100644
> --- a/pbs-api-types/src/jobs.rs
> +++ b/pbs-api-types/src/jobs.rs
> @@ -462,6 +462,7 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema =
>          },
>          remote: {
>              schema: REMOTE_ID_SCHEMA,
> +            optional: true,

could probably benefit from a description noting that omitting the remote
implies syncing from the local PBS system

>          },
>          "remote-store": {
>              schema: DATASTORE_SCHEMA,
> @@ -506,7 +507,8 @@ pub struct SyncJobConfig {
>      pub ns: Option<BackupNamespace>,
>      #[serde(skip_serializing_if = "Option::is_none")]
>      pub owner: Option<Authid>,
> -    pub remote: String,
> +    #[serde(skip_serializing_if = "Option::is_none")]
> +    pub remote: Option<String>,
>      pub remote_store: String,
>      #[serde(skip_serializing_if = "Option::is_none")]
>      pub remote_ns: Option<BackupNamespace>,
> diff --git a/src/api2/config/remote.rs b/src/api2/config/remote.rs
> index 2f02d121..aa74bdc0 100644
> --- a/src/api2/config/remote.rs
> +++ b/src/api2/config/remote.rs
> @@ -268,7 +268,7 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
>  
>      let job_list: Vec<SyncJobConfig> = sync_jobs.convert_to_typed_array("sync")?;
>      for job in job_list {
> -        if job.remote == name {
> +        if job.remote.map_or(false, |id| id == name) {
>              param_bail!(
>                  "name",
>                  "remote '{}' is used by sync job '{}' (datastore '{}')",
> diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs
> index bd7373df..4c5d06e2 100644
> --- a/src/api2/config/sync.rs
> +++ b/src/api2/config/sync.rs
> @@ -8,8 +8,8 @@ use proxmox_schema::{api, param_bail};
>  
>  use pbs_api_types::{
>      Authid, SyncJobConfig, SyncJobConfigUpdater, JOB_ID_SCHEMA, PRIV_DATASTORE_AUDIT,
> -    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_AUDIT,
> -    PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
> +    PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, PRIV_DATASTORE_PRUNE, PRIV_DATASTORE_READ,
> +    PRIV_REMOTE_AUDIT, PRIV_REMOTE_READ, PROXMOX_CONFIG_DIGEST_SCHEMA,
>  };
>  use pbs_config::sync;
>  
> @@ -25,8 +25,13 @@ pub fn check_sync_job_read_access(
>          return false;
>      }
>  
> -    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]);
> -    remote_privs & PRIV_REMOTE_AUDIT != 0
> +    if let Some(remote) = &job.remote {
> +        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote]);
> +        remote_privs & PRIV_REMOTE_AUDIT != 0
> +    } else {
> +        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
> +        source_ds_privs & PRIV_DATASTORE_AUDIT != 0
> +    }
>  }
>  
>  /// checks whether user can run the corresponding pull job
> @@ -63,8 +68,13 @@ pub fn check_sync_job_modify_access(
>          return false;
>      }
>  
> -    let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote, &job.remote_store]);
> -    remote_privs & PRIV_REMOTE_READ != 0
> +    if let Some(remote) = &job.remote {
> +        let remote_privs = user_info.lookup_privs(auth_id, &["remote", remote, &job.remote_store]);
> +        remote_privs & PRIV_REMOTE_READ != 0
> +    } else {
> +        let source_ds_privs = user_info.lookup_privs(auth_id, &["datastore", &job.remote_store]);
> +        source_ds_privs & PRIV_DATASTORE_READ != 0
> +    }
>  }
>  
>  #[api(
> @@ -191,6 +201,8 @@ pub fn read_sync_job(id: String, rpcenv: &mut dyn RpcEnvironment) -> Result<Sync
>  #[serde(rename_all = "kebab-case")]
>  /// Deletable property name
>  pub enum DeletableProperty {
> +    /// Delete the remote property(-> meaning local).
> +    Remote,
>      /// Delete the owner property.
>      Owner,
>      /// Delete the comment property.
> @@ -273,6 +285,9 @@ pub fn update_sync_job(
>      if let Some(delete) = delete {
>          for delete_prop in delete {
>              match delete_prop {
> +                DeletableProperty::Remote => {
> +                    data.remote = None;
> +                }
>                  DeletableProperty::Owner => {
>                      data.owner = None;
>                  }
> @@ -329,7 +344,7 @@ pub fn update_sync_job(
>          data.ns = Some(ns);
>      }
>      if let Some(remote) = update.remote {
> -        data.remote = remote;
> +        data.remote = Some(remote);
>      }
>      if let Some(remote_store) = update.remote_store {
>          data.remote_store = remote_store;
> @@ -495,7 +510,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>  
>      let mut job = SyncJobConfig {
>          id: "regular".to_string(),
> -        remote: "remote0".to_string(),
> +        remote: Some("remote0".to_string()),
>          remote_store: "remotestore1".to_string(),
>          remote_ns: None,
>          store: "localstore0".to_string(),
> @@ -529,11 +544,11 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
>      // reading without proper read permissions on local end must fail
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
>      // reading without proper read permissions on remote end must fail
> -    job.remote = "remote0".to_string();
> +    job.remote = Some("remote0".to_string());
>      job.store = "localstore1".to_string();
>      assert!(!check_sync_job_read_access(&user_info, &read_auth_id, &job));
>  
> @@ -546,10 +561,10 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      ));
>  
>      // writing without proper write permissions on local end must fail
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>  
>      // writing without proper write permissions on remote end must fail
> -    job.remote = "remote0".to_string();
> +    job.remote = Some("remote0".to_string());
>      job.store = "localstore1".to_string();
>      assert!(!check_sync_job_modify_access(
>          &user_info,
> @@ -558,7 +573,7 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator
>      ));
>  
>      // reset remote to one where users have access
> -    job.remote = "remote1".to_string();
> +    job.remote = Some("remote1".to_string());
>  
>      // user with read permission can only read, but not modify/run
>      assert!(check_sync_job_read_access(&user_info, &read_auth_id, &job));
> diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs
> index d386f805..780cb6d1 100644
> --- a/src/api2/node/tasks.rs
> +++ b/src/api2/node/tasks.rs
> @@ -75,14 +75,14 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) ->
>                  let local_store = captures.get(3);
>                  let local_ns = captures.get(4).map(|m| m.as_str());
>  
> -                if let (Some(remote), Some(remote_store), Some(local_store)) =
> +                if let (remote, Some(remote_store), Some(local_store)) =
>                      (remote, remote_store, local_store)
>                  {
>                      return check_pull_privs(
>                          auth_id,
>                          local_store.as_str(),
>                          local_ns,
> -                        remote.as_str(),
> +                        remote.map(|remote| remote.as_str()),
>                          remote_store.as_str(),
>                          false,
>                      );
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index b2473ec8..bb8f6fe1 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -9,7 +9,8 @@ use proxmox_sys::task_log;
>  use pbs_api_types::{
>      Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA,
>      GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_REDUCED_SCHEMA, PRIV_DATASTORE_BACKUP,
> -    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
> +    PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, REMOTE_ID_SCHEMA,
> +    REMOVE_VANISHED_BACKUPS_SCHEMA,
>  };
>  use pbs_config::CachedUserInfo;
>  use proxmox_rest_server::WorkerTask;
> @@ -21,7 +22,7 @@ pub fn check_pull_privs(
>      auth_id: &Authid,
>      store: &str,
>      ns: Option<&str>,
> -    remote: &str,
> +    remote: Option<&str>,
>      remote_store: &str,
>      delete: bool,
>  ) -> Result<(), Error> {
> @@ -38,12 +39,22 @@ pub fn check_pull_privs(
>          PRIV_DATASTORE_BACKUP,
>          false,
>      )?;
> -    user_info.check_privs(
> -        auth_id,
> -        &["remote", remote, remote_store],
> -        PRIV_REMOTE_READ,
> -        false,
> -    )?;
> +
> +    if let Some(remote) = remote {
> +        user_info.check_privs(
> +            auth_id,
> +            &["remote", remote, remote_store],
> +            PRIV_REMOTE_READ,
> +            false,
> +        )?;
> +    } else {
> +        user_info.check_privs(
> +            auth_id,
> +            &["datastore", remote_store],
> +            PRIV_DATASTORE_BACKUP,
> +            false,
> +        )?;
> +    }
>  
>      if delete {
>          user_info.check_privs(
> @@ -64,7 +75,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>          PullParameters::new(
>              &sync_job.store,
>              sync_job.ns.clone().unwrap_or_default(),
> -            &sync_job.remote,
> +            sync_job.remote.clone().as_deref(),
>              &sync_job.remote_store,
>              sync_job.remote_ns.clone().unwrap_or_default(),
>              sync_job
> @@ -75,7 +86,6 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
>              sync_job.remove_vanished,
>              sync_job.max_depth,
>              sync_job.group_filter.clone(),
> -            sync_job.limit.clone(),

not too happy about the limit being dropped here.. but see comment on other patch!

>          )
>      }
>  }
> @@ -89,7 +99,7 @@ pub fn do_sync_job(
>  ) -> Result<String, Error> {
>      let job_id = format!(
>          "{}:{}:{}:{}:{}",
> -        sync_job.remote,
> +        sync_job.remote.clone().unwrap_or("localhost".to_string()),

doesn't this break the task access checks? probably need to adapt those and
either use some other format for local jobs, or use some placeholder that cannot
also be a remote name..
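
e.g. (IIRC a plain "-" can never be a valid remote ID, since those can't
start with a dash):

    sync_job.remote.as_deref().unwrap_or("-"),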

>          sync_job.remote_store,
>          sync_job.store,
>          sync_job.ns.clone().unwrap_or_default(),
> @@ -122,11 +132,34 @@ pub fn do_sync_job(
>                      worker,
>                      "sync datastore '{}' from '{}/{}'",
>                      sync_job.store,
> -                    sync_job.remote,
> +                    sync_job.remote.clone().unwrap_or("local".to_string()),

nit: "local" or "localhost" (or possibly even another format entirely, e.g.
"local sync of datastore '..' from '..'")

>                      sync_job.remote_store,
>                  );
>  
> -                pull_store(&worker, &client, pull_params).await?;
> +                if sync_job.remote.is_some() {
> +                    pull_store(&worker, &client, pull_params).await?;
> +                } else {
> +                    match (sync_job.ns, sync_job.remote_ns) {
> +                        (Some(target_ns), Some(source_ns))
> +                            if target_ns.path().starts_with(source_ns.path())
> +                                && sync_job.store == sync_job.remote_store =>
> +                        {
> +                            task_log!(
> +                                worker,
> +                                "Can't sync namespace into one of its sub-namespaces, skipping"
> +                            );
> +                        }
> +                        (_, None) if sync_job.store == sync_job.remote_store => {
> +                            task_log!(
> +                                worker,
> +                                "Can't sync root namespace into same datastore, skipping"
> +                            );
> +                        }
> +                        _ => {
> +                            pull_store(&worker, pull_params).await?;
> +                        }

I think this block is mostly not needed - it's perfectly fine to sync from a
namespace into a sub-namespace or from the root namespace into some other
(non-root) namespace.

there are only two things that are forbidden:
- syncing from one store+namespace combo into the exact same store+namespace
combo locally
- combinations which exceed the max namespace depth (same as for remote syncing,
so if possible handle at the same place)
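
something like this (untested sketch, ideally placed right next to the existing
max-depth checks so both cases are handled in one place):

  if sync_job.remote.is_none()
      && sync_job.store == sync_job.remote_store
      && sync_job.ns.clone().unwrap_or_default() == sync_job.remote_ns.clone().unwrap_or_default()
  {
      bail!("can't sync a namespace into itself on the same datastore");
  }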

> +                    }
> +                }
>  
>                  task_log!(worker, "sync job '{}' end", &job_id);
>  
> @@ -178,6 +211,7 @@ pub fn do_sync_job(
>              },
>              remote: {
>                  schema: REMOTE_ID_SCHEMA,
> +                optional: true,

same here - description should mention what no remote means

>              },
>              "remote-store": {
>                  schema: DATASTORE_SCHEMA,
> @@ -218,7 +252,7 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto
>  async fn pull(
>      store: String,
>      ns: Option<BackupNamespace>,
> -    remote: String,
> +    remote: Option<String>,
>      remote_store: String,
>      remote_ns: Option<BackupNamespace>,
>      remove_vanished: Option<bool>,
> @@ -241,7 +275,7 @@ async fn pull(
>          &auth_id,
>          &store,
>          ns_str.as_deref(),
> -        &remote,
> +        remote.as_deref(),
>          &remote_store,
>          delete,
>      )?;
> @@ -249,7 +283,7 @@ async fn pull(
>      let pull_params = PullParameters::new(
>          &store,
>          ns,
> -        &remote,
> +        remote.as_deref(),
>          &remote_store,
>          remote_ns.unwrap_or_default(),
>          auth_id.clone(),
> @@ -272,7 +306,7 @@ async fn pull(
>                  worker,
>                  "pull datastore '{}' from '{}/{}'",
>                  store,
> -                remote,
> +                remote.as_deref().unwrap_or("localhost"),

same as above, the local part might use a different formatting or at least a
consistent one ;)

>                  remote_store,
>              );
>  
> diff --git a/src/server/email_notifications.rs b/src/server/email_notifications.rs
> index b3298cf9..31a46b0f 100644
> --- a/src/server/email_notifications.rs
> +++ b/src/server/email_notifications.rs
> @@ -486,15 +486,15 @@ pub fn send_sync_status(
>          }
>      };
>  
> +    let source_str = if let Some(remote) = job.remote.clone() {
> +        format!("Sync remote '{}'", remote)
> +    } else {
> +        format!("Sync local")
> +    };
> +
>      let subject = match result {
> -        Ok(()) => format!(
> -            "Sync remote '{}' datastore '{}' successful",
> -            job.remote, job.remote_store,
> -        ),
> -        Err(_) => format!(
> -            "Sync remote '{}' datastore '{}' failed",
> -            job.remote, job.remote_store,
> -        ),
> +        Ok(()) => format!("{} datastore '{}' successful", source_str, job.remote_store,),
> +        Err(_) => format!("{} datastore '{}' failed", source_str, job.remote_store,),
>      };
>  
>      send_job_status_mail(email, &subject, &text)?;
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling
  2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling Hannes Laimer
  2023-02-28 11:25   ` Wolfgang Bumiller
@ 2023-02-28 11:36   ` Fabian Grünbichler
  1 sibling, 0 replies; 12+ messages in thread
From: Fabian Grünbichler @ 2023-02-28 11:36 UTC (permalink / raw)
  To: Proxmox Backup Server development discussion

On February 23, 2023 1:55 pm, Hannes Laimer wrote:
> ... and rewrite pull logic.

that's a bit terse ;)

there's also general refactoring interleaved with the local pull support, it
would be easier to review if those two were split into separate patches.

general remarks (also partly repeated below):
- I don't like the way of persisting the readers in the parameters, it shouldn't
be needed and isn't a nice structuring of the code (see the sketch after this
list)
-- instead, at the point where previously a BackupReader is created (the loop
body in pull_group), you can create the equivalent of PullSource but for the
reader, which in turn
-- contains a BackupReader + RemoteChunkReader for the remote case
-- contains a lock guard + LocalChunkReader for the local case
- there's now a mix of helpers that are moved to the source structs and helpers
that are not, it should be one or the other across the board. adapting the
helpers in-place probably makes it easier to tell what changed, if they all have
the &source as parameter it's then trivial to move them to an impl block on
PullSource at the end and move longer parts to the local or remote impl block as
well (possibly as follow-up to reduce the amount of rebasing you have to do)..
- this patch would really benefit from being split into (at least) two patches
-- refactor the source handling with just the remote part (no semantic changes!)
-- add local pull support
- some of the comments above individual functions are wrong, please check them
carefully and adapt where needed
- please test recursive sync behaviour, the current version looks rather broken
to me (but I haven't done any in-depth testing)
- local reading doesn't lock the source, but it should (for remote this is
handled for us by the HTTP2 reader session)
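
rough sketch of the reader part (names made up, the local variant would
additionally hold whatever guard type the shared snapshot lock returns):

  enum PullReader {
      Remote(Arc<BackupReader>, RemoteChunkReader),
      // + the snapshot read-lock guard for the local case, kept alive as long
      // as the reader is in use
      Local(LocalChunkReader),
  }

constructed once per snapshot in the loop body of pull_group and passed down,
instead of being cached inside the source struct.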

> 
> Signed-off-by: Hannes Laimer <h.laimer@proxmox.com>
> ---
>  pbs-api-types/src/datastore.rs  |    2 +-
>  pbs-datastore/src/read_chunk.rs |    2 +-
>  src/api2/pull.rs                |   13 +-
>  src/server/pull.rs              | 1023 +++++++++++++++++++------------
>  4 files changed, 648 insertions(+), 392 deletions(-)
> 
> diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs
> index 72e8d1ee..9a692b08 100644
> --- a/pbs-api-types/src/datastore.rs
> +++ b/pbs-api-types/src/datastore.rs
> @@ -931,7 +931,7 @@ impl std::str::FromStr for BackupGroup {
>  /// Uniquely identify a Backup (relative to data store)
>  ///
>  /// We also call this a backup snaphost.
> -#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
> +#[derive(Clone, Debug, Eq, Hash, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]

same as the Sync part below..

>  #[serde(rename_all = "kebab-case")]
>  pub struct BackupDir {
>      /// Backup group.
> diff --git a/pbs-datastore/src/read_chunk.rs b/pbs-datastore/src/read_chunk.rs
> index c04a7431..29ee2d4c 100644
> --- a/pbs-datastore/src/read_chunk.rs
> +++ b/pbs-datastore/src/read_chunk.rs
> @@ -14,7 +14,7 @@ pub trait ReadChunk {
>      fn read_chunk(&self, digest: &[u8; 32]) -> Result<Vec<u8>, Error>;
>  }
>  
> -pub trait AsyncReadChunk: Send {
> +pub trait AsyncReadChunk: Send + Sync {

I guess you need this since you moved the BackupReader into PullParams? this
should not be needed..

>      /// Returns the encoded chunk data
>      fn read_raw_chunk<'a>(
>          &'a self,
> diff --git a/src/api2/pull.rs b/src/api2/pull.rs
> index bb8f6fe1..2966190c 100644
> --- a/src/api2/pull.rs
> +++ b/src/api2/pull.rs
> @@ -121,8 +121,8 @@ pub fn do_sync_job(
>              let sync_job2 = sync_job.clone();
>  
>              let worker_future = async move {
> -                let pull_params = PullParameters::try_from(&sync_job)?;
> -                let client = pull_params.client().await?;
> +                let mut pull_params = PullParameters::try_from(&sync_job)?;
> +                pull_params.init_source(sync_job.limit).await?;

see below

>  
>                  task_log!(worker, "Starting datastore sync job '{}'", job_id);
>                  if let Some(event_str) = schedule {
> @@ -137,7 +137,7 @@ pub fn do_sync_job(
>                  );
>  
>                  if sync_job.remote.is_some() {
> -                    pull_store(&worker, &client, pull_params).await?;
> +                    pull_store(&worker, pull_params).await?;
>                  } else {
>                      match (sync_job.ns, sync_job.remote_ns) {
>                          (Some(target_ns), Some(source_ns))
> @@ -280,7 +280,7 @@ async fn pull(
>          delete,
>      )?;
>  
> -    let pull_params = PullParameters::new(
> +    let mut pull_params = PullParameters::new(
>          &store,
>          ns,
>          remote.as_deref(),
> @@ -290,9 +290,8 @@ async fn pull(
>          remove_vanished,
>          max_depth,
>          group_filter,
> -        limit,

see below
>      )?;
> -    let client = pull_params.client().await?;
> +    pull_params.init_source(limit).await?;

see below
>  
>      // fixme: set to_stdout to false?
>      // FIXME: add namespace to worker id?
> @@ -310,7 +309,7 @@ async fn pull(
>                  remote_store,
>              );
>  
> -            let pull_future = pull_store(&worker, &client, pull_params);
> +            let pull_future = pull_store(&worker, pull_params);
>              (select! {
>                  success = pull_future.fuse() => success,
>                  abort = worker.abort_future().map(|_| Err(format_err!("pull aborted"))) => abort,
> diff --git a/src/server/pull.rs b/src/server/pull.rs
> index 65eedf2c..d3be39da 100644
> --- a/src/server/pull.rs
> +++ b/src/server/pull.rs
> @@ -1,28 +1,26 @@
>  //! Sync datastore from remote server
>  
>  use std::collections::{HashMap, HashSet};
> -use std::io::{Seek, SeekFrom};
> +use std::io::{Seek, SeekFrom, Write};
> +use std::path::PathBuf;
>  use std::sync::atomic::{AtomicUsize, Ordering};
>  use std::sync::{Arc, Mutex};
>  use std::time::SystemTime;
>  
>  use anyhow::{bail, format_err, Error};
>  use http::StatusCode;
> -use pbs_config::CachedUserInfo;
> -use serde_json::json;
> -
> +use proxmox_rest_server::WorkerTask;
>  use proxmox_router::HttpError;
>  use proxmox_sys::task_log;
> +use serde_json::json;
>  
>  use pbs_api_types::{
> -    print_store_and_ns, Authid, BackupNamespace, GroupFilter, GroupListItem, NamespaceListItem,
> +    print_store_and_ns, Authid, BackupDir, BackupNamespace, CryptMode, GroupFilter, GroupListItem,
>      Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH,
>      PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP,
>  };
> -
> -use pbs_client::{
> -    BackupReader, BackupRepository, HttpClient, HttpClientOptions, RemoteChunkReader,
> -};
> +use pbs_client::{BackupReader, BackupRepository, HttpClient, RemoteChunkReader};
> +use pbs_config::CachedUserInfo;
>  use pbs_datastore::data_blob::DataBlob;
>  use pbs_datastore::dynamic_index::DynamicIndexReader;
>  use pbs_datastore::fixed_index::FixedIndexReader;
> @@ -30,25 +28,21 @@ use pbs_datastore::index::IndexFile;
>  use pbs_datastore::manifest::{
>      archive_type, ArchiveType, BackupManifest, FileInfo, CLIENT_LOG_BLOB_NAME, MANIFEST_BLOB_NAME,
>  };
> -use pbs_datastore::{check_backup_owner, DataStore, StoreProgress};
> +use pbs_datastore::read_chunk::AsyncReadChunk;
> +use pbs_datastore::{
> +    check_backup_owner, DataStore, ListNamespacesRecursive, LocalChunkReader, StoreProgress,
> +};
>  use pbs_tools::sha::sha256;
> -use proxmox_rest_server::WorkerTask;
>  
> -use crate::backup::{check_ns_modification_privs, check_ns_privs};
> +use crate::backup::{check_ns_modification_privs, check_ns_privs, ListAccessibleBackupGroups};
>  use crate::tools::parallel_handler::ParallelHandler;
>  
>  /// Parameters for a pull operation.
>  pub(crate) struct PullParameters {
> -    /// Remote that is pulled from
> -    remote: Remote,
> -    /// Full specification of remote datastore
> -    source: BackupRepository,
> -    /// Local store that is pulled into
> -    store: Arc<DataStore>,
> -    /// Remote namespace
> -    remote_ns: BackupNamespace,
> -    /// Local namespace (anchor)
> -    ns: BackupNamespace,
> +    /// Where data is pulled from
> +    source: PullSource,
> +    /// Where data should be pulled into
> +    target: PullTarget,
>      /// Owner of synced groups (needs to match local owner of pre-existing groups)
>      owner: Authid,
>      /// Whether to remove groups which exist locally, but not on the remote end
> @@ -57,70 +51,459 @@ pub(crate) struct PullParameters {
>      max_depth: Option<usize>,
>      /// Filters for reducing the pull scope
>      group_filter: Option<Vec<GroupFilter>>,
> -    /// Rate limits for all transfers from `remote`
> -    limit: RateLimitConfig,
> +}
> +
> +pub(crate) enum PullSource {
> +    Remote(RemoteSource),
> +    Local(LocalSource),
> +}
> +
> +pub(crate) struct PullTarget {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct LocalSource {
> +    store: Arc<DataStore>,
> +    ns: BackupNamespace,
> +}
> +
> +pub(crate) struct RemoteSource {
> +    remote: Remote,
> +    repo: BackupRepository,
> +    ns: BackupNamespace,
> +    client: Option<HttpClient>,
> +    backup_reader: HashMap<pbs_api_types::BackupDir, Arc<BackupReader>>,

this is not needed at all - you never pull multiple snapshots in parallel, so
there is no need to store readers for each pulled snapshot.

there also shouldn't be state stored in PullParams at all, it should always be
possible to pass that down from caller to next layer..

> +}
> +
> +impl PullSource {
> +    pub(crate) async fn init(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        match self {
> +            PullSource::Remote(source) => {
> +                source.client.replace(
> +                    crate::api2::config::remote::remote_client(&source.remote, Some(limit)).await?,
> +                );
> +            }
> +            PullSource::Local(_) => {}
> +        };
> +        Ok(())
> +    }

drop this, and keep the old client helper but move it to PullSource::Remote
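
i.e. roughly (sketch, re-using the existing remote_client helper):

  impl RemoteSource {
      async fn client(&self, limit: RateLimitConfig) -> Result<HttpClient, Error> {
          crate::api2::config::remote::remote_client(&self.remote, Some(limit)).await
      }
  }

whether the limit is stored on the source or passed in is a matter of taste,
but the Option<HttpClient> dance and the extra init step go away either way.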

> +
> +    async fn list_namespaces(
> +        &self,
> +        max_depth: &mut Option<usize>,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<BackupNamespace>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => list_remote_namespaces(source, max_depth, worker).await,

this checks permissions (on the remote system)

> +            PullSource::Local(source) => ListNamespacesRecursive::new_max_depth(
> +                source.store.clone(),
> +                source.ns.clone(),
> +                max_depth.unwrap_or(MAX_NAMESPACE_DEPTH),
> +            )?
> +            .collect(),

this doesn't, but should

> +        }
> +    }
> +
> +    async fn list_groups(
> +        &self,
> +        namespace: &BackupNamespace,
> +        owner: &Authid,
> +    ) -> Result<Vec<pbs_api_types::BackupGroup>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!("api2/json/admin/datastore/{}/groups", source.repo.store());
> +
> +                let args = if !namespace.is_root() {
> +                    Some(json!({ "ns": namespace.clone() }))
> +                } else {
> +                    None
> +                };
> +
> +                let client = source.get_client()?;
> +                client.login().await?;

this could just initialize a new client using the helper, instead of retrieving
a stored one that might have an expired ticket..

> +                let mut result = client.get(&path, args).await.map_err(|err| {
> +                    format_err!("Failed to retrieve backup groups from remote - {}", err)
> +                })?;
> +
> +                Ok(
> +                    serde_json::from_value::<Vec<GroupListItem>>(result["data"].take())
> +                        .map_err(|e| Error::from(e))?

could just be .map_err(Error::from), but

> +                        .into_iter()
> +                        .map(|item| item.backup)
> +                        .collect::<Vec<pbs_api_types::BackupGroup>>(),
> +                )

the whole thing is probably easier to read when done like this:

  let list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
  let list: Vec<pbs_api_types::BackupGroup> =
    list.into_iter().map(|group| group.backup).collect();

  Ok(list)

> +            }
> +            PullSource::Local(source) => Ok(ListAccessibleBackupGroups::new_with_privs(
> +                &source.store,
> +                namespace.clone(),
> +                MAX_NAMESPACE_DEPTH,
> +                None,
> +                None,
> +                Some(owner),
> +            )?
> +            .filter_map(Result::ok)
> +            .map(|backup_group| backup_group.group().clone())
> +            .collect::<Vec<pbs_api_types::BackupGroup>>()),

this has two issues:
- it recurses over namespaces, while it should only list groups in the current
namespace without recursion
- it doesn't set the expected privs, so this will potentially list too few or
too many groups as well, even within a namespace where the user is supposed to
have access
-- too few if the user has PRIV_DATASTORE_READ and should be able to read
groups owned by other users/tokens
-- too many if the user only has PRIV_DATASTORE_AUDIT, since owned groups are
then readable despite missing PRIV_DATASTORE_BACKUP
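
to match the remote semantics this should probably look more like the
following (sketch - depth limited to the current namespace and the two priv
parameters filled in; the exact meaning/order of those two parameters needs to
be double-checked against the helper):

  ListAccessibleBackupGroups::new_with_privs(
      &source.store,
      namespace.clone(),
      0, // only this namespace, recursion is handled one level up
      Some(PRIV_DATASTORE_READ),   // read access to any group
      Some(PRIV_DATASTORE_BACKUP), // or owned groups with backup access
      Some(owner),
  )?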

> +        }
> +    }
> +
> +    async fn list_backup_dirs(
> +        &self,
> +        namespace: &BackupNamespace,
> +        group: &pbs_api_types::BackupGroup,
> +        worker: &WorkerTask,
> +    ) -> Result<Vec<pbs_api_types::BackupDir>, Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let path = format!(
> +                    "api2/json/admin/datastore/{}/snapshots",
> +                    source.repo.store()
> +                );
> +
> +                let mut args = json!({
> +                    "backup-type": group.ty,
> +                    "backup-id": group.id,
> +                });
> +
> +                if !source.ns.is_root() {
> +                    args["ns"] = serde_json::to_value(&source.ns)?;
> +                }
> +
> +                let client = source.get_client()?;
> +                client.login().await?;

this could probably also get a fresh client..

> +
> +                let mut result = client.get(&path, Some(args)).await?;
> +                let snapshot_list: Vec<SnapshotListItem> =
> +                    serde_json::from_value(result["data"].take())?;
> +                Ok(snapshot_list
> +                    .into_iter()
> +                    .filter_map(|item: SnapshotListItem| {
> +                        let snapshot = item.backup;
> +                        // in-progress backups can't be synced
> +                        if item.size.is_none() {
> +                            task_log!(
> +                                worker,
> +                                "skipping snapshot {} - in-progress backup",
> +                                snapshot
> +                            );
> +                            return None;
> +                        }
> +
> +                        Some(snapshot)
> +                    })
> +                    .collect::<Vec<BackupDir>>())
> +            }
> +            PullSource::Local(source) => Ok(source
> +                .store
> +                .backup_group(namespace.clone(), group.clone())
> +                .iter_snapshots()?
> +                .filter_map(Result::ok)

this hides errors when iterating..

> +                .map(|snapshot| snapshot.dir().to_owned())

but doesn't skip "in-progress" snapshots like the remote version does..

> +                .collect::<Vec<BackupDir>>()),
> +        }
> +    }
> +
> +    /// Load file from source namespace and BackupDir into file
> +    async fn load_file_into(
> +        &mut self,
> +        namespace: &BackupNamespace,
> +        snapshot: &pbs_api_types::BackupDir,
> +        filename: &str,
> +        into: &PathBuf,
> +        worker: &WorkerTask,
> +    ) -> Result<Option<DataBlob>, Error> {
> +        let mut tmp_file = std::fs::OpenOptions::new()
> +            .write(true)
> +            .create(true)
> +            .truncate(true)
> +            .read(true)
> +            .open(into)?;
> +        match self {
> +            PullSource::Remote(ref mut source) => {
> +                let client = source.get_client()?;
> +                client.login().await?;
> +
> +                let reader = if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    reader.clone()
> +                } else {
> +                    let backup_reader = BackupReader::start(
> +                        client,
> +                        None,
> +                        source.repo.store(),
> +                        namespace,
> +                        snapshot,
> +                        true,
> +                    )
> +                    .await?;
> +                    source
> +                        .backup_reader
> +                        .insert(snapshot.clone(), backup_reader.clone());
> +                    backup_reader
> +                };
> +
> +                let download_result = reader.download(filename, &mut tmp_file).await;
> +
> +                if let Err(err) = download_result {
> +                    match err.downcast_ref::<HttpError>() {
> +                        Some(HttpError { code, message }) => match *code {
> +                            StatusCode::NOT_FOUND => {
> +                                task_log!(
> +                                    worker,
> +                                    "skipping snapshot {} - vanished since start of sync",

this was previously only logged when the manifest went missing.. when
downloading the indices later on this shouldn't happen since we have an active
reader session open which holds a lock, so nobody should be able to pull out the
snapshot under us and it should be a hard error..

> +                                    snapshot,
> +                                );
> +                                return Ok(None);
> +                            }
> +                            _ => {
> +                                bail!("HTTP error {code} - {message}");
> +                            }
> +                        },
> +                        None => {
> +                            return Err(err);
> +                        }
> +                    };
> +                };
> +            }
> +            PullSource::Local(source) => {
> +                let dir = source
> +                    .store
> +                    .backup_dir(namespace.clone(), snapshot.clone())?;
> +                let mut from_path = dir.full_path();
> +                from_path.push(filename);
> +                tmp_file.write_all(std::fs::read(from_path)?.as_slice())?;
> +            }
> +        }
> +
> +        tmp_file.seek(SeekFrom::Start(0))?;
> +        Ok(DataBlob::load_from_reader(&mut tmp_file).ok())

hides errors!!

note that this seeks to the beginning and loads the blob (see [0] markers)

> +    }
> +
> +    // Note: The client.log.blob is uploaded after the backup, so it is
> +    // not mentioned in the manifest.
> +    async fn try_download_client_log(
> +        &self,
> +        from_snapshot: &pbs_api_types::BackupDir,

there is only a single snapshot involved

> +        to_path: &std::path::Path,

and a single path, so not sure whether we really need the from/to prefix?

> +        worker: &WorkerTask,

worker should come first..

> +    ) -> Result<(), Error> {
> +        match &self {
> +            PullSource::Remote(source) => {
> +                let reader = source
> +                    .backup_reader
> +                    .get(from_snapshot)
> +                    .ok_or(format_err!("Can't download chunks without a BackupReader"))?;
> +                let mut tmp_path = to_path.to_owned();
> +                tmp_path.set_extension("tmp");
> +
> +                let tmpfile = std::fs::OpenOptions::new()
> +                    .write(true)
> +                    .create(true)
> +                    .read(true)
> +                    .open(&tmp_path)?;
> +
> +                // Note: be silent if there is no log - only log successful download
> +                if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> +                    if let Err(err) = std::fs::rename(&tmp_path, to_path) {
> +                        bail!("Atomic rename file {:?} failed - {}", to_path, err);
> +                    }
> +                    task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> +                }
> +
> +                Ok(())

this should probably return the tmpfile, so that we only open it once..

> +            }
> +            PullSource::Local(_) => Ok(()),

local sync should also copy the log? also, similar to other parts - this is
mostly the old code refactored, but also slightly changed. it would have been
nice to have the "refactor" part first (with only a single match arm), and the
add local part second.

> +        }
> +    }
> +
> +    fn get_chunk_reader(
> +        &self,
> +        snapshot: &pbs_api_types::BackupDir,
> +        crypt_mode: CryptMode,
> +    ) -> Result<Arc<dyn AsyncReadChunk>, Error> {
> +        Ok(match &self {
> +            PullSource::Remote(source) => {
> +                if let Some(reader) = source.backup_reader.get(snapshot) {
> +                    Arc::new(RemoteChunkReader::new(
> +                        reader.clone(),
> +                        None,
> +                        crypt_mode,
> +                        HashMap::new(),
> +                    ))
> +                } else {
> +                    bail!("No initialized BackupReader!")
> +                }
> +            }
> +            PullSource::Local(source) => Arc::new(LocalChunkReader::new(
> +                source.store.clone(),
> +                None,
> +                crypt_mode,
> +            )),
> +        })
> +    }

shouldn't be needed - the reader should be passed down on its own.. a helper for
the first time the source needs to be converted to a reader might still be a
good idea, but without persisting inside the source..

> +
> +    fn get_ns(&self) -> BackupNamespace {
> +        match &self {
> +            PullSource::Remote(source) => source.ns.clone(),
> +            PullSource::Local(source) => source.ns.clone(),
> +        }
> +    }
> +
> +    fn print_store_and_ns(&self) -> String {
> +        match &self {
> +            PullSource::Remote(source) => print_store_and_ns(source.repo.store(), &source.ns),
> +            PullSource::Local(source) => print_store_and_ns(source.store.name(), &source.ns),
> +        }
> +    }
> +}
> +
> +impl RemoteSource {
> +    fn get_client(&self) -> Result<&HttpClient, Error> {
> +        if let Some(client) = &self.client {
> +            Ok(client)
> +        } else {
> +            bail!("RemoteSource not initialized")
> +        }
> +    }

should instead return a fresh client..

>  }
>  
>  impl PullParameters {
>      /// Creates a new instance of `PullParameters`.
> -    ///
> -    /// `remote` will be dereferenced via [pbs_api_types::RemoteConfig], and combined into a
> -    /// [BackupRepository] with `remote_store`.
> -    #[allow(clippy::too_many_arguments)]
>      pub(crate) fn new(
>          store: &str,
>          ns: BackupNamespace,
> -        remote: &str,
> +        remote: Option<&str>,
>          remote_store: &str,
>          remote_ns: BackupNamespace,
>          owner: Authid,
>          remove_vanished: Option<bool>,
>          max_depth: Option<usize>,
>          group_filter: Option<Vec<GroupFilter>>,
> -        limit: RateLimitConfig,

should either move to RemoteSource, or be implemented for both variants and stay here..

>      ) -> Result<Self, Error> {
> -        let store = DataStore::lookup_datastore(store, Some(Operation::Write))?;
> -
>          if let Some(max_depth) = max_depth {
>              ns.check_max_depth(max_depth)?;
>              remote_ns.check_max_depth(max_depth)?;
> -        }
> +        };
> +        let remove_vanished = remove_vanished.unwrap_or(false);
>  
> -        let (remote_config, _digest) = pbs_config::remote::config()?;
> -        let remote: Remote = remote_config.lookup("remote", remote)?;
> +        let source: PullSource = if let Some(remote) = remote {
> +            let (remote_config, _digest) = pbs_config::remote::config()?;
> +            let remote: Remote = remote_config.lookup("remote", remote)?;
>  
> -        let remove_vanished = remove_vanished.unwrap_or(false);
> +            let repo = BackupRepository::new(
> +                Some(remote.config.auth_id.clone()),
> +                Some(remote.config.host.clone()),
> +                remote.config.port,
> +                remote_store.to_string(),
> +            );
>  
> -        let source = BackupRepository::new(
> -            Some(remote.config.auth_id.clone()),
> -            Some(remote.config.host.clone()),
> -            remote.config.port,
> -            remote_store.to_string(),
> -        );
> +            PullSource::Remote(RemoteSource {
> +                remote,
> +                repo,
> +                ns: remote_ns.clone(),
> +                client: None,
> +                backup_reader: HashMap::new(),
> +            })
> +        } else {
> +            PullSource::Local(LocalSource {
> +                store: DataStore::lookup_datastore(remote_store, Some(Operation::Read))?,
> +                ns: remote_ns,
> +            })
> +        };
> +        let target = PullTarget {
> +            store: DataStore::lookup_datastore(store, Some(Operation::Write))?,
> +            ns,
> +        };
>  
>          Ok(Self {
> -            remote,
> -            remote_ns,
> -            ns,
>              source,
> -            store,
> +            target,
>              owner,
>              remove_vanished,
>              max_depth,
>              group_filter,
> -            limit,
>          })
>      }
>  
> -    /// Creates a new [HttpClient] for accessing the [Remote] that is pulled from.
> -    pub async fn client(&self) -> Result<HttpClient, Error> {
> -        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
> +    pub(crate) async fn init_source(&mut self, limit: RateLimitConfig) -> Result<(), Error> {
> +        self.source.init(limit).await
> +    }

see other related comments

> +
> +    pub(crate) fn skip_chunk_sync(&self) -> bool {
> +        match &self.source {
> +            PullSource::Local(source) => source.store.name() == self.target.store.name(),
> +            PullSource::Remote(_) => false,
> +        }
> +    }
> +
> +    pub(crate) fn get_target_ns(&self) -> Result<BackupNamespace, Error> {
> +        let source_ns = self.source.get_ns();
> +        source_ns.map_prefix(&source_ns, &self.target.ns)

this doesn't do the right thing, see the two call sites..

>      }
>  }
>  
> +async fn list_remote_namespaces(
> +    source: &RemoteSource,
> +    max_depth: &mut Option<usize>,
> +    worker: &WorkerTask,

worker usually comes first if passed as argument.. but it's only passed in for
the two log statements, which are actually possible to handle at the call site
as well (there's a check there for whether max_depth was modified).

> +) -> Result<Vec<BackupNamespace>, Error> {
> +    if source.ns.is_root() && max_depth.map_or(false, |depth| depth == 0) {
> +        vec![source.ns.clone()];

missing 'return' I think? it also changes the check for some reason; although
they are semantically the same, I am not sure it's worth the churn in an
already quite crowded patch (series).

> +    }
> +
> +    let path = format!(
> +        "api2/json/admin/datastore/{}/namespace",
> +        source.repo.store()
> +    );
> +    let mut data = json!({});
> +    if let Some(max_depth) = max_depth {
> +        data["max-depth"] = json!(max_depth);
> +    }
> +
> +    if !source.ns.is_root() {
> +        data["parent"] = json!(source.ns);
> +    }
> +
> +    let client = source.get_client()?;
> +    client.login().await?;
> +
> +    let mut result = match client.get(&path, Some(data)).await {
> +        Ok(res) => res,
> +        Err(err) => match err.downcast_ref::<HttpError>() {
> +            Some(HttpError { code, message }) => match code {
> +                &StatusCode::NOT_FOUND => {
> +                    if source.ns.is_root() && max_depth.is_none() {
> +                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> +                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");

see above, this could be logged at the call site to avoid passing in worker at
all. or it could remain here, but then please put the worker first in the
argument list ;)

> +                        max_depth.replace(0);
> +                    } else {
> +                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> +                    }
> +
> +                    return Ok(vec![source.ns.clone()]);
> +                }
> +                _ => {
> +                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> +                }
> +            },
> +            None => {
> +                bail!("Querying namespaces failed - {err}");
> +            }
> +        },
> +    };
> +
> +    let list: Vec<pbs_api_types::BackupNamespace> =
> +        serde_json::from_value::<Vec<pbs_api_types::NamespaceListItem>>(result["data"].take())?
> +            .iter()
> +            .map(|list_item| list_item.ns.clone())
> +            .collect();
> +
> +    Ok(list)
> +}

this is one of the examples that would have really benefited from splitting this
into a "no-change refactor PullParams with RemoteSource" patch and an "add local
pull support" patch.. this is basically the old query_namespaces with parameter
adjustments, but then *also* other changes. now because the location also
completely changed, I have to manually diff the two to see what actually changed
(or if semantic changes are hidden in the noise).

> +
>  async fn pull_index_chunks<I: IndexFile>(
>      worker: &WorkerTask,
> -    chunk_reader: RemoteChunkReader,
> +    chunk_reader: Arc<dyn AsyncReadChunk>,
>      target: Arc<DataStore>,
>      index: I,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
> @@ -211,26 +594,6 @@ async fn pull_index_chunks<I: IndexFile>(
>      Ok(())
>  }
>  
> -async fn download_manifest(
> -    reader: &BackupReader,
> -    filename: &std::path::Path,
> -) -> Result<std::fs::File, Error> {
> -    let mut tmp_manifest_file = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .truncate(true)
> -        .read(true)
> -        .open(filename)?;
> -
> -    reader
> -        .download(MANIFEST_BLOB_NAME, &mut tmp_manifest_file)
> -        .await?;
> -
> -    tmp_manifest_file.seek(SeekFrom::Start(0))?;
> -
> -    Ok(tmp_manifest_file)
> -}
> -
>  fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Error> {
>      if size != info.size {
>          bail!(
> @@ -251,21 +614,21 @@ fn verify_archive(info: &FileInfo, csum: &[u8; 32], size: u64) -> Result<(), Err
>  /// Pulls a single file referenced by a manifest.
>  ///
>  /// Pulling an archive consists of the following steps:
> -/// - Create tmp file for archive
> -/// - Download archive file into tmp file
> +/// - Load archive file into tmp file
>  /// - Verify tmp file checksum
>  /// - if archive is an index, pull referenced chunks
>  /// - Rename tmp file into real path
>  async fn pull_single_archive(
>      worker: &WorkerTask,
> -    reader: &BackupReader,
> -    chunk_reader: &mut RemoteChunkReader,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    from_namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      archive_info: &FileInfo,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
>      let archive_name = &archive_info.filename;
> -    let mut path = snapshot.full_path();
> +    let mut path = to_snapshot.full_path();
>      path.push(archive_name);
>  
>      let mut tmp_path = path.clone();
> @@ -273,13 +636,18 @@ async fn pull_single_archive(
>  
>      task_log!(worker, "sync archive {}", archive_name);
>  
> -    let mut tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> +    params
> +        .source
> +        .load_file_into(
> +            from_namespace,
> +            from_snapshot,
> +            archive_name,
> +            &tmp_path,
> +            worker,
> +        )
> +        .await?;

[0] this returns the blob, but it's not used..

>  
> -    reader.download(archive_name, &mut tmpfile).await?;
> +    let mut tmpfile = std::fs::OpenOptions::new().read(true).open(&tmp_path)?;

couldn't load_file_into just return the open file?

>  
>      match archive_type(archive_name)? {
>          ArchiveType::DynamicIndex => {
> @@ -289,14 +657,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

typo; also, I already asked when reviewing v1 whether it wouldn't make sense to
check that the chunks are actually there - it's basically "only" a series of
'stat' calls.
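
something like this (rough sketch - IIRC there is a stat_chunk helper on the
datastore, otherwise a plain stat on the chunk path does the same) would keep
the fast path but still catch missing chunks in the shared store:

  for pos in 0..index.index_count() {
      let digest = index.index_digest(pos).unwrap();
      // stat only, no read/verify - we just want to know the chunk exists
      params.target.store.stat_chunk(digest)?;
  }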

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::FixedIndex => {
>              let index = FixedIndexReader::new(tmpfile).map_err(|err| {
> @@ -305,14 +679,20 @@ async fn pull_single_archive(
>              let (csum, size) = index.compute_csum();
>              verify_archive(archive_info, &csum, size)?;
>  
> -            pull_index_chunks(
> -                worker,
> -                chunk_reader.clone(),
> -                snapshot.datastore().clone(),
> -                index,
> -                downloaded_chunks,
> -            )
> -            .await?;
> +            if params.skip_chunk_sync() {
> +                task_log!(worker, "skipping chunk sync for same datatsore");

same as above for dynamic indices..

> +            } else {
> +                pull_index_chunks(
> +                    worker,
> +                    params
> +                        .source
> +                        .get_chunk_reader(from_snapshot, archive_info.crypt_mode)?,
> +                    params.target.store.clone(),
> +                    index,
> +                    downloaded_chunks,
> +                )
> +                .await?;
> +            }
>          }
>          ArchiveType::Blob => {
>              tmpfile.seek(SeekFrom::Start(0))?;

[0] so here we *again* seek to the start, which load_file_into already does for
us *if* we skip the blob loading there..

> @@ -326,33 +706,6 @@ async fn pull_single_archive(
>      Ok(())
>  }
>  
> -// Note: The client.log.blob is uploaded after the backup, so it is
> -// not mentioned in the manifest.
> -async fn try_client_log_download(
> -    worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    path: &std::path::Path,
> -) -> Result<(), Error> {
> -    let mut tmp_path = path.to_owned();
> -    tmp_path.set_extension("tmp");
> -
> -    let tmpfile = std::fs::OpenOptions::new()
> -        .write(true)
> -        .create(true)
> -        .read(true)
> -        .open(&tmp_path)?;
> -
> -    // Note: be silent if there is no log - only log successful download
> -    if let Ok(()) = reader.download(CLIENT_LOG_BLOB_NAME, tmpfile).await {
> -        if let Err(err) = std::fs::rename(&tmp_path, path) {
> -            bail!("Atomic rename file {:?} failed - {}", path, err);
> -        }
> -        task_log!(worker, "got backup log file {:?}", CLIENT_LOG_BLOB_NAME);
> -    }
> -
> -    Ok(())
> -}
> -
>  /// Actual implementation of pulling a snapshot.
>  ///
>  /// Pulling a snapshot consists of the following steps:
> @@ -364,44 +717,37 @@ async fn try_client_log_download(
>  /// - Download log if not already existing
>  async fn pull_snapshot(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let mut manifest_name = snapshot.full_path();
> +    let mut manifest_name = to_snapshot.full_path();
>      manifest_name.push(MANIFEST_BLOB_NAME);
>  
> -    let mut client_log_name = snapshot.full_path();
> +    let mut client_log_name = to_snapshot.full_path();
>      client_log_name.push(CLIENT_LOG_BLOB_NAME);
>  
>      let mut tmp_manifest_name = manifest_name.clone();
>      tmp_manifest_name.set_extension("tmp");
>  
> -    let download_res = download_manifest(&reader, &tmp_manifest_name).await;
> -    let mut tmp_manifest_file = match download_res {
> -        Ok(manifest_file) => manifest_file,
> -        Err(err) => {
> -            match err.downcast_ref::<HttpError>() {
> -                Some(HttpError { code, message }) => match *code {
> -                    StatusCode::NOT_FOUND => {
> -                        task_log!(
> -                            worker,
> -                            "skipping snapshot {} - vanished since start of sync",
> -                            snapshot.dir(),
> -                        );
> -                        return Ok(());
> -                    }
> -                    _ => {
> -                        bail!("HTTP error {code} - {message}");
> -                    }
> -                },
> -                None => {
> -                    return Err(err);
> -                }
> -            };
> -        }
> -    };
> -    let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;
> +    let tmp_manifest_blob;
> +    if let Some(data) = params
> +        .source
> +        .load_file_into(
> +            namespace,
> +            from_snapshot,
> +            MANIFEST_BLOB_NAME,
> +            &tmp_manifest_name,
> +            worker,
> +        )
> +        .await?
> +    {
> +        tmp_manifest_blob = data;

[0] so this is the only part that actually uses the parsed blob, it might make
sense to only do that parsing here..

> +    } else {
> +        return Ok(());

even further hides the wrong error handling from load_file_into..
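
i.e. keep the NOT_FOUND/skip handling where it is actually needed (the
manifest) and do the blob parsing only at this call site, roughly (sketch, with
load_file_into returning the rewound std::fs::File and propagating errors
instead of swallowing them with .ok()):

  let mut tmp_manifest_file = params
      .source
      .load_file_into(namespace, from_snapshot, MANIFEST_BLOB_NAME, &tmp_manifest_name, worker)
      .await?;
  let tmp_manifest_blob = DataBlob::load_from_reader(&mut tmp_manifest_file)?;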

> +    }
>  
>      if manifest_name.exists() {
>          let manifest_blob = proxmox_lang::try_block!({
> @@ -418,8 +764,11 @@ async fn pull_snapshot(
>  
>          if manifest_blob.raw_data() == tmp_manifest_blob.raw_data() {
>              if !client_log_name.exists() {
> -                try_client_log_download(worker, reader, &client_log_name).await?;
> -            }
> +                params
> +                    .source
> +                    .try_download_client_log(from_snapshot, &client_log_name, worker)
> +                    .await?;
> +            };
>              task_log!(worker, "no data changes");
>              let _ = std::fs::remove_file(&tmp_manifest_name);
>              return Ok(()); // nothing changed
> @@ -429,7 +778,7 @@ async fn pull_snapshot(
>      let manifest = BackupManifest::try_from(tmp_manifest_blob)?;
>  
>      for item in manifest.files() {
> -        let mut path = snapshot.full_path();
> +        let mut path = to_snapshot.full_path();
>          path.push(&item.filename);
>  
>          if path.exists() {
> @@ -467,18 +816,12 @@ async fn pull_snapshot(
>              }
>          }
>  
> -        let mut chunk_reader = RemoteChunkReader::new(
> -            reader.clone(),
> -            None,
> -            item.chunk_crypt_mode(),
> -            HashMap::new(),
> -        );
> -
>          pull_single_archive(
>              worker,
> -            &reader,
> -            &mut chunk_reader,
> -            snapshot,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
>              item,
>              downloaded_chunks.clone(),
>          )
> @@ -490,10 +833,12 @@ async fn pull_snapshot(
>      }
>  
>      if !client_log_name.exists() {
> -        try_client_log_download(worker, reader, &client_log_name).await?;
> -    }
> -
> -    snapshot
> +        params
> +            .source
> +            .try_download_client_log(from_snapshot, &client_log_name, worker)
> +            .await?;
> +    };

nit: stray ';'

> +    to_snapshot
>          .cleanup_unreferenced_files(&manifest)
>          .map_err(|err| format_err!("failed to cleanup unreferenced files - {err}"))?;
>  
> @@ -501,37 +846,53 @@ async fn pull_snapshot(
>  }
>  
>  /// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case.
> -///
> -/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is
> -/// pointing to the local datastore and target namespace.

please at least describe what `namespace` is referring to here..
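
e.g. something like:

  /// `namespace` is the namespace on the *source* side that the snapshot is read from.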

>  async fn pull_snapshot_from(
>      worker: &WorkerTask,
> -    reader: Arc<BackupReader>,
> -    snapshot: &pbs_datastore::BackupDir,
> +    params: &mut PullParameters,
> +    namespace: &BackupNamespace,
> +    from_snapshot: &pbs_api_types::BackupDir,
> +    to_snapshot: &pbs_datastore::BackupDir,
>      downloaded_chunks: Arc<Mutex<HashSet<[u8; 32]>>>,
>  ) -> Result<(), Error> {
> -    let (_path, is_new, _snap_lock) = snapshot
> +    let (_path, is_new, _snap_lock) = to_snapshot
>          .datastore()
> -        .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?;
> +        .create_locked_backup_dir(to_snapshot.backup_ns(), to_snapshot.as_ref())?;
>  
>      if is_new {
> -        task_log!(worker, "sync snapshot {}", snapshot.dir());
> +        task_log!(worker, "sync snapshot {}", to_snapshot.dir());
>  
> -        if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await {
> -            if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir(
> -                snapshot.backup_ns(),
> -                snapshot.as_ref(),
> +        if let Err(err) = pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await
> +        {
> +            if let Err(cleanup_err) = to_snapshot.datastore().remove_backup_dir(
> +                to_snapshot.backup_ns(),
> +                to_snapshot.as_ref(),
>                  true,
>              ) {
>                  task_log!(worker, "cleanup error - {}", cleanup_err);
>              }
>              return Err(err);
>          }
> -        task_log!(worker, "sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "sync snapshot {} done", to_snapshot.dir());
>      } else {
> -        task_log!(worker, "re-sync snapshot {}", snapshot.dir());
> -        pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?;
> -        task_log!(worker, "re-sync snapshot {} done", snapshot.dir());
> +        task_log!(worker, "re-sync snapshot {}", to_snapshot.dir());
> +        pull_snapshot(
> +            worker,
> +            params,
> +            namespace,
> +            from_snapshot,
> +            to_snapshot,
> +            downloaded_chunks,
> +        )
> +        .await?;
> +        task_log!(worker, "re-sync snapshot {} done", to_snapshot.dir());
>      }
>  
>      Ok(())
> @@ -587,7 +948,6 @@ impl std::fmt::Display for SkipInfo {
>  /// - Sort by snapshot time
>  /// - Get last snapshot timestamp on local datastore
>  /// - Iterate over list of snapshots
> -/// -- Recreate client/BackupReader

should still be done.

>  /// -- pull snapshot, unless it's not finished yet or older than last local snapshot
>  /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote
>  ///
> @@ -600,101 +960,63 @@ impl std::fmt::Display for SkipInfo {
>  /// - local group owner is already checked by pull_store
>  async fn pull_group(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> +    params: &mut PullParameters,

should not be done ;)

> +    source_namespace: &BackupNamespace,

in general: inconsistent naming with regards to source/remote/local and from/to.
it would be good to be consistent at least internally, even if the config/api
parameters are a bit "weird" for backwards compat reasons for now..

>      group: &pbs_api_types::BackupGroup,
> -    remote_ns: BackupNamespace,
>      progress: &mut StoreProgress,
>  ) -> Result<(), Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/snapshots",
> -        params.source.store()
> -    );
> -
> -    let mut args = json!({
> -        "backup-type": group.ty,
> -        "backup-id": group.id,
> -    });
> -
> -    if !remote_ns.is_root() {
> -        args["ns"] = serde_json::to_value(&remote_ns)?;
> -    }
> -
> -    let target_ns = remote_ns.map_prefix(&params.remote_ns, &params.ns)?;

this involves three namespaces:
- the remote anchor
- the local anchor
- the current namespace being pulled

> -
> -    let mut result = client.get(&path, Some(args)).await?;
> -    let mut list: Vec<SnapshotListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    list.sort_unstable_by(|a, b| a.backup.time.cmp(&b.backup.time));
> -
> -    client.login().await?; // make sure auth is complete
> -
> -    let fingerprint = client.fingerprint();
> -
> -    let last_sync = params.store.last_successful_backup(&target_ns, group)?;
> -
> -    let mut remote_snapshots = std::collections::HashSet::new();
> -
> -    // start with 65536 chunks (up to 256 GiB)
> -    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
> -
> -    progress.group_snapshots = list.len() as u64;
> +    let target_ns = params.get_target_ns()?;

this is wrong, since it only involves two namespaces (the two anchors).
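
i.e. the currently pulled source namespace still needs to be mapped like
before, roughly (sketch with the new field names):

  let target_ns = source_namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;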

>  
> +    let mut source_snapshots = HashSet::new();
> +    let last_sync = params
> +        .target
> +        .store
> +        .last_successful_backup(&target_ns, group)?;
>      let mut skip_info = SkipInfo {
>          oldest: i64::MAX,
>          newest: i64::MIN,
>          count: 0,
>      };
>  
> -    for (pos, item) in list.into_iter().enumerate() {
> -        let snapshot = item.backup;
> -
> -        // in-progress backups can't be synced
> -        if item.size.is_none() {
> -            task_log!(
> -                worker,
> -                "skipping snapshot {} - in-progress backup",
> -                snapshot
> -            );
> -            continue;
> -        }
> +    let mut list: Vec<BackupDir> = params
> +        .source
> +        .list_backup_dirs(source_namespace, group, worker)
> +        .await?
> +        .into_iter()
> +        .filter(|dir| {
> +            source_snapshots.insert(dir.time);
> +            if let Some(last_sync_time) = last_sync {
> +                if last_sync_time > dir.time {
> +                    skip_info.update(dir.time);
> +                    return false;
> +                }
> +            }
> +            true
> +        })
> +        .collect();
>  
> -        remote_snapshots.insert(snapshot.time);
> +    list.sort_unstable_by(|a, b| a.time.cmp(&b.time));
>  
> -        if let Some(last_sync_time) = last_sync {
> -            if last_sync_time > snapshot.time {
> -                skip_info.update(snapshot.time);
> -                continue;
> -            }
> -        }
> +    // start with 65536 chunks (up to 256 GiB)
> +    let downloaded_chunks = Arc::new(Mutex::new(HashSet::with_capacity(1024 * 64)));
>  
> -        // get updated auth_info (new tickets)
> -        let auth_info = client.login().await?;
> -
> -        let options =
> -            HttpClientOptions::new_non_interactive(auth_info.ticket.clone(), fingerprint.clone())
> -                .rate_limit(params.limit.clone());
> -
> -        let new_client = HttpClient::new(
> -            params.source.host(),
> -            params.source.port(),
> -            params.source.auth_id(),
> -            options,
> -        )?;
> -
> -        let reader = BackupReader::start(
> -            new_client,
> -            None,
> -            params.source.store(),
> -            &remote_ns,
> -            &snapshot,
> -            true,
> -        )
> -        .await?;
> +    progress.group_snapshots = list.len() as u64;
>  
> -        let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?;
> +    for (pos, from_snapshot) in list.into_iter().enumerate() {
> +        let to_snapshot = params
> +            .target
> +            .store
> +            .backup_dir(params.target.ns.clone(), from_snapshot.clone())?;
>  
> -        let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await;
> +        let result = pull_snapshot_from(
> +            worker,
> +            params,
> +            source_namespace,
> +            &from_snapshot,
> +            &to_snapshot,
> +            downloaded_chunks.clone(),
> +        )
> +        .await;
>  
>          progress.done_snapshots = pos as u64 + 1;
>          task_log!(worker, "percentage done: {}", progress);
> @@ -703,11 +1025,14 @@ async fn pull_group(
>      }
>  
>      if params.remove_vanished {
> -        let group = params.store.backup_group(target_ns.clone(), group.clone());
> +        let group = params
> +            .target
> +            .store
> +            .backup_group(target_ns.clone(), group.clone());
>          let local_list = group.list_backups()?;
>          for info in local_list {
>              let snapshot = info.backup_dir;
> -            if remote_snapshots.contains(&snapshot.backup_time()) {
> +            if source_snapshots.contains(&snapshot.backup_time()) {
>                  continue;
>              }
>              if snapshot.is_protected() {
> @@ -720,6 +1045,7 @@ async fn pull_group(
>              }
>              task_log!(worker, "delete vanished snapshot {}", snapshot.dir());
>              params
> +                .target
>                  .store
>                  .remove_backup_dir(&target_ns, snapshot.as_ref(), false)?;
>          }
> @@ -732,64 +1058,12 @@ async fn pull_group(
>      Ok(())
>  }
>  
> -// will modify params if switching to backwards mode for lack of NS support on remote end
> -async fn query_namespaces(
> -    worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &mut PullParameters,
> -) -> Result<Vec<BackupNamespace>, Error> {
> -    let path = format!(
> -        "api2/json/admin/datastore/{}/namespace",
> -        params.source.store()
> -    );
> -    let mut data = json!({});
> -    if let Some(max_depth) = params.max_depth {
> -        data["max-depth"] = json!(max_depth);
> -    }
> -
> -    if !params.remote_ns.is_root() {
> -        data["parent"] = json!(params.remote_ns);
> -    }
> -
> -    let mut result = match client.get(&path, Some(data)).await {
> -        Ok(res) => res,
> -        Err(err) => match err.downcast_ref::<HttpError>() {
> -            Some(HttpError { code, message }) => match *code {
> -                StatusCode::NOT_FOUND => {
> -                    if params.remote_ns.is_root() && params.max_depth.is_none() {
> -                        task_log!(worker, "Could not query remote for namespaces (404) -> temporarily switching to backwards-compat mode");
> -                        task_log!(worker, "Either make backwards-compat mode explicit (max-depth == 0) or upgrade remote system.");
> -                        params.max_depth = Some(0);
> -                    } else {
> -                        bail!("Remote namespace set/recursive sync requested, but remote does not support namespaces.")
> -                    }
> -
> -                    return Ok(vec![params.remote_ns.clone()]);
> -                }
> -                _ => {
> -                    bail!("Querying namespaces failed - HTTP error {code} - {message}");
> -                }
> -            },
> -            None => {
> -                bail!("Querying namespaces failed - {err}");
> -            }
> -        },
> -    };
> -
> -    let mut list: Vec<NamespaceListItem> = serde_json::from_value(result["data"].take())?;
> -
> -    // parents first
> -    list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len()));
> -
> -    Ok(list.iter().map(|item| item.ns.clone()).collect())
> -}
> -
>  fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<bool, Error> {
>      let mut created = false;
> -    let store_ns_str = print_store_and_ns(params.store.name(), ns);
> +    let store_ns_str = print_store_and_ns(params.target.store.name(), ns);
>  
> -    if !ns.is_root() && !params.store.namespace_path(ns).exists() {
> -        check_ns_modification_privs(params.store.name(), ns, &params.owner)
> +    if !ns.is_root() && !params.target.store.namespace_path(ns).exists() {
> +        check_ns_modification_privs(params.target.store.name(), ns, &params.owner)
>              .map_err(|err| format_err!("Creating {ns} not allowed - {err}"))?;
>  
>          let name = match ns.components().last() {
> @@ -799,14 +1073,14 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>              }
>          };
>  
> -        if let Err(err) = params.store.create_namespace(&ns.parent(), name) {
> +        if let Err(err) = params.target.store.create_namespace(&ns.parent(), name) {
>              bail!("sync into {store_ns_str} failed - namespace creation failed: {err}");
>          }
>          created = true;
>      }
>  
>      check_ns_privs(
> -        params.store.name(),
> +        params.target.store.name(),
>          ns,
>          &params.owner,
>          PRIV_DATASTORE_BACKUP,
> @@ -817,10 +1091,13 @@ fn check_and_create_ns(params: &PullParameters, ns: &BackupNamespace) -> Result<
>  }
>  
>  fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result<bool, Error> {
> -    check_ns_modification_privs(params.store.name(), local_ns, &params.owner)
> +    check_ns_modification_privs(params.target.store.name(), local_ns, &params.owner)
>          .map_err(|err| format_err!("Removing {local_ns} not allowed - {err}"))?;
>  
> -    params.store.remove_namespace_recursive(local_ns, true)
> +    params
> +        .target
> +        .store
> +        .remove_namespace_recursive(local_ns, true)
>  }
>  
>  fn check_and_remove_vanished_ns(
> @@ -834,14 +1111,15 @@ fn check_and_remove_vanished_ns(
>      // clamp like remote does so that we don't list more than we can ever have synced.
>      let max_depth = params
>          .max_depth
> -        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.remote_ns.depth());
> +        .unwrap_or_else(|| MAX_NAMESPACE_DEPTH - params.source.get_ns().depth());
>  
>      let mut local_ns_list: Vec<BackupNamespace> = params
> +        .target
>          .store
> -        .recursive_iter_backup_ns_ok(params.ns.clone(), Some(max_depth))?
> +        .recursive_iter_backup_ns_ok(params.target.ns.clone(), Some(max_depth))?
>          .filter(|ns| {
>              let user_privs =
> -                user_info.lookup_privs(&params.owner, &ns.acl_path(params.store.name()));
> +                user_info.lookup_privs(&params.owner, &ns.acl_path(params.target.store.name()));
>              user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0
>          })
>          .collect();
> @@ -850,7 +1128,7 @@ fn check_and_remove_vanished_ns(
>      local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len()));
>  
>      for local_ns in local_ns_list {
> -        if local_ns == params.ns {
> +        if local_ns == params.target.ns {
>              continue;
>          }
>  
> @@ -897,29 +1175,28 @@ fn check_and_remove_vanished_ns(
>  /// - access to sub-NS checked here
>  pub(crate) async fn pull_store(
>      worker: &WorkerTask,
> -    client: &HttpClient,
>      mut params: PullParameters,
>  ) -> Result<(), Error> {
>      // explicit create shared lock to prevent GC on newly created chunks
> -    let _shared_store_lock = params.store.try_shared_chunk_store_lock()?;
> +    let _shared_store_lock = params.target.store.try_shared_chunk_store_lock()?;
>      let mut errors = false;
>  
>      let old_max_depth = params.max_depth;
> -    let namespaces = if params.remote_ns.is_root() && params.max_depth == Some(0) {
> -        vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces!
> -    } else {
> -        query_namespaces(worker, client, &mut params).await?
> -    };
> +    let mut namespaces = params
> +        .source
> +        .list_namespaces(&mut params.max_depth, worker)
> +        .await?;
>      errors |= old_max_depth != params.max_depth; // fail job if we switched to backwards-compat mode
> +    namespaces.sort_unstable_by(|a, b| a.name_len().cmp(&b.name_len()));
>  
>      let (mut groups, mut snapshots) = (0, 0);
>      let mut synced_ns = HashSet::with_capacity(namespaces.len());
>  
>      for namespace in namespaces {
> -        let source_store_ns_str = print_store_and_ns(params.source.store(), &namespace);
> +        let source_store_ns_str = params.source.print_store_and_ns();
>  
> -        let target_ns = namespace.map_prefix(&params.remote_ns, &params.ns)?;
> -        let target_store_ns_str = print_store_and_ns(params.store.name(), &target_ns);
> +        let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;
> +        let target_store_ns_str = print_store_and_ns(params.target.store.name(), &target_ns);
>  
>          task_log!(worker, "----");
>          task_log!(
> @@ -947,7 +1224,7 @@ pub(crate) async fn pull_store(
>              }
>          }
>  
> -        match pull_ns(worker, client, &params, namespace.clone(), target_ns).await {
> +        match pull_ns(worker, &namespace, &mut params).await {
>              Ok((ns_progress, ns_errors)) => {
>                  errors |= ns_errors;
>  
> @@ -968,7 +1245,7 @@ pub(crate) async fn pull_store(
>                  task_log!(
>                      worker,
>                      "Encountered errors while syncing namespace {} - {}",
> -                    namespace,
> +                    &namespace,
>                      err,
>                  );
>              }
> @@ -1000,33 +1277,17 @@ pub(crate) async fn pull_store(
>  /// - owner check for vanished groups done here
>  pub(crate) async fn pull_ns(
>      worker: &WorkerTask,
> -    client: &HttpClient,
> -    params: &PullParameters,
> -    source_ns: BackupNamespace,
> -    target_ns: BackupNamespace,
> +    namespace: &BackupNamespace,
> +    params: &mut PullParameters,
>  ) -> Result<(StoreProgress, bool), Error> {
> -    let path = format!("api2/json/admin/datastore/{}/groups", params.source.store());
> -
> -    let args = if !source_ns.is_root() {
> -        Some(json!({
> -            "ns": source_ns,
> -        }))
> -    } else {
> -        None
> -    };
> -
> -    let mut result = client
> -        .get(&path, args)
> -        .await
> -        .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?;
> -
> -    let mut list: Vec<GroupListItem> = serde_json::from_value(result["data"].take())?;
> +    let mut list: Vec<pbs_api_types::BackupGroup> =
> +        params.source.list_groups(namespace, &params.owner).await?;
>  
>      let total_count = list.len();
>      list.sort_unstable_by(|a, b| {
> -        let type_order = a.backup.ty.cmp(&b.backup.ty);
> +        let type_order = a.ty.cmp(&b.ty);
>          if type_order == std::cmp::Ordering::Equal {
> -            a.backup.id.cmp(&b.backup.id)
> +            a.id.cmp(&b.id)
>          } else {
>              type_order
>          }
> @@ -1036,9 +1297,6 @@ pub(crate) async fn pull_ns(
>          filters.iter().any(|filter| group.matches(filter))
>      };
>  
> -    // Get groups with target NS set
> -    let list: Vec<pbs_api_types::BackupGroup> = list.into_iter().map(|item| item.backup).collect();
> -
>      let list = if let Some(ref group_filter) = &params.group_filter {
>          let unfiltered_count = list.len();
>          let list: Vec<pbs_api_types::BackupGroup> = list
> @@ -1066,6 +1324,7 @@ pub(crate) async fn pull_ns(
>  
>      let mut progress = StoreProgress::new(list.len() as u64);
>  
> +    let target_ns = params.get_target_ns()?;

this is the wrong namespace - we need to map the source namespace onto the
target namespace anchor (e.g., if we are recursively pulling from /a/b to /a/z
and the namespace currently being pulled is /a/b/c, then the target namespace
should be /a/z/c, not /a/z).
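
a minimal sketch of the mapping this would need (assuming the same
map_prefix() helper the patch already uses in pull_store(); the variable
names follow the hunks above and are only illustrative):

    // map the source namespace currently being pulled onto the target anchor,
    // e.g. pulling /a/b/c with source anchor /a/b into /a/z yields /a/z/c
    let target_ns = namespace.map_prefix(&params.source.get_ns(), &params.target.ns)?;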

>      for (done, group) in list.into_iter().enumerate() {
>          progress.done_groups = done as u64;
>          progress.done_snapshots = 0;
> @@ -1073,6 +1332,7 @@ pub(crate) async fn pull_ns(
>  
>          let (owner, _lock_guard) =
>              match params
> +                .target
>                  .store
>                  .create_locked_backup_group(&target_ns, &group, &params.owner)
>              {
> @@ -1085,6 +1345,7 @@ pub(crate) async fn pull_ns(
>                          err
>                      );
>                      errors = true; // do not stop here, instead continue
> +                    task_log!(worker, "create_locked_backup_group failed");

any reason this is here? it's already logged two lines above ;)

>                      continue;
>                  }
>              };
> @@ -1100,15 +1361,7 @@ pub(crate) async fn pull_ns(
>                  owner
>              );
>              errors = true; // do not stop here, instead continue
> -        } else if let Err(err) = pull_group(
> -            worker,
> -            client,
> -            params,
> -            &group,
> -            source_ns.clone(),
> -            &mut progress,
> -        )
> -        .await
> +        } else if let Err(err) = pull_group(worker, params, namespace, &group, &mut progress).await
>          {
>              task_log!(worker, "sync group {} failed - {}", &group, err,);
>              errors = true; // do not stop here, instead continue
> @@ -1117,13 +1370,13 @@ pub(crate) async fn pull_ns(
>  
>      if params.remove_vanished {
>          let result: Result<(), Error> = proxmox_lang::try_block!({
> -            for local_group in params.store.iter_backup_groups(target_ns.clone())? {
> +            for local_group in params.target.store.iter_backup_groups(target_ns.clone())? {
>                  let local_group = local_group?;
>                  let local_group = local_group.group();
>                  if new_groups.contains(local_group) {
>                      continue;
>                  }
> -                let owner = params.store.get_owner(&target_ns, local_group)?;
> +                let owner = params.target.store.get_owner(&target_ns, local_group)?;
>                  if check_backup_owner(&owner, &params.owner).is_err() {
>                      continue;
>                  }
> @@ -1133,7 +1386,11 @@ pub(crate) async fn pull_ns(
>                      }
>                  }
>                  task_log!(worker, "delete vanished group '{local_group}'",);
> -                match params.store.remove_backup_group(&target_ns, local_group) {
> +                match params
> +                    .target
> +                    .store
> +                    .remove_backup_group(&target_ns, local_group)
> +                {
>                      Ok(true) => {}
>                      Ok(false) => {
>                          task_log!(
> -- 
> 2.30.2
> 
> 
> 
> _______________________________________________
> pbs-devel mailing list
> pbs-devel@lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pbs-devel
> 
> 
> 




^ permalink raw reply	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2023-02-28 11:36 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-02-23 12:55 [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Hannes Laimer
2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 1/5] api2: make Remote for SyncJob optional Hannes Laimer
2023-02-28  9:41   ` Wolfgang Bumiller
2023-02-28 11:35   ` Fabian Grünbichler
2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 2/5] ui: add support for optional Remote in SyncJob Hannes Laimer
2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 3/5] manager: add completion for opt. " Hannes Laimer
2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 4/5] pbs-client: accept a ref to a HttpClient in BackupReader::starting Hannes Laimer
2023-02-28 11:35   ` Fabian Grünbichler
2023-02-23 12:55 ` [pbs-devel] [PATCH proxmox-backup v2 5/5] pull: add support for local pulling Hannes Laimer
2023-02-28 11:25   ` Wolfgang Bumiller
2023-02-28 11:36   ` Fabian Grünbichler
2023-02-28 11:35 ` [pbs-devel] [PATCH proxmox-backup v2 0/5] local sync-jobs Fabian Grünbichler
